https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/ja/Ch05_FileAccess.html によると、コントロールブレイク処理(またはキーブレイク処理)、と呼ぶものをspring-batchで実現する方法について。
コントロールブレイク処理(またはキーブレイク処理)とは、ソート済みのレコードを順次読み込み、 レコード内にある特定の項目(キー項目)が同じレコードを1つのグループとして処理する手法のことを指す。
https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/ja/Ch05_FileAccess.html より抜粋
plugins {
id 'org.springframework.boot' version '2.2.2.RELEASE'
id 'io.spring.dependency-management' version '1.0.8.RELEASE'
id 'java'
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
runtimeOnly 'com.h2database:h2'
compileOnly 'org.projectlombok:lombok'
}
いま、codeとvalueからなるレコードのリストがあるとして、同一codeの複数レコードを単一のリストにまとめたい、とする。
レコードのクラスはこんな感じ。listの役割については後述。
@Data
@NoArgsConstructor
public class MyRecord {
String code;
Integer value;
List<MyRecord> list;
public MyRecord(String code, Integer value) {
super();
this.code = code;
this.value = value;
}
}
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@EnableBatchProcessing
public class App {
@Bean
public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs
.get("job")
.start(step1(steps))
.build();
}
public Step step1(StepBuilderFactory stepBuilderFactory) {
ItemReader<MyRecord> reader = new ListItemReader<MyRecord>(
Arrays.asList(
new MyRecord("1", 1),
new MyRecord("1", 2),
new MyRecord("1", 3),
new MyRecord("1", 4),
new MyRecord("1", 5),
new MyRecord("2", 6),
new MyRecord("2", 7),
new MyRecord("3", 8)
));
MySingleItemPeekableItemReader peekableReader = new MySingleItemPeekableItemReader();
peekableReader.setDelegate(reader);
return stepBuilderFactory
.get("step1")
.<MyRecord, MyRecord>chunk(2)
.reader(peekableReader)
.writer(l -> {
System.out.println(l.size());
l.forEach(System.out::println);})
.build();
}
public static void main(String[] args) {
new SpringApplicationBuilder(App.class).run(args);
}
}
カギはSingleItemPeekableItemReader
を拡張したMySingleItemPeekableItemReader
で、そのソースコードは以下の通り。
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;
public class MySingleItemPeekableItemReader extends SingleItemPeekableItemReader<MyRecord> {
@Override
public MyRecord read() throws Exception, UnexpectedInputException, ParseException {
MyRecord item = super.read();
if (item == null) {
return null;
}
MyRecord group = new MyRecord();
group.list = new ArrayList<MyRecord>();
while (true) {
MyRecord peek = super.peek();
group.list.add(item);
if (peek == null) {
return group;
}
if (item.code.equals(peek.code)) {
item = super.read();
} else {
return group;
}
}
}
}
まずSingleItemPeekableItemReader
について。このクラスは"peek"(ちらっとのぞく)の名の通り、peek
メソッドを呼ぶと次にreadするアイテムを得られる。これを利用して、codeが切り替わるタイミングを制御する。もし、codeが同じなら同一グループを意味するのでlistに追加し、異なればグループ終了を意味するのでreadの戻り値を返す。
これを実行すると以下のような出力結果になる。
2
[MyRecord(code=1, value=1, list=null), MyRecord(code=1, value=2, list=null), MyRecord(code=1, value=3, list=null), MyRecord(code=1, value=4, list=null), MyRecord(code=1, value=5, list=null)]
[MyRecord(code=2, value=6, list=null), MyRecord(code=2, value=7, list=null)]
1
[MyRecord(code=3, value=8, list=null)]
メモ
chunk(2)
なので2グループごとにwriterが呼ばれる。大本のレコード数は8だが、readerでグループ化するのでread件数はそれとは異なってくる。
ここではreadはグルーピングしたリストを返しているが、グループごとの集計結果を返すようにしても良いと思う。readerがグループ集計結果を返し、writerでファイルとかに書き込む、というイメージ。
ただ、上のコードでちょっと微妙なのはグルーピングのリストをレコードMyRecord
に入れちゃってる点。レコードを表すクラスにグループ化のプロパティが居るのは違和感が強い。PeekableItemReader
のjavadocによると、ファイル読み込みで論理的には一行だが複数行にまたがっているものを集約する場合などに有用、とか書いてある。なので、異なるクラスを返すような処理は想定していない、のかもしれない。
と、いうわけで。SingleItemPeekableItemReader
をラップし、かつ、read
はグループ化のクラスを返す、というクラスを自作しても良いかもしれない。
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;
public class CustomSingleItemPeekableItemReader implements ItemReader<List<MyRecord>> {
SingleItemPeekableItemReader<MyRecord> reader;
public CustomSingleItemPeekableItemReader(SingleItemPeekableItemReader<MyRecord> reader) {
super();
this.reader = reader;
}
@Override
public List<MyRecord> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
MyRecord item = reader.read();
if (item == null) {
return null;
}
ArrayList<MyRecord> list = new ArrayList<MyRecord>();
while (true) {
MyRecord peek = reader.peek();
list.add(item);
if (peek == null) {
return list;
}
if (item.code.equals(peek.code)) {
item = reader.read();
} else {
return list;
}
}
}
}
参考URL