This post tries out the parallel steps described at https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#scalabilityParallelSteps. http://kagamihoge.hatenablog.com/entry/2020/01/07/110847 covered how to multithread a single step; this time the approach is to run multiple steps in parallel.
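So, let's just try it. The sample below has two parallel steps: one sums 1-500, the other sums 501-1000, and at the end the two results are added together. It is not a particularly meaningful sample, but please bear with me.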
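    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.job.builder.FlowBuilder;
    import org.springframework.batch.core.job.flow.Flow;
    import org.springframework.batch.core.job.flow.support.SimpleFlow;
    import org.springframework.batch.core.step.tasklet.TaskletStep;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.builder.SpringApplicationBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.transaction.annotation.Transactional;

    @SpringBootApplication
    @EnableBatchProcessing
    public class App {

        // Grand total shared by the two parallel steps.
        static AtomicInteger sum = new AtomicInteger(0);

        @Bean
        public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
            return jobs.get("job")
                    .start(splitFlow(steps))
                    .build()
                    .listener(new JobExecutionListener() {
                        @Override
                        public void beforeJob(JobExecution jobExecution) {}

                        @Override
                        public void afterJob(JobExecution jobExecution) {
                            // Print the combined total once the job has finished.
                            System.out.println("sum=" + sum);
                        }
                    })
                    .build();
        }

        // Runs flow1 and flow2 in parallel on the given TaskExecutor.
        @Bean
        public Flow splitFlow(StepBuilderFactory steps) {
            return new FlowBuilder<SimpleFlow>("splitFlow")
                    .split(taskExecutor())
                    .add(
                        flow(steps, "flow1", 1, 501),
                        flow(steps, "flow2", 501, 1001))
                    .build();
        }

        // Wraps a single step in its own flow.
        public Flow flow(StepBuilderFactory steps, String name, int start, int end) {
            return new FlowBuilder<SimpleFlow>(name)
                    .start(step(steps, name, start, end))
                    .build();
        }

        // Builds a chunk-oriented step over the integers in [start, end).
        @Transactional
        public Step step(StepBuilderFactory steps, String flowname, int start, int end) {
            List<Integer> list = IntStream.range(start, end).boxed().collect(Collectors.toList());
            ItemReader<Integer> reader = new MyItemReader(new ListItemReader<Integer>(list));
            MyItemWriter writer = new MyItemWriter();

            TaskletStep step = steps
                    .get(flowname + "step1")
                    .<Integer, Integer>chunk(10)
                    .reader(reader)
                    .writer(writer)
                    .build();
            return step;
        }

        @Bean
        public TaskExecutor taskExecutor() {
            return new SimpleAsyncTaskExecutor("spring_batch");
        }

        public static void main(String[] args) {
            new SpringApplicationBuilder(App.class).run(args);
        }
    }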
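
    import java.util.List;

    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.StepExecutionListener;
    import org.springframework.batch.item.ItemWriter;

    public class MyItemWriter implements ItemWriter<Integer>, StepExecutionListener {

        // Running total for the step that owns this writer instance.
        int sum = 0;

        @Override
        public void write(List<? extends Integer> items) throws Exception {
            // Called once per chunk; add every item in the chunk to the step total.
            items.forEach(i -> {
                sum += i;
            });
        }

        @Override
        public void beforeStep(StepExecution stepExecution) {
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            // Report this step's total, then add it to the shared grand total.
            System.out.println("step sum=" + sum);
            App.sum.addAndGet(sum);
            return stepExecution.getExitStatus();
        }
    }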
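
The split-and-combine structure of the sample does not depend on Spring Batch itself. As a rough sketch that assumes only the JDK, the same pattern can be reproduced with `CompletableFuture`: each task keeps a private total (like the `sum` field of `MyItemWriter`) and publishes it once to a shared `AtomicInteger` (like `App.sum`). The class name `SplitSumSketch` and the methods `partialSum` and `runSplit` are made up for this illustration and are not part of the sample.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical stand-alone illustration; not part of the Spring Batch sample.
public class SplitSumSketch {

    // Shared total, playing the role of App.sum.
    public static final AtomicInteger total = new AtomicInteger(0);

    // Sums [start, end) into a task-local variable, then publishes it once.
    public static int partialSum(int start, int end) {
        int local = IntStream.range(start, end).sum();
        total.addAndGet(local);
        return local;
    }

    // Runs the two ranges on separate threads and waits for both to finish.
    public static int runSplit() {
        total.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Integer> first =
                    CompletableFuture.supplyAsync(() -> partialSum(1, 501), pool);
            CompletableFuture<Integer> second =
                    CompletableFuture.supplyAsync(() -> partialSum(501, 1001), pool);
            CompletableFuture.allOf(first, second).join();
        } finally {
            pool.shutdown();
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println("sum=" + runSplit()); // prints "sum=500500"
    }
}
```

Because each task only touches its own local variable until the single `addAndGet` call, the atomic counter is the only shared state, which mirrors how the sample keeps one writer instance per step.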
Running the sample application produces output like the following.
    Executing step: [flow2step1]
    Executing step: [flow1step1]
    step sum=125250
    Step: [flow1step1] executed in 116ms
    step sum=375250
    Step: [flow2step1] executed in 116ms
    sum=500500
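
As a sanity check on the numbers in the log, the two per-step sums and the grand total follow from the closed form for an arithmetic series:

```latex
\begin{align*}
\sum_{i=1}^{500} i &= \frac{500 \cdot 501}{2} = 125250 \\
\sum_{i=501}^{1000} i &= \frac{1000 \cdot 1001}{2} - 125250 = 500500 - 125250 = 375250 \\
125250 + 375250 &= 500500
\end{align*}
```

Which step starts first can differ from run to run, since the two flows execute on separate threads. The totals do not depend on that order.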