kagamihogeの日記

kagamihogeの日記です。

Spring Bootでjsonプロパティをenumにデシリアライズとvalidation

各種パターン

まず、動作確認用の適当なmainとcontrollerを作成する。

plugins {
  id 'org.springframework.boot' version '2.4.1'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-web'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  
  implementation 'io.springfox:springfox-boot-starter:3.0.0'
  implementation 'org.springframework.boot:spring-boot-starter-validation'
}
test {
  useJUnitPlatform()
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SampleController {
    @PostMapping("/sample")
    public void sample(@RequestBody @Validated SampleRequest request) {
        System.out.println(request);
    }
}

リクエスト格納クラスのプロパティの型を色々変えて試していく。

import lombok.Data;

@Data
public class SampleRequest {
    SimpleEnum simpleEnum;
}
public enum SimpleEnum {
    on, off;
}

単純なenum

上記のとおり、特に何もせずともenumにすればバインドする。上の例だと、"simpleEnum": "on"とか"simpleEnum": "off"が通る。また、"simpleEnum": nullとかjsonプロパティ自体が無い場合はnullになる。

Optionalのenum

Optional<SimpleEnum> optSimpleEnum = Optional.empty();

この場合、"optSimpleEnum": nullを渡すとOptional.emptyになる。また、上記例はデフォルト値を入れており、jsonプロパティ自体が無い場合Optional.emptyになる。デフォルト値が無いとnullになる。

独自マッピングenum

たとえば、onは"1"でoffは"0"のように、enumとは異なる値からマッピングしたい場合。これは@JsonCreatorマッピングを定義する。

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonCreator;

public enum ValueEnum {
    on("1"), off("0");
    
    String value;
    ValueEnum(String value) {
        this.value = value;
    }
    
    static Map<String, ValueEnum> BY_VALUE = new HashMap<>();
    static {
        for (ValueEnum e : values()) {
            BY_VALUE.put(e.value, e);
        }
    }
    
    @JsonCreator
    public static ValueEnum create(String value) {
        return BY_VALUE.getOrDefault(value, ValueEnum.off);
    }
}

上記例は、Mapにあらかじめマッピングを保持しておき、@JsonCreatorで文字列からenumへのマッピングを記述している。

なお、nulljsonプロパティ自体が無い場合はnullになる。

独自マッピングのOptionalのenum

Optional<ValueEnum> optValueEnum = Optional.empty();

この場合、nullを渡すとOptional.emptyになる。また、デフォルト値を入れているので、jsonプロパティ自体が無い場合もOptional.emptyになる。

独自マッピングenumのvalidation

妥当な値以外はvalidationエラーにしたい場合。上記例の場合、"0", "1"以外はvalidationエラーとしたい場合を考える。

@ValueEnumConstraint
ValidatedValueEnum validatedValueEnum;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import javax.validation.Constraint;
import javax.validation.Payload;

@Documented
@Constraint(validatedBy = ValueEnumValidator.class)
@Target( { ElementType.METHOD, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
public @interface ValueEnumConstraint {
    String message() default "Invalid";
    Class<?>[] groups() default {};
    Class<? extends Payload>[] payload() default {};
}
import java.util.Objects;

import javax.validation.ConstraintValidator;
import javax.validation.ConstraintValidatorContext;

public class ValueEnumValidator implements ConstraintValidator<ValueEnumConstraint, ValidatedValueEnum> {

    @Override
    public boolean isValid(ValidatedValueEnum value, ConstraintValidatorContext context) {
        return Objects.nonNull(value);
    }
}
public enum ValidatedValueEnum {
  //(ValueEnumと同一部分は省略)    
    @JsonCreator
    public static ValidatedValueEnum create(String value) {
        return BY_VALUE.getOrDefault(value, null);
    }
}

まず、@JsonCreatorで、妥当な値以外が来た場合はnullを返す。そして、validationはnullが来たらvalidationエラーにする。@JsonCreatorバインディングの後にConstraintValidatorが走る。

ただし、これはnullを許容したい場合にはvalidationエラーとなってしまう。

独自マッピングのOptionalのenumのvalidation

nullを許容するため、まずOptionalにする。

@OptValueEnumConstraint
Optional<OptValidatedValueEnum> optValidatedValueEnum = Optional.empty();

次に、enumにvalidationエラーを示すinvalidを追加する。@JsonCreatorマッピング時に、妥当な値以外はinvalidを返す。

public enum OptValidatedValueEnum {
    on("1"), off("0"), invalid("");
    //(省略) 
    @JsonCreator
    public static OptValidatedValueEnum create(String value) {
        return BY_VALUE.getOrDefault(value, invalid);
    }
}

そして、validationではinvalidが来たらエラーにし、それ以外はOKにする。

public class OptValueEnumValidator implements ConstraintValidator<OptValueEnumConstraint, Optional<OptValidatedValueEnum>> {

    @Override
    public boolean isValid(Optional<OptValidatedValueEnum> value, ConstraintValidatorContext context) {
        if (value.isPresent()) {
            return !(value.get() == invalid);
        }
        return true;
    }
}

これで、妥当な値、ここでは"0", "1", nullおよびjsonプロパティ自体が無い、以外はvalidationエラーになる。

Spring Boot + Apache Kafkaのチュートリアルレベルの事をやる

Spring BootからApache Kafkaにpublish/subscribeする。

環境

dockerでkafka起動

qiita.com

上記を参考に構築する。本エントリで関係する書き換え箇所は以下の通り。

  • KAFKA_ADVERTISED_HOST_NAME - ホストのローカルアドレスを指定。
  • "19092:9092" - 後々、ホストのSpring Bootからアクセスする際にポートを指定する。なので適当な値でポートフォワーディングしている。
  • KAFKA_CREATE_TOPICS - 起動時に動作確認用のtopicを作成。
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "19092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.10.210
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "Topic:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

CLIでpublish/subscribeが出来るか動作確認する。

まずkafkaのdockerに入る。

docker exec -it kafka-docker_kafka_1 bash

topicの一覧を取得する。KAFKA_CREATE_TOPICSで指定したtopicが作られているのが分かる。

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Topic

topicの新規作成。

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --topic=mytopic
Created topic mytopic.

topicにpublishする。

bash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic
>sad
>

topicからconsumeする。

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic=mytopic

Spring Boot

上記で構築したkafkaにSpring Bootからpublish/subscribeする。

docs.spring.io

基本的なことはSpring Bootのドキュメントに記載がある。

string

まず単純な文字列のpublish/subscribeをする。

gradleに関連する依存性などを追加する。Spring Initializr https://start.spring.io/ で必要なものを追加する。Dependenciesにkafkaと入力すると、Spring for Apache KafkaとSpring for Apache Kafka Streamsが出てくるが、とりあえずはSpring for Apache Kafkaだけで良い。

plugins {
  id 'org.springframework.boot' version '2.3.4.RELEASE'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation 'org.springframework.kafka:spring-kafka'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.kafka:spring-kafka-test'  
}
test {
  useJUnitPlatform()
}

application.propertiesで設定を行う。

spring.kafka.bootstrap-servers=localhost:19092
spring.kafka.consumer.group-id=myGroup

Spring Boot起動時にpublishを一回行うMainクラスを作成する。

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import lombok.AllArgsConstructor;

@SpringBootApplication
@AllArgsConstructor
public class DemoApplication implements CommandLineRunner {
    StringPublisher publisher;
    
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        publisher.publish();
    }
}

publishするクラス。

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import lombok.AllArgsConstructor;

@Component
@AllArgsConstructor
public class StringPublisher {
    KafkaTemplate<String, String> kafkaTemplate;

    void publish() {
        kafkaTemplate.send("mytopic", "message");
    }
}

consumeするクラス。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class StringConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(String content) {
        System.out.println(content);
    }
}

JSON

次にPOJOのpublish/subscribeをする。実際にはPOJOからJSONシリアライズ・デシリアライズをして通信を行う。なので、まずjackson-databindの依存性を追加する。

implementation 'com.fasterxml.jackson.core:jackson-databind'

application.propertiesJSONシリアライズ・デシリアライズの設定を追加する。デフォルトはStringにシリアライズ・デシリアライズするため、これをJSONに切り替える。また、そのJSONクラスは信頼されたpackageに入れる必要がある。今回は説明簡易化のため、すべて信頼するように*を指定している。

pring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

JSON用のMainクラスを作成する。

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import lombok.AllArgsConstructor;

@SpringBootApplication
@AllArgsConstructor
public class DemoApplication implements CommandLineRunner {
    JsonPublisher publisher;
    
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        publisher.publish();
    }
}

