kagamihogeの日記

kagamihogeの日記です。

spring-batchでグループ単位のレコード読み込み

https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/ja/Ch05_FileAccess.html によると、コントロールブレイク処理(またはキーブレイク処理)、と呼ぶものをspring-batchで実現する方法について。

コントロールブレイク処理(またはキーブレイク処理)とは、ソート済みのレコードを順次読み込み、 レコード内にある特定の項目(キー項目)が同じレコードを1つのグループとして処理する手法のことを指す。

https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/ja/Ch05_FileAccess.html より抜粋

ソースコード

plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    runtimeOnly 'com.h2database:h2'
    compileOnly 'org.projectlombok:lombok'
}

いま、codeとvalueからなるレコードのリストがあるとして、同一codeの複数レコードを単一のリストにまとめたい、とする。

レコードのクラスはこんな感じ。listの役割については後述。

@Data
@NoArgsConstructor
public class MyRecord {
    String code;
    Integer value;
    List<MyRecord> list;
    
    public MyRecord(String code, Integer value) {
        super();
        this.code = code;
        this.value = value;
    }
}
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBatchProcessing
public class App {
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs
                .get("job")
                .start(step1(steps))
                .build();
    }
    
    public Step step1(StepBuilderFactory stepBuilderFactory) {
        ItemReader<MyRecord> reader = new ListItemReader<MyRecord>(
                Arrays.asList(
                        new MyRecord("1", 1),
                        new MyRecord("1", 2),
                        new MyRecord("1", 3),
                        new MyRecord("1", 4),
                        new MyRecord("1", 5),
                        new MyRecord("2", 6),
                        new MyRecord("2", 7),
                        new MyRecord("3", 8)
                        ));

        MySingleItemPeekableItemReader peekableReader = new MySingleItemPeekableItemReader();
        peekableReader.setDelegate(reader);
        
        return stepBuilderFactory
                .get("step1")
                .<MyRecord, MyRecord>chunk(2)
                .reader(peekableReader)
                .writer(l -> {
                    System.out.println(l.size());
                    l.forEach(System.out::println);})
                .build();
    }
    
    public static void main(String[] args) {
        new SpringApplicationBuilder(App.class).run(args);
    }
}

カギはSingleItemPeekableItemReaderを拡張したMySingleItemPeekableItemReaderで、そのソースコードは以下の通り。

import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;

public class MySingleItemPeekableItemReader extends SingleItemPeekableItemReader<MyRecord> {

    @Override
    public MyRecord read() throws Exception, UnexpectedInputException, ParseException {
        MyRecord item = super.read();

        if (item == null) {
            return null;
        }
        
        MyRecord group = new MyRecord();
        group.list = new ArrayList<MyRecord>();

        while (true) {
            MyRecord peek = super.peek();
            group.list.add(item);
            if (peek == null) {
                return group;
            }

            if (item.code.equals(peek.code)) {
                item = super.read();
            } else {
                // コードの切り替わり
                return group;
            }
        }
    }
}

まずSingleItemPeekableItemReaderについて。このクラスは"peek"(ちらっとのぞく)の名の通り、peekメソッドを呼ぶと次にreadするアイテムを得られる。これを利用して、codeが切り替わるタイミングを制御する。もし、codeが同じなら同一グループを意味するのでlistに追加し、異なればグループ終了を意味するのでreadの戻り値を返す。

これを実行すると以下のような出力結果になる。

2
[MyRecord(code=1, value=1, list=null), MyRecord(code=1, value=2, list=null), MyRecord(code=1, value=3, list=null), MyRecord(code=1, value=4, list=null), MyRecord(code=1, value=5, list=null)]
[MyRecord(code=2, value=6, list=null), MyRecord(code=2, value=7, list=null)]
1
[MyRecord(code=3, value=8, list=null)]

メモ

chunk(2)なので2グループごとにwriterが呼ばれる。大本のレコード数は8だが、readerでグループ化するのでread件数はそれとは異なってくる。

ここではreadはグルーピングしたリストを返しているが、グループごとの集計結果を返すようにしても良いと思う。readerがグループ集計結果を返し、writerでファイルとかに書き込む、というイメージ。

