https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning のPartitioningを試す。
ソースコード
plugins { id 'org.springframework.boot' version '2.2.2.RELEASE' id 'io.spring.dependency-management' version '1.0.8.RELEASE' id 'java' } dependencies { implementation 'org.springframework.boot:spring-boot-starter-batch' runtimeOnly 'com.h2database:h2' }
import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.partition.support.MultiResourcePartitioner; import org.springframework.batch.core.partition.support.Partitioner; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.Bean; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.util.NumberUtils; @EnableBatchProcessing @SpringBootApplication public class Application { @Bean public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) throws IOException { return jobBuilderFactory .get("sampleJob") .start(stepMaster(stepBuilderFactory)) .build(); } @Bean public Step stepMaster(StepBuilderFactory stepBuilderFactory) throws IOException { return stepBuilderFactory .get("step1.master") .partitioner("step1", partitioner()) .step(step1(stepBuilderFactory)) .gridSize(10) .taskExecutor(new SimpleAsyncTaskExecutor("spring_batch")) .build(); } @Bean public Step step1(StepBuilderFactory stepBuilderFactory) throws IOException { return stepBuilderFactory .get("step1") .<Integer, Integer>chunk(2) .reader(itemReader(null)).writer(itemWriter()) .build(); } @Bean public Partitioner partitioner() throws IOException { MultiResourcePartitioner p = new MultiResourcePartitioner(); PathMatchingResourcePatternResolver resolover = new PathMatchingResourcePatternResolver(); p.setResources(resolover.getResources("classpath:data/file*.csv")); return p; } @StepScope @Bean public FlatFileItemReader<Integer> itemReader(@Value("#{stepExecutionContext['fileName']}") Resource resource) throws IOException { FlatFileItemReader<Integer> reader = new FlatFileItemReader<Integer>(); DefaultLineMapper<Integer> lineMapper = new DefaultLineMapper<>(); lineMapper.setLineTokenizer(new DelimitedLineTokenizer(",")); lineMapper.setFieldSetMapper(fieldSet -> NumberUtils.parseNumber(fieldSet.getValues()[0],Integer.class)); reader.setLineMapper(lineMapper); reader.setResource(resource); return reader; } @Bean public ItemWriter<Integer> itemWriter() { return (items) -> items.forEach(System.out::println); } public static void main(String[] args) { new SpringApplicationBuilder(Application.class).run(args); } }
クラスパス下にdata/file*.csv
を作成する。具体的にはdata/file1.csv
, data/file2.csv
, data/file3.csv
など。中身は一列数値だけにしておく。例えば、以下のような感じ。
1 1 1
メモ
Partitioner partitioner()
がキモ。stepをどういうアルゴリズムでパーティション化するか、をここで定義する。Partitioner
インタフェースで実装する。たとえば、適当な範囲で区切られた値・主キー範囲・ユニークなファイル名リスト、など。
spring-batchにはそのインタフェース実装としてMultiResourcePartitioner
がある。このクラスは、ファイルのリストを与えると各ファイルごとにパーティションを一つ割り当てる、という実装。また、各パーティションが処理すべきファイル名はstepExecutionContext
にfileName
というキー名(変更可)で保存する。なので、後続処理はそのファイル名で処理を行うことになる。
reader定義であるitemReader
の引数は@Value("#{stepExecutionContext['fileName']}"
となっており、上述のファイル名を取得している。@Value...
の詳細は、spring-batchのlate bindingsという機能で、詳細はこちら。SpEL式でstepExecutionContext['fileName']
と記述することで、spring-batchのstepのExecutionContextにアクセスしてキー名fileName
の値を取得している。なお、@StepScope
でスコープを変更する必要がある。以上により、パーティション化されたそれぞれのステップにそれぞれが処理すべきファイル名が渡される。
これを実行すると、ただ単にファイルから読みだした値を出力するだけだが、以下のようになる。なお、各ファイルの中身はすべて同じ数値のものを3つ用意した状態で実行している。すべて1・すべて2・すべて3、の3ファイルで実行している。
Job: [SimpleJob: [name=sampleJob]] launched with the following parameters: [{}] Executing step: [step1.master] 3 3 2 2 1 1 1 1 3 3 2 2 1 2 Step: [step1:partition0] executed in 91ms 3 Step: [step1:partition1] executed in 98ms Step: [step1:partition2] executed in 98ms Step: [step1.master] executed in 131ms
3ファイルに対応してパーティション化された3つのstepが実行されているのがわかる。また、マルチスレッドなので出力はバラバラになる。