POJOのクラス。

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleData {
    String id;
    int value;
}

pubishをするクラス。

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import lombok.AllArgsConstructor;

@Component
@AllArgsConstructor
public class JsonPublisher {
    KafkaTemplate<String, SampleData> kafkaTemplate;

    void publish() {
        kafkaTemplate.send("mytopic", new SampleData("id001", 334));
    }
}

consumeをするクラス。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class JsonConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(SampleData content) {
        System.out.println(content);
    }
}

ハマったところ

/bin/sh: illegal option -

docker-compose up -d --buildの際に以下のエラーが発生した。

OK: 349 MiB in 74 packages
/bin/sh: illegal option -
ERROR: Service 'kafka' failed to build : The command '/bin/sh -c apk add --no-cache bash curl jq docker  && chmod a+x /tmp/*.sh  && mv /tmp/start-kafka.sh /tmp/broker-list.sh /tmp/create-topics.sh /tmp/versions.sh /usr/bin  && sync && /tmp/download-kafka.sh  && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt  && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz  && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME}  && rm /tmp/*  && wget https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VERSION}/glibc-${GLIBC_VERSION}.apk  && apk add --no-cache --allow-untrusted glibc-${GLIBC_VERSION}.apk  && rm glibc-${GLIBC_VERSION}.apk' returned a non-zero code: 2