ただ、上のコードでちょっと微妙なのはグルーピングのリストをレコードMyRecordに入れちゃってる点。レコードを表すクラスにグループ化のプロパティが居るのは違和感が強い。PeekableItemReaderjavadocによると、ファイル読み込みで論理的には一行だが複数行にまたがっているものを集約する場合などに有用、とか書いてある。なので、異なるクラスを返すような処理は想定していない、のかもしれない。

と、いうわけで。SingleItemPeekableItemReaderをラップし、かつ、readはグループ化のクラスを返す、というクラスを自作しても良いかもしれない。

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;

public class CustomSingleItemPeekableItemReader implements ItemReader<List<MyRecord>> {

    SingleItemPeekableItemReader<MyRecord> reader;

    public CustomSingleItemPeekableItemReader(SingleItemPeekableItemReader<MyRecord> reader) {
        super();
        this.reader = reader;
    }

    @Override
    public List<MyRecord> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        MyRecord item = reader.read();

        if (item == null) {
            return null;
        }

        ArrayList<MyRecord> list = new ArrayList<MyRecord>();

        while (true) {
            MyRecord peek = reader.peek();
            list.add(item);
            if (peek == null) {
                return list;
            }

            if (item.code.equals(peek.code)) {
                item = reader.read();
            } else {
                return list;
            }
        }
    }
}

参考URL

spring-batchのPartitioningをためす

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning のPartitioningを試す。

ソースコード

plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    runtimeOnly 'com.h2database:h2'
}
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.util.NumberUtils;

@EnableBatchProcessing
@SpringBootApplication
public class Application {
    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) throws IOException {
        return jobBuilderFactory
                .get("sampleJob")
                .start(stepMaster(stepBuilderFactory))
                .build();
    }

    @Bean
    public Step stepMaster(StepBuilderFactory stepBuilderFactory) throws IOException {
        return stepBuilderFactory
                .get("step1.master")
                .partitioner("step1", partitioner())
                .step(step1(stepBuilderFactory))
                .gridSize(10)
                .taskExecutor(new SimpleAsyncTaskExecutor("spring_batch"))
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory) throws IOException {
        return stepBuilderFactory
                .get("step1")
                .<Integer, Integer>chunk(2)
                .reader(itemReader(null)).writer(itemWriter())
                .build();
    }

    @Bean
    public Partitioner partitioner() throws IOException {
        MultiResourcePartitioner p = new MultiResourcePartitioner();
        PathMatchingResourcePatternResolver resolover = new PathMatchingResourcePatternResolver();
        p.setResources(resolover.getResources("classpath:data/file*.csv"));
        return p;
    }

    @StepScope
    @Bean
    public FlatFileItemReader<Integer> itemReader(@Value("#{stepExecutionContext['fileName']}") Resource resource)
            throws IOException {
        FlatFileItemReader<Integer> reader = new FlatFileItemReader<Integer>();
        DefaultLineMapper<Integer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(new DelimitedLineTokenizer(","));
        lineMapper.setFieldSetMapper(fieldSet -> NumberUtils.parseNumber(fieldSet.getValues()[0],Integer.class));

        reader.setLineMapper(lineMapper);
        reader.setResource(resource);
        return reader;
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return (items) -> items.forEach(System.out::println);
    }

    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);
    }
}

クラスパス下にdata/file*.csvを作成する。具体的にはdata/file1.csv, data/file2.csv, data/file3.csvなど。中身は一列数値だけにしておく。例えば、以下のような感じ。

1
1
1

メモ

Partitioner partitioner()がキモ。stepをどういうアルゴリズムパーティション化するか、をここで定義する。Partitionerインタフェースで実装する。たとえば、適当な範囲で区切られた値・主キー範囲・ユニークなファイル名リスト、など。

spring-batchにはそのインタフェース実装としてMultiResourcePartitionerがある。このクラスは、ファイルのリストを与えると各ファイルごとにパーティションを一つ割り当てる、という実装。また、各パーティションが処理すべきファイル名はstepExecutionContextfileNameというキー名(変更可)で保存する。なので、後続処理はそのファイル名で処理を行うことになる。

