kagamihogeの日記

kagamihogeの日記です。

spring-batchでJDBCのキーブレイク

背景

Javaでキーブレイク*1を意識することはあまり無い。JPAならOneToManyで自動的にコレクションにマッピングされるし、JSONXMLでも同様である。

今回、spring-batchでO/Rをとある事情で使えずSQLを直接使用、かつ、キーブレイクが必要になった。その実現方法について考える。こちらにある通り、SingleItemPeekableItemReaderとtaskletを組み合わせれば可能だが、今回はchunkで何とかする方法について。

基本的な考え方はこちらと同様にSingleItemPeekableItemReaderでキーブレイクを実現する。このときdelegate先はJdbcCursorItemReaderにする。そして、一回のItemReader#read内でキーブレイクに達するまでJDBCカーソルを回し続ける。これにより、ItemReader#readがキーブレイク単位のオブジェクトを返すようになる。

spring-batchのchunkの考え方からするとちょっと気持ち悪いが……とりえあず実装を見ていく。

実装

plugins {
  id 'org.springframework.boot' version '2.3.4.RELEASE'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-batch'
  implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
  compileOnly 'org.projectlombok:lombok'
  runtimeOnly 'com.h2database:h2'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.batch:spring-batch-test'
}
test {
  useJUnitPlatform()
}

サンプルとして、1:nのテーブルからデータを取得し、親側のキーでキーブレイクする処理、を考える。

エンティティ的にはこんな感じ。このサンプルではテーブルを自動生成するためだけにしか使わない。

@Entity
@Data
public class Sample {
    @Id
    Long id;
    String value;

    @OneToMany
    @JoinColumn(name = "sampleId")
    List<SampleDetail> details;
}
@Entity
@Data
public class SampleDetail {
    @Id
    SampleDetailId id;
    String value;
}
@SuppressWarnings("serial")
@Embeddable
public class SampleDetailId implements Serializable {
    Long id;
    Long sampleId;
}

データ取得のために実行するSQLはこんな感じ。

select s.id, d.id, d.value 
from sample s 
join sample_detail d on s.id = d.sample_id 
order by s.id

以下はこのエントリの核となるキーブレイクするreader。

import javax.sql.DataSource;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.SingleItemPeekableItemReader;

public class JdbcCursorKeyBreakRreader implements ItemStreamReader<KeyBreakItem> {
    SingleItemPeekableItemReader<SampleRow> peekableReader = new SingleItemPeekableItemReader<>();

    public JdbcCursorKeyBreakRreader(DataSource dataSource) {
        JdbcCursorItemReader<SampleRow> jdbcCursor = new JdbcCursorItemReader<>();
        jdbcCursor.setDataSource(dataSource);
        jdbcCursor.setName("jdbcCursor");
        jdbcCursor.setSql("select s.id, d.id, d.value from sample s join sample_detail d on s.id = d.sample_id order by s.id");
        jdbcCursor.setRowMapper((rs, rowNum) -> new SampleRow(rs.getLong(1), rs.getLong(2), rs.getString(3)));

        peekableReader.setDelegate(jdbcCursor);
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        peekableReader.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        peekableReader.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        peekableReader.close();
    }

    KeyBreakItem data = null;

    @Override
    public KeyBreakItem read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        data = new KeyBreakItem();

        while (peekableReader.peek() != null) {
            SampleRow current = peekableReader.read();
            SampleRow next = peekableReader.peek();

            // chunkアイテムに各行の内容を反映させる
            data.setId(current.getId());
            data.getDetailIds().add(current.getDetailId());

            if (next != null) {
                // キーブレイク判定処理
                if (!current.getId().equals(next.getId())) {
                    return data;
                }
            } else {
                return data;
            }
        }

        return null;
    }
}

キーブレイク判定処理がtrueの間はJDBCカーソルから読み取り続け、falseになったらItemReader#readとしてchunkアイテムを返す。また、読み取った各行の内容を最終的な戻り値となる予定の変数に反映する。

説明用に汎用性は無くしている。といっても、SQLとかマッピングやキーブレイク判定とかを関数化して渡せるようにすれば良い程度だが。

上のJdbcCursorItemReaderRowMapperが返すクラスはこんな感じ。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleRow {
    Long id;
    Long detailId;
    String value;
}

chunkアイテムのItemReader#readが返すクラスはこんな感じ。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class KeyBreakItem {
    Long id;
    List<Long> detailIds = new ArrayList<>();
}

サンプル実行するためのjob定義はこんな感じ。

@EnableJpaRepositories
@EnableBatchProcessing
@SpringBootApplication
public class Application {
    @Bean
    public Job job(JobBuilderFactory jobs, @Qualifier("myjobstep1") Step s1) {
        return jobs.get("demo-batch-job").incrementer(new RunIdIncrementer()).start(s1).build();
    }

    @Bean(name = "myjobstep1")
    public Step step1(StepBuilderFactory steps, DataSource dataSource) {
        return steps
                .get("myjobstep1")
                .<KeyBreakItem, KeyBreakItem>chunk(2)
                .reader(new JdbcCursorKeyBreakRreader(dataSource)).writer(items -> {
                    items.forEach(item -> System.out.println(item));
                    System.out.println("=== write ===");
                })

                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

サンプルデータ。src/main/resources/data.sql

insert into sample (id, value) values (1 , 'a');
insert into sample (id, value) values (2 , 'a');
insert into sample (id, value) values (3 , 'a');
insert into sample (id, value) values (4 , 'a');
insert into sample (id, value) values (5 , 'a');

insert into sample_detail (id, sample_id, value) values (1 , 1,  'a');
insert into sample_detail (id, sample_id, value) values (2 , 1,  'b');
insert into sample_detail (id, sample_id, value) values (3 , 1,  'c');
insert into sample_detail (id, sample_id, value) values (4 , 2,  'a');
insert into sample_detail (id, sample_id, value) values (5 , 2,  'b');
insert into sample_detail (id, sample_id, value) values (6 , 2,  'c');
insert into sample_detail (id, sample_id, value) values (7 , 3,  'a');
insert into sample_detail (id, sample_id, value) values (8 , 3,  'b');
insert into sample_detail (id, sample_id, value) values (9 , 3,  'c');
insert into sample_detail (id, sample_id, value) values (10 , 4,  'a');
insert into sample_detail (id, sample_id, value) values (11 , 4,  'b');
insert into sample_detail (id, sample_id, value) values (12 , 4,  'c');
insert into sample_detail (id, sample_id, value) values (13 , 5,  'a');
insert into sample_detail (id, sample_id, value) values (14 , 5,  'b');
insert into sample_detail (id, sample_id, value) values (15 , 5,  'c');

実行時の様子。chunk(2)にしているので、キーブレイク2回ごとにwriterが実行されているのがわかる。

KeyBreakItem(id=1, detailIds=[1, 2, 3])
KeyBreakItem(id=2, detailIds=[4, 5, 6])
=== write ===
KeyBreakItem(id=3, detailIds=[7, 8, 9])
KeyBreakItem(id=4, detailIds=[10, 11, 12])
=== write ===
KeyBreakItem(id=5, detailIds=[13, 14, 15])
=== write ===