kagamihoge's diary

Trying out Parallel Steps in spring-batch

This post tries out the parallel steps described in https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#scalabilityParallelSteps. The earlier post http://kagamihoge.hatenablog.com/entry/2020/01/07/110847 covered how to make a single step multi-threaded. This one runs multiple steps in parallel instead.

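So let's start by simply using it. The example below has two parallel steps. One sums 1 to 500, the other sums 501 to 1000, and at the end the two results are added together. Each step is wrapped in its own Flow, and FlowBuilder.split(taskExecutor()) runs those flows in parallel. It isn't a particularly meaningful sample, so please bear with me on that.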
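
You don't need Spring Batch to see what the split is doing. Below is a plain-Java analogy, not Spring Batch's actual implementation. The class and method names (ParallelSumSketch, step, runSplit) are hypothetical.

Each "step" sums its range into a local variable and then adds that value once to a shared AtomicInteger. This mirrors MyItemWriter.sum and App.sum. The caller waits for both tasks before reading the total, because the job only moves on once every flow in the split has finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical plain-Java analogy of the split; not Spring Batch code.
public class ParallelSumSketch {
    // Shared total, playing the role of App.sum. AtomicInteger because the
    // two "steps" add to it from different threads.
    static final AtomicInteger total = new AtomicInteger(0);

    // Stand-in for one step: sum [start, end) into a local variable
    // (like MyItemWriter.sum), then publish it once at the end (like afterStep).
    static void step(String name, int start, int end) {
        int local = IntStream.range(start, end).sum();
        System.out.println(name + " step sum=" + local);
        total.addAndGet(local);
    }

    // Runs both "flows" concurrently and waits for both, roughly what a split does.
    static int runSplit() {
        total.set(0);
        // Start a new thread for every task, as SimpleAsyncTaskExecutor does.
        Executor newThreadPerTask = task -> new Thread(task).start();
        CompletableFuture<Void> flow1 =
                CompletableFuture.runAsync(() -> step("flow1", 1, 501), newThreadPerTask);
        CompletableFuture<Void> flow2 =
                CompletableFuture.runAsync(() -> step("flow2", 501, 1001), newThreadPerTask);
        // Continue only after every flow has finished.
        CompletableFuture.allOf(flow1, flow2).join();
        return total.get();
    }

    public static void main(String[] args) {
        // The two "step sum" lines may appear in either order.
        System.out.println("sum=" + runSplit());
    }
}
```

The partial sums are combined with AtomicInteger.addAndGet, so the final total is the same regardless of which thread finishes first.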

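// Written against the Spring Batch 4.x API
// (JobBuilderFactory/StepBuilderFactory are deprecated as of 5.0).
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.annotation.Transactional;

@SpringBootApplication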
@EnableBatchProcessing
public class App {
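    // Grand total. Both steps add to it from different threads (in afterStep),
    // hence AtomicInteger rather than a plain int.
    static AtomicInteger sum = new AtomicInteger(0);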
    
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
            .start(splitFlow(steps))
            .end() // closes the flow definition and returns a FlowJobBuilder
            .listener(new JobExecutionListener() {
                @Override
                public void beforeJob(JobExecution jobExecution) {}
                
                @Override
                public void afterJob(JobExecution jobExecution) {
                    System.out.println("sum="+sum);
                }
            })
            .build();
    }
    
    @Bean
    public Flow splitFlow(StepBuilderFactory steps) {
        return new FlowBuilder<SimpleFlow>("splitFlow")
            // Run the flows passed to add() concurrently on taskExecutor().
            .split(taskExecutor())
            .add(
              flow(steps, "flow1", 1, 501), 
              flow(steps, "flow2", 501, 1001))
            .build();
    }
    
    public Flow flow(StepBuilderFactory steps, String name, int start, int end) {
        return new FlowBuilder<SimpleFlow>(name)
            .start(step(steps, name, start, end))
            .build();
    }
    
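    // Note: @Transactional has no effect here. step() is an ordinary method called
    // from inside this class rather than through a Spring proxy, and a chunk-oriented
    // step manages its own transaction per chunk anyway.
    @Transactional
    public Step step(StepBuilderFactory steps, String flowname, int start, int end) {
        // IntStream.range excludes the end value, so (1, 501) yields 1..500.
        List<Integer> list = IntStream.range(start, end).boxed().collect(Collectors.toList());
        // MyItemReader is the author's wrapper around ListItemReader and is not shown
        // in this post; for this sum a plain ListItemReader<Integer> should work as well.
        ItemReader<Integer> reader = new MyItemReader(new ListItemReader<Integer>(list));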
        
        MyItemWriter writer = new MyItemWriter();
        TaskletStep step = steps
                .get(flowname + "step1")
                .<Integer, Integer>chunk(10) // write() receives up to 10 items per call
                .reader(reader)
                .writer(writer)
                .build();
        
        return step;
    }
        
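    // SimpleAsyncTaskExecutor starts a new thread for each task, so each flow in
    // the split runs on its own thread (named with the "spring_batch" prefix).
    @Bean
    public TaskExecutor taskExecutor() {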
        return new SimpleAsyncTaskExecutor("spring_batch");
    }

    public static void main(String[] args) {
        new SpringApplicationBuilder(App.class).run(args);
    }

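}

// MyItemWriter.java
import java.util.List;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;

// Because it implements StepExecutionListener, the step builder also registers this
// writer as a step listener, so afterStep() runs when its step finishes.
// write(List) is the Spring Batch 4.x signature; 5.x passes a Chunk instead.
public class MyItemWriter implements ItemWriter<Integer>, StepExecutionListener {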

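    // Per-step running total. Each step builds its own MyItemWriter, and a step's
    // chunks are written on a single thread, so a plain int is enough here.
    int sum = 0;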
    @Override
    public void write(List<? extends Integer> items) throws Exception {
        items.forEach(i -> {
            sum += i;
        });
    }
    @Override
    public void beforeStep(StepExecution stepExecution) { }
    
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("step sum=" + sum);
        App.sum.addAndGet(sum);
        
        return stepExecution.getExitStatus();
    }

}

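Running this produces output like the following. The two flows run on separate threads, so their log lines can interleave and flow2step1 may start first, as it does here.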

Executing step: [flow2step1]
Executing step: [flow1step1]
step sum=125250
Step: [flow1step1] executed in 116ms
step sum=375250
Step: [flow2step1] executed in 116ms
sum=500500
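
The numbers in this output can be checked by hand. IntStream.range excludes its upper bound, so the arguments (1, 501) and (501, 1001) cover 1..500 and 501..1000. Gauss's formula n(n+1)/2 then gives the expected sums. Here is a small sketch; the class and method names (SumCheck, gauss, rangeSum) are hypothetical.

```java
import java.util.stream.IntStream;

// Hypothetical helper for checking the sums printed above.
public class SumCheck {
    // Sum of 1..n by Gauss's formula n(n+1)/2.
    static int gauss(int n) {
        return n * (n + 1) / 2;
    }

    // Same range the post's step() builds: start inclusive, end exclusive.
    static int rangeSum(int start, int end) {
        return IntStream.range(start, end).sum();
    }

    public static void main(String[] args) {
        int flow1 = rangeSum(1, 501);    // 1..500
        int flow2 = rangeSum(501, 1001); // 501..1000
        System.out.println(flow1 == gauss(500));               // true (125250)
        System.out.println(flow2 == gauss(1000) - gauss(500)); // true (375250)
        System.out.println(flow1 + flow2);                     // 500500
    }
}
```

125250 + 375250 = 500500, which matches the sum=500500 printed by afterJob.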