kagamihogeの日記

kagamihogeの日記です。

spring-batchでグループ単位のレコード読み込み

https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/ja/Ch05_FileAccess.html によると、コントロールブレイク処理(またはキーブレイク処理)、と呼ぶものをspring-batchで実現する方法について。

コントロールブレイク処理(またはキーブレイク処理)とは、ソート済みのレコードを順次読み込み、 レコード内にある特定の項目(キー項目)が同じレコードを1つのグループとして処理する手法のことを指す。

https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/ja/Ch05_FileAccess.html より抜粋

ソースコード

plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    runtimeOnly 'com.h2database:h2'
    compileOnly 'org.projectlombok:lombok'
}

いま、codeとvalueからなるレコードのリストがあるとして、同一codeの複数レコードを単一のリストにまとめたい、とする。

レコードのクラスはこんな感じ。listの役割については後述。

@Data
@NoArgsConstructor
public class MyRecord {
    String code;
    Integer value;
    List<MyRecord> list;
    
    public MyRecord(String code, Integer value) {
        super();
        this.code = code;
        this.value = value;
    }
}
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableBatchProcessing
public class App {
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs
                .get("job")
                .start(step1(steps))
                .build();
    }
    
    public Step step1(StepBuilderFactory stepBuilderFactory) {
        ItemReader<MyRecord> reader = new ListItemReader<MyRecord>(
                Arrays.asList(
                        new MyRecord("1", 1),
                        new MyRecord("1", 2),
                        new MyRecord("1", 3),
                        new MyRecord("1", 4),
                        new MyRecord("1", 5),
                        new MyRecord("2", 6),
                        new MyRecord("2", 7),
                        new MyRecord("3", 8)
                        ));

        MySingleItemPeekableItemReader peekableReader = new MySingleItemPeekableItemReader();
        peekableReader.setDelegate(reader);
        
        return stepBuilderFactory
                .get("step1")
                .<MyRecord, MyRecord>chunk(2)
                .reader(peekableReader)
                .writer(l -> {
                    System.out.println(l.size());
                    l.forEach(System.out::println);})
                .build();
    }
    
    public static void main(String[] args) {
        new SpringApplicationBuilder(App.class).run(args);
    }
}

カギはSingleItemPeekableItemReaderを拡張したMySingleItemPeekableItemReaderで、そのソースコードは以下の通り。

import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;

public class MySingleItemPeekableItemReader extends SingleItemPeekableItemReader<MyRecord> {

    @Override
    public MyRecord read() throws Exception, UnexpectedInputException, ParseException {
        MyRecord item = super.read();

        if (item == null) {
            return null;
        }
        
        MyRecord group = new MyRecord();
        group.list = new ArrayList<MyRecord>();

        while (true) {
            MyRecord peek = super.peek();
            group.list.add(item);
            if (peek == null) {
                return group;
            }

            if (item.code.equals(peek.code)) {
                item = super.read();
            } else {
                // コードの切り替わり
                return group;
            }
        }
    }
}

まずSingleItemPeekableItemReaderについて。このクラスは"peek"(ちらっとのぞく)の名の通り、peekメソッドを呼ぶと次にreadするアイテムを得られる。これを利用して、codeが切り替わるタイミングを制御する。もし、codeが同じなら同一グループを意味するのでlistに追加し、異なればグループ終了を意味するのでreadの戻り値を返す。

これを実行すると以下のような出力結果になる。

2
[MyRecord(code=1, value=1, list=null), MyRecord(code=1, value=2, list=null), MyRecord(code=1, value=3, list=null), MyRecord(code=1, value=4, list=null), MyRecord(code=1, value=5, list=null)]
[MyRecord(code=2, value=6, list=null), MyRecord(code=2, value=7, list=null)]
1
[MyRecord(code=3, value=8, list=null)]

メモ

chunk(2)なので2グループごとにwriterが呼ばれる。大本のレコード数は8だが、readerでグループ化するのでread件数はそれとは異なってくる。

ここではreadはグルーピングしたリストを返しているが、グループごとの集計結果を返すようにしても良いと思う。readerがグループ集計結果を返し、writerでファイルとかに書き込む、というイメージ。

ただ、上のコードでちょっと微妙なのはグルーピングのリストをレコードMyRecordに入れちゃってる点。レコードを表すクラスにグループ化のプロパティが居るのは違和感が強い。PeekableItemReaderjavadocによると、ファイル読み込みで論理的には一行だが複数行にまたがっているものを集約する場合などに有用、とか書いてある。なので、異なるクラスを返すような処理は想定していない、のかもしれない。

と、いうわけで。SingleItemPeekableItemReaderをラップし、かつ、readはグループ化のクラスを返す、というクラスを自作しても良いかもしれない。

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;

public class CustomSingleItemPeekableItemReader implements ItemReader<List<MyRecord>> {

    SingleItemPeekableItemReader<MyRecord> reader;

    public CustomSingleItemPeekableItemReader(SingleItemPeekableItemReader<MyRecord> reader) {
        super();
        this.reader = reader;
    }

    @Override
    public List<MyRecord> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        MyRecord item = reader.read();

        if (item == null) {
            return null;
        }

        ArrayList<MyRecord> list = new ArrayList<MyRecord>();

        while (true) {
            MyRecord peek = reader.peek();
            list.add(item);
            if (peek == null) {
                return list;
            }

            if (item.code.equals(peek.code)) {
                item = reader.read();
            } else {
                return list;
            }
        }
    }
}

参考URL