reader定義であるitemReaderの引数は@Value("#{stepExecutionContext['fileName']}"となっており、上述のファイル名を取得している。@Value...の詳細は、spring-batchのlate bindingsという機能で、詳細はこちら。SpEL式でstepExecutionContext['fileName']と記述することで、spring-batchのstepのExecutionContextにアクセスしてキー名fileNameの値を取得している。なお、@StepScopeでスコープを変更する必要がある。以上により、パーティション化されたそれぞれのステップにそれぞれが処理すべきファイル名が渡される。

これを実行すると、ただ単にファイルから読みだした値を出力するだけだが、以下のようになる。なお、各ファイルの中身はすべて同じ数値のものを3つ用意した状態で実行している。すべて1・すべて2・すべて3、の3ファイルで実行している。

Job: [SimpleJob: [name=sampleJob]] launched with the following parameters: [{}]
Executing step: [step1.master]
3
3
2
2
1
1
1
1
3
3
2
2
1
2
Step: [step1:partition0] executed in 91ms
3
Step: [step1:partition1] executed in 98ms
Step: [step1:partition2] executed in 98ms
Step: [step1.master] executed in 131ms

3ファイルに対応してパーティション化された3つのstepが実行されているのがわかる。また、マルチスレッドなので出力はバラバラになる。

はまりどころ

Reader must be open before it can be read

spring-batchのRemote Chunkingをためす

https://docs.spring.io/spring-batch/docs/4.1.x/reference/html/spring-batch-integration.html#remote-chunking のRemote Chunkingをためす。

このサンプルは、Spring Integegration -> Active MQを介してMasterからworkerおprocessor + writerを呼び出す。

plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-jms'
    implementation 'org.springframework.batch:spring-batch-integration'
    implementation 'org.springframework.boot:spring-boot-starter-artemis'
}

以下はMaster側。

package springbatchsample.remote.master;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.jms.JMSException;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@EnableBatchProcessing
@SpringBootApplication
public class Application {    
    @Bean
    ActiveMQConnectionFactory connectionFactory() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        return factory;
    }
    
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(requests())
                .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
                .get();
    }
    
    @Bean
    public QueueChannel replies() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
                .channel(replies())
                .get();
    }

    @Bean
    public ItemWriter<String> itemWriter() {
        MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setDefaultChannel(requests());
        messagingTemplate.setReceiveTimeout(2000);
        ChunkMessageChannelItemWriter<String> chunkMessageChannelItemWriter
                = new ChunkMessageChannelItemWriter<>();
        chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
        chunkMessageChannelItemWriter.setReplyChannel(replies());

        return chunkMessageChannelItemWriter;
    }
    
    @Bean
    public Job chunkJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        List<String> list = IntStream.range(0, 1001).mapToObj(Integer::toString).collect(Collectors.toList());
        ListItemReader<String> itemReader = new ListItemReader<String>(list);
        
        return jobBuilderFactory.get("personJob")
                 .start(stepBuilderFactory.get("step1")
                         .<String, String>chunk(200)
                         .reader(itemReader)
                         .writer(itemWriter())
                         .build())
                 .build();
    }
    
    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);
    }
}

次がworker側。

package springbatchsample.remote.worker;

import javax.jms.JMSException;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.step.item.ChunkProcessor;
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@EnableBatchProcessing
@SpringBootApplication
public class Application {
    @Bean
    ActiveMQConnectionFactory connectionFactory() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        return factory;
    }
    
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
                .channel(requests())
                .get();
    }
    
    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(replies())
                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
                .get();
    }
    
    @Bean
    @ServiceActivator(inputChannel = "requests", outputChannel = "replies")
    public ChunkProcessorChunkHandler<String> chunkProcessorChunkHandler() {
        ChunkProcessor<String> chunkProcessor
                = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
        ChunkProcessorChunkHandler<String> chunkProcessorChunkHandler
                = new ChunkProcessorChunkHandler<>();
        chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
        return chunkProcessorChunkHandler;
    }
    
    ItemProcessor<String, String> itemProcessor() {
        return item -> {
            System.out.println("## processor:" + item);
            return item;
            };
    }
    
    ItemWriter<String> itemWriter() {
        return items -> {
            for (String string : items) {
                System.out.print(string + ",");
            }
            System.out.println("");
        };
    }
    
    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);
    }

}

実行前にはActive MQを起動しておく

master側は1-1000をrederで読み込みchunk(200)ごとにリモート呼び出しをする。workerは特に何もしておらず、200件ごとにwriterが一度呼ばれる。