kagamihogeの日記

kagamihogeの日記です。

spring-batchのPartitioningをためす

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning のPartitioningを試す。

ソースコード

plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    runtimeOnly 'com.h2database:h2'
}
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.util.NumberUtils;

@EnableBatchProcessing
@SpringBootApplication
public class Application {
    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) throws IOException {
        return jobBuilderFactory
                .get("sampleJob")
                .start(stepMaster(stepBuilderFactory))
                .build();
    }

    @Bean
    public Step stepMaster(StepBuilderFactory stepBuilderFactory) throws IOException {
        return stepBuilderFactory
                .get("step1.master")
                .partitioner("step1", partitioner())
                .step(step1(stepBuilderFactory))
                .gridSize(10)
                .taskExecutor(new SimpleAsyncTaskExecutor("spring_batch"))
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory) throws IOException {
        return stepBuilderFactory
                .get("step1")
                .<Integer, Integer>chunk(2)
                .reader(itemReader(null)).writer(itemWriter())
                .build();
    }

    @Bean
    public Partitioner partitioner() throws IOException {
        MultiResourcePartitioner p = new MultiResourcePartitioner();
        PathMatchingResourcePatternResolver resolover = new PathMatchingResourcePatternResolver();
        p.setResources(resolover.getResources("classpath:data/file*.csv"));
        return p;
    }

    @StepScope
    @Bean
    public FlatFileItemReader<Integer> itemReader(@Value("#{stepExecutionContext['fileName']}") Resource resource)
            throws IOException {
        FlatFileItemReader<Integer> reader = new FlatFileItemReader<Integer>();
        DefaultLineMapper<Integer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(new DelimitedLineTokenizer(","));
        lineMapper.setFieldSetMapper(fieldSet -> NumberUtils.parseNumber(fieldSet.getValues()[0],Integer.class));

        reader.setLineMapper(lineMapper);
        reader.setResource(resource);
        return reader;
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return (items) -> items.forEach(System.out::println);
    }

    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);
    }
}

クラスパス下にdata/file*.csvを作成する。具体的にはdata/file1.csv, data/file2.csv, data/file3.csvなど。中身は一列数値だけにしておく。例えば、以下のような感じ。

1
1
1

メモ

Partitioner partitioner()がキモ。stepをどういうアルゴリズムパーティション化するか、をここで定義する。Partitionerインタフェースで実装する。たとえば、適当な範囲で区切られた値・主キー範囲・ユニークなファイル名リスト、など。

spring-batchにはそのインタフェース実装としてMultiResourcePartitionerがある。このクラスは、ファイルのリストを与えると各ファイルごとにパーティションを一つ割り当てる、という実装。また、各パーティションが処理すべきファイル名はstepExecutionContextfileNameというキー名(変更可)で保存する。なので、後続処理はそのファイル名で処理を行うことになる。

reader定義であるitemReaderの引数は@Value("#{stepExecutionContext['fileName']}"となっており、上述のファイル名を取得している。@Value...の詳細は、spring-batchのlate bindingsという機能で、詳細はこちら。SpEL式でstepExecutionContext['fileName']と記述することで、spring-batchのstepのExecutionContextにアクセスしてキー名fileNameの値を取得している。なお、@StepScopeでスコープを変更する必要がある。以上により、パーティション化されたそれぞれのステップにそれぞれが処理すべきファイル名が渡される。

これを実行すると、ただ単にファイルから読みだした値を出力するだけだが、以下のようになる。なお、各ファイルの中身はすべて同じ数値のものを3つ用意した状態で実行している。すべて1・すべて2・すべて3、の3ファイルで実行している。

Job: [SimpleJob: [name=sampleJob]] launched with the following parameters: [{}]
Executing step: [step1.master]
3
3
2
2
1
1
1
1
3
3
2
2
1
2
Step: [step1:partition0] executed in 91ms
3
Step: [step1:partition1] executed in 98ms
Step: [step1:partition2] executed in 98ms
Step: [step1.master] executed in 131ms

3ファイルに対応してパーティション化された3つのstepが実行されているのがわかる。また、マルチスレッドなので出力はバラバラになる。

はまりどころ

Reader must be open before it can be read