原因は*.shの改行コードがCRLFのため発生するらしい。windowsの場合はgit cloneの際にcore.autocrlf falseだと改行コードが自動的にCRLFになる。なのでcore.autocrlf trueで再度cloneしなおせばよい。詳しくは以下を参照。

github.com

Invalid url in bootstrap.servers:

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost --list
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:499)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:63)
        at kafka.admin.TopicCommand$AdminClientTopicService$.createAdminClient(TopicCommand.scala:216)
        at kafka.admin.TopicCommand$AdminClientTopicService$.apply(TopicCommand.scala:220)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: localhost
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:59)
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:455)
        ... 5 more

-bootstrap-serverにはポート番号も必要。

× --bootstrap-server localhost
〇 --bootstrap-server localhost:9092

Can't convert value of class xxxx to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class kafka.sample.SampleData to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class kafka.sample.SampleData cannot be cast to class java.lang.String (kafka.sample.SampleData is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @43180ffa; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.5.1.jar:na]

POJOからJSONシリアライズする場合、プロパティでそのための指定をする必要がある。

java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper

Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582) ~[na:na]

JSONを使用する場合、依存性にcom.fasterxml.jackson.core:jackson-databindを追加する必要がある。

Cannot convert from [java.lang.String] to [xxxx]

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [kafka.sample.SampleData] for GenericMessage [payload={"id":"id001","value":334}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32f00752, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=mytopic, kafka_receivedTimestamp=1603606820608, __TypeId__=[B@1a73802f, kafka_groupId=myGroup}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]

JSONからPOJOにデシリアライズする場合、プロパティでそのための指定をする必要がある。

The class 'xxxx' is not in the trusted packages

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mytopic-0 at offset 10. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'kafka.sample.SampleData' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.5.6.RELEASE.jar:2.5.6.RELEASE]

シリアライズの際に対象クラスは信頼されたpackageに含める必要がある。

参考文献

spring-batchでJDBCのキーブレイク

背景

Javaでキーブレイク*1を意識することはあまり無い。JPAならOneToManyで自動的にコレクションにマッピングされるし、JSONXMLでも同様である。

今回、spring-batchでO/Rをとある事情で使えずSQLを直接使用、かつ、キーブレイクが必要になった。その実現方法について考える。こちらにある通り、SingleItemPeekableItemReaderとtaskletを組み合わせれば可能だが、今回はchunkで何とかする方法について。

基本的な考え方はこちらと同様にSingleItemPeekableItemReaderでキーブレイクを実現する。このときdelegate先はJdbcCursorItemReaderにする。そして、一回のItemReader#read内でキーブレイクに達するまでJDBCカーソルを回し続ける。これにより、ItemReader#readがキーブレイク単位のオブジェクトを返すようになる。

spring-batchのchunkの考え方からするとちょっと気持ち悪いが……とりえあず実装を見ていく。

実装

plugins {
  id 'org.springframework.boot' version '2.3.4.RELEASE'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-batch'
  implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
  compileOnly 'org.projectlombok:lombok'
  runtimeOnly 'com.h2database:h2'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.batch:spring-batch-test'
}
test {
  useJUnitPlatform()
}

サンプルとして、1:nのテーブルからデータを取得し、親側のキーでキーブレイクする処理、を考える。

エンティティ的にはこんな感じ。このサンプルではテーブルを自動生成するためだけにしか使わない。

@Entity
@Data
public class Sample {
    @Id
    Long id;
    String value;

    @OneToMany
    @JoinColumn(name = "sampleId")
    List<SampleDetail> details;
}
@Entity
@Data
public class SampleDetail {
    @Id
    SampleDetailId id;
    String value;
}
@SuppressWarnings("serial")
@Embeddable
public class SampleDetailId implements Serializable {
    Long id;
    Long sampleId;
}

データ取得のために実行するSQLはこんな感じ。

select s.id, d.id, d.value 
from sample s 
join sample_detail d on s.id = d.sample_id 
order by s.id

以下はこのエントリの核となるキーブレイクするreader。

import javax.sql.DataSource;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;

public class JdbcCursorKeyBreakRreader implements ItemStreamReader<KeyBreakItem> {
    SingleItemPeekableItemReader<SampleRow> peekableReader = new SingleItemPeekableItemReader<>();

