背景
Javaでキーブレイク*1を意識することはあまり無い。JPAならOneToMany
で自動的にコレクションにマッピングされるし、JSONやXMLでも同様である。
今回、spring-batchでO/Rをとある事情で使えずSQLを直接使用、かつ、キーブレイクが必要になった。その実現方法について考える。こちらにある通り、SingleItemPeekableItemReader
とtaskletを組み合わせれば可能だが、今回はchunkで何とかする方法について。
基本的な考え方はこちらと同様にSingleItemPeekableItemReader
でキーブレイクを実現する。このときdelegate先はJdbcCursorItemReader
にする。そして、一回のItemReader#read
内でキーブレイクに達するまでJDBCカーソルを回し続ける。これにより、ItemReader#read
がキーブレイク単位のオブジェクトを返すようになる。
spring-batchのchunkの考え方からするとちょっと気持ち悪いが……とりえあず実装を見ていく。
実装
plugins { id 'org.springframework.boot' version '2.3.4.RELEASE' id 'io.spring.dependency-management' version '1.0.10.RELEASE' id 'java' } group = 'com.example' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter-batch' implementation 'org.springframework.boot:spring-boot-starter-data-jpa' compileOnly 'org.projectlombok:lombok' runtimeOnly 'com.h2database:h2' annotationProcessor 'org.projectlombok:lombok' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'org.springframework.batch:spring-batch-test' } test { useJUnitPlatform() }
サンプルとして、1:nのテーブルからデータを取得し、親側のキーでキーブレイクする処理、を考える。
エンティティ的にはこんな感じ。このサンプルではテーブルを自動生成するためだけにしか使わない。
@Entity @Data public class Sample { @Id Long id; String value; @OneToMany @JoinColumn(name = "sampleId") List<SampleDetail> details; }
@Entity @Data public class SampleDetail { @Id SampleDetailId id; String value; }
@SuppressWarnings("serial") @Embeddable public class SampleDetailId implements Serializable { Long id; Long sampleId; }
データ取得のために実行するSQLはこんな感じ。
select s.id, d.id, d.value from sample s join sample_detail d on s.id = d.sample_id order by s.id
以下はこのエントリの核となるキーブレイクするreader。
import javax.sql.DataSource; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemStreamException; import org.springframework.batch.item.ItemStreamReader; import org.springframework.batch.item.NonTransientResourceException; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException; import org.springframework.batch.item.database.JdbcCursorItemReader; import org.springframework.batch.item.support.SingleItemPeekableItemReader; public class JdbcCursorKeyBreakRreader implements ItemStreamReader<KeyBreakItem> { SingleItemPeekableItemReader<SampleRow> peekableReader = new SingleItemPeekableItemReader<>(); public JdbcCursorKeyBreakRreader(DataSource dataSource) { JdbcCursorItemReader<SampleRow> jdbcCursor = new JdbcCursorItemReader<>(); jdbcCursor.setDataSource(dataSource); jdbcCursor.setName("jdbcCursor"); jdbcCursor.setSql("select s.id, d.id, d.value from sample s join sample_detail d on s.id = d.sample_id order by s.id"); jdbcCursor.setRowMapper((rs, rowNum) -> new SampleRow(rs.getLong(1), rs.getLong(2), rs.getString(3))); peekableReader.setDelegate(jdbcCursor); } @Override public void open(ExecutionContext executionContext) throws ItemStreamException { peekableReader.open(executionContext); } @Override public void update(ExecutionContext executionContext) throws ItemStreamException { peekableReader.update(executionContext); } @Override public void close() throws ItemStreamException { peekableReader.close(); } KeyBreakItem data = null; @Override public KeyBreakItem read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { data = new KeyBreakItem(); while (peekableReader.peek() != null) { SampleRow current = peekableReader.read(); SampleRow next = peekableReader.peek(); // chunkアイテムに各行の内容を反映させる data.setId(current.getId()); data.getDetailIds().add(current.getDetailId()); if (next != null) { // キーブレイク判定処理 if (!current.getId().equals(next.getId())) { return data; } } else { return data; } } return null; } }
キーブレイク判定処理がtrue
の間はJDBCカーソルから読み取り続け、false
になったらItemReader#read
としてchunkアイテムを返す。また、読み取った各行の内容を最終的な戻り値となる予定の変数に反映する。
説明用に汎用性は無くしている。といっても、SQLとかマッピングやキーブレイク判定とかを関数化して渡せるようにすれば良い程度だが。
上のJdbcCursorItemReader
のRowMapper
が返すクラスはこんな感じ。
@Data @AllArgsConstructor @NoArgsConstructor public class SampleRow { Long id; Long detailId; String value; }
chunkアイテムのItemReader#read
が返すクラスはこんな感じ。
@Data @AllArgsConstructor @NoArgsConstructor public class KeyBreakItem { Long id; List<Long> detailIds = new ArrayList<>(); }
サンプル実行するためのjob定義はこんな感じ。
@EnableJpaRepositories @EnableBatchProcessing @SpringBootApplication public class Application { @Bean public Job job(JobBuilderFactory jobs, @Qualifier("myjobstep1") Step s1) { return jobs.get("demo-batch-job").incrementer(new RunIdIncrementer()).start(s1).build(); } @Bean(name = "myjobstep1") public Step step1(StepBuilderFactory steps, DataSource dataSource) { return steps .get("myjobstep1") .<KeyBreakItem, KeyBreakItem>chunk(2) .reader(new JdbcCursorKeyBreakRreader(dataSource)).writer(items -> { items.forEach(item -> System.out.println(item)); System.out.println("=== write ==="); }) .build(); } public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
サンプルデータ。src/main/resources/data.sql
insert into sample (id, value) values (1 , 'a'); insert into sample (id, value) values (2 , 'a'); insert into sample (id, value) values (3 , 'a'); insert into sample (id, value) values (4 , 'a'); insert into sample (id, value) values (5 , 'a'); insert into sample_detail (id, sample_id, value) values (1 , 1, 'a'); insert into sample_detail (id, sample_id, value) values (2 , 1, 'b'); insert into sample_detail (id, sample_id, value) values (3 , 1, 'c'); insert into sample_detail (id, sample_id, value) values (4 , 2, 'a'); insert into sample_detail (id, sample_id, value) values (5 , 2, 'b'); insert into sample_detail (id, sample_id, value) values (6 , 2, 'c'); insert into sample_detail (id, sample_id, value) values (7 , 3, 'a'); insert into sample_detail (id, sample_id, value) values (8 , 3, 'b'); insert into sample_detail (id, sample_id, value) values (9 , 3, 'c'); insert into sample_detail (id, sample_id, value) values (10 , 4, 'a'); insert into sample_detail (id, sample_id, value) values (11 , 4, 'b'); insert into sample_detail (id, sample_id, value) values (12 , 4, 'c'); insert into sample_detail (id, sample_id, value) values (13 , 5, 'a'); insert into sample_detail (id, sample_id, value) values (14 , 5, 'b'); insert into sample_detail (id, sample_id, value) values (15 , 5, 'c');
実行時の様子。chunk(2)
にしているので、キーブレイク2回ごとにwriterが実行されているのがわかる。
KeyBreakItem(id=1, detailIds=[1, 2, 3]) KeyBreakItem(id=2, detailIds=[4, 5, 6]) === write === KeyBreakItem(id=3, detailIds=[7, 8, 9]) KeyBreakItem(id=4, detailIds=[10, 11, 12]) === write === KeyBreakItem(id=5, detailIds=[13, 14, 15]) === write ===