    public JdbcCursorKeyBreakRreader(DataSource dataSource) {
        JdbcCursorItemReader<SampleRow> jdbcCursor = new JdbcCursorItemReader<>();
        jdbcCursor.setDataSource(dataSource);
        jdbcCursor.setName("jdbcCursor");
        jdbcCursor.setSql("select s.id, d.id, d.value from sample s join sample_detail d on s.id = d.sample_id order by s.id");
        jdbcCursor.setRowMapper((rs, rowNum) -> new SampleRow(rs.getLong(1), rs.getLong(2), rs.getString(3)));

        peekableReader.setDelegate(jdbcCursor);
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        peekableReader.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        peekableReader.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        peekableReader.close();
    }

    KeyBreakItem data = null;

    @Override
    public KeyBreakItem read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        data = new KeyBreakItem();

        while (peekableReader.peek() != null) {
            SampleRow current = peekableReader.read();
            SampleRow next = peekableReader.peek();

            // chunkアイテムに各行の内容を反映させる
            data.setId(current.getId());
            data.getDetailIds().add(current.getDetailId());

            if (next != null) {
                // キーブレイク判定処理
                if (!current.getId().equals(next.getId())) {
                    return data;
                }
            } else {
                return data;
            }
        }

        return null;
    }
}

キーブレイク判定処理がtrueの間はJDBCカーソルから読み取り続け、falseになったらItemReader#readとしてchunkアイテムを返す。また、読み取った各行の内容を最終的な戻り値となる予定の変数に反映する。

説明用に汎用性は無くしている。といっても、SQLとかマッピングやキーブレイク判定とかを関数化して渡せるようにすれば良い程度だが。

上のJdbcCursorItemReaderRowMapperが返すクラスはこんな感じ。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleRow {
    Long id;
    Long detailId;
    String value;
}

chunkアイテムのItemReader#readが返すクラスはこんな感じ。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class KeyBreakItem {
    Long id;
    List<Long> detailIds = new ArrayList<>();
}

サンプル実行するためのjob定義はこんな感じ。

@EnableJpaRepositories
@EnableBatchProcessing
@SpringBootApplication
public class Application {
    @Bean
    public Job job(JobBuilderFactory jobs, @Qualifier("myjobstep1") Step s1) {
        return jobs.get("demo-batch-job").incrementer(new RunIdIncrementer()).start(s1).build();
    }

    @Bean(name = "myjobstep1")
    public Step step1(StepBuilderFactory steps, DataSource dataSource) {
        return steps
                .get("myjobstep1")
                .<KeyBreakItem, KeyBreakItem>chunk(2)
                .reader(new JdbcCursorKeyBreakRreader(dataSource)).writer(items -> {
                    items.forEach(item -> System.out.println(item));
                    System.out.println("=== write ===");
                })

                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

サンプルデータ。src/main/resources/data.sql

insert into sample (id, value) values (1 , 'a');
insert into sample (id, value) values (2 , 'a');
insert into sample (id, value) values (3 , 'a');
insert into sample (id, value) values (4 , 'a');
insert into sample (id, value) values (5 , 'a');

insert into sample_detail (id, sample_id, value) values (1 , 1,  'a');
insert into sample_detail (id, sample_id, value) values (2 , 1,  'b');
insert into sample_detail (id, sample_id, value) values (3 , 1,  'c');
insert into sample_detail (id, sample_id, value) values (4 , 2,  'a');
insert into sample_detail (id, sample_id, value) values (5 , 2,  'b');
insert into sample_detail (id, sample_id, value) values (6 , 2,  'c');
insert into sample_detail (id, sample_id, value) values (7 , 3,  'a');
insert into sample_detail (id, sample_id, value) values (8 , 3,  'b');
insert into sample_detail (id, sample_id, value) values (9 , 3,  'c');
insert into sample_detail (id, sample_id, value) values (10 , 4,  'a');
insert into sample_detail (id, sample_id, value) values (11 , 4,  'b');
insert into sample_detail (id, sample_id, value) values (12 , 4,  'c');
insert into sample_detail (id, sample_id, value) values (13 , 5,  'a');
insert into sample_detail (id, sample_id, value) values (14 , 5,  'b');
insert into sample_detail (id, sample_id, value) values (15 , 5,  'c');

実行時の様子。chunk(2)にしているので、キーブレイク2回ごとにwriterが実行されているのがわかる。

KeyBreakItem(id=1, detailIds=[1, 2, 3])
KeyBreakItem(id=2, detailIds=[4, 5, 6])
=== write ===
KeyBreakItem(id=3, detailIds=[7, 8, 9])
KeyBreakItem(id=4, detailIds=[10, 11, 12])
=== write ===
KeyBreakItem(id=5, detailIds=[13, 14, 15])
=== write ===