Spring Batchのチュートリアル https://spring.io/guides/gs/batch-processing/ を動かします。このサンプルはいわゆるデータインポート処理で、CSVからデータ読み込み・変換・保存、を行います。
- プロジェクト作成
- 解説
- 参考情報
プロジェクト作成
Spring Initializr https://start.spring.io/ で作ります。Project Metadataは適当に入力でOKで、ここでは以下のように入力したものとして進めます。Dependenciesには、Spring Batch, Lombok, hsql, を追加します。
Spring Bootの組み込みDB自動設定
Spring Batchは基本的にDBが必須です。
一方、Spring Bootは依存性に組み込みDBを含めると自動設定する機能があるので、これを使います。ここでは組み込みDBにhsqlを用いるため、プロジェクト作成時の依存性にこれを追加します。この組み込みDBは、Spring Bootアプリケーション起動時に作成され、終了時に破棄されます。つまり、アプリケーション起動のたびにDBが再作成されます。サンプルの挙動確認に便利なのでこの機能を使います。
詳細はSpring Boot Reference Documentation - Embedded Database Supportを参照してください。
build.gradle
Spring Initializrが生成するbuild.gradle
は以下の通りです。
plugins { id 'org.springframework.boot' version '2.2.6.RELEASE' id 'io.spring.dependency-management' version '1.0.9.RELEASE' id 'java' } group = 'kagamihoge' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' configurations { developmentOnly runtimeClasspath { extendsFrom developmentOnly } compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter-batch' compileOnly 'org.projectlombok:lombok' runtimeOnly 'org.hsqldb:hsqldb' annotationProcessor 'org.projectlombok:lombok' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'org.springframework.batch:spring-batch-test' } test { useJUnitPlatform() }
解説
SpringbatchsampleApplication
Spring Bootアプリケーションの実行ポイントとなるmain
を持つクラスです。このクラスを実行することでSpring Batchのジョブが実行されます。ただしこの段階ではjob
定義が無いので何も起きません。
package kagamihoge.springbatchsample; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringbatchsampleApplication { public static void main(String[] args) { SpringApplication.run(SpringbatchsampleApplication.class, args); } }
スキーマ&データ初期化SQL - schema-all.sql
Spring Bootにはアプリケーション起動時にDDLとデータ投入SQLを実行して、自動的にDB初期化する機能があります。これを利用してサンプルスキーマを作成します。これを使うにはクラスパス下にschema-${platform}.sql
とdata-${platform}.sql
を配置します。
よって、以下のような/src/main/resources/schema-all.sql
を作成します。
DROP TABLE people IF EXISTS; CREATE TABLE people ( person_id BIGINT IDENTITY NOT NULL PRIMARY KEY, first_name VARCHAR(20), last_name VARCHAR(20) );
前述の通り、hsqlがアプリケーションの起動のたびに再作成されます。また、そのたびにDB初期化も行われます。つまり、何回起動しても同一の状態でアプリケーションが起動できます。これもサンプルの挙動確認に便利です。詳細はSpring Boot Reference Documentation - 10.3. Initialize a Databaseを参照してください。
なお、DB初期化(people
テーブルの作成)を確認したい場合、以下のようなSpringbatchsampleApplication
を作成して実行します。
package kagamihoge.springbatchsample; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.jdbc.core.JdbcTemplate; @SpringBootApplication public class SpringbatchsampleApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(SpringbatchsampleApplication.class, args); } @Autowired JdbcTemplate j; @Override public void run(String... args) throws Exception { System.out.println(j.queryForObject("select count(*) from people", Integer.class)); } }
BatchConfiguration - Spring Batchのconfig(最低限度版)
先述の通りSpring BatchのサンプルではDBインポートを行いますが、ここではまずSpring Batchが起動できるか確認するための最低限の設定を作ります。最低限の設定として、以下のジョブはList
を5件ずつ表示します。
なお、説明簡易化のためすべてのクラスは同一packageに配置します。実際には、用途・役割別にpackageに分類すると思います。
package kagamihoge.springbatchsample; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.support.ListItemReader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableBatchProcessing public class BatchConfiguration { @Autowired JobBuilderFactory jobBuilderFactory; @Autowired StepBuilderFactory stepBuilderFactory; @Bean public Job importUserJob(Step step1) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .start(step1) .build(); } @Bean public Step step1() { ItemReader<Integer> reader = new ListItemReader<Integer>( IntStream.range(1, 20).boxed().collect(Collectors.toList())); return stepBuilderFactory.get("step1") .<Integer, Integer> chunk(5) .reader(reader) .writer(items -> System.out.println(items)) .build(); } }
これを実行すると以下の実行結果になります。
[1, 2, 3, 4, 5] [6, 7, 8, 9, 10] [11, 12, 13, 14, 15] [16, 17, 18, 19]
@EnableBatchProcessing
- Spring Batchはいくつかの必須beanの設定が必要です。このbean設定は面倒な上に定型的なため、アノテーションを付与するとデフォルト値で自動設定が行われます。自動設定されるbeanの詳細はEnableBatchProcessingのjavadocやSpring Batch Reference Documentation - 4.2. Java Configを参照してください。JobBuilderFactory
とStepBuilderFactory
- Spring Batchのjob
とstep
を定義するためのbeanです。上述の@EnableBatchProcessing
で自動的に使用可能になるbeanです。サンプルコードの通りbuilderスタイルで各種定義をします。job
- Spring Batchの実行単位です。このサンプルではjob
は1つのstep
を持ちます。step
- このstep
は、List<Integer>
から1つずつreadし、5個ずつのList
をwriterでコンソールに表示します。単にSpring Batchの起動確認のコードに過ぎないため、大して意味のないコードです。後でCSVからreadしてDBにwriteするサンプルコードとの比較にしてください。job
とstep
の関係はぐぐれば分かりやすい図が色々出てくる、例えばTERASOLUNA Batch Framework - Spring Batchの基本構造など、を適宜参照してください。chunk
-chunk(n)
はn
回readしてwriteという挙動をするので、readやwriteの具体的な処理を実装します。図解が無いと理解しにくいのでTERASOLUNA Batch Framework - チャンクモデルなどを参照してください。reader
-ListItemReader
はSpring Batch組み込みクラスです。List
から1要素ずつreadします。writer
-chunk(5)
なのでサイズ5のList<Integer>
がwriterに渡されてコンソールに表示しています。reader
やwriter
には、それぞれ、ItemReader
とItemWriter
インタフェースの実装を渡します。インタフェースの実装であればよく、Springのマネージドbean(@Bean
や@Component
)にしてもしなくても良いです。実装が十分に小さければ、上のようにラムダ式で書くことも可能です。
BatchConfiguration - Spring Batchサンプルの方のconfig
Spring Batchのサンプルをそのままコピペした版のconfigです。こちらは、CSVからデータ読み込み・変換・保存、を行います。
package kagamihoge.springbatchsample; import javax.sql.DataSource; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; @Configuration @EnableBatchProcessing public class BatchConfiguration { @Autowired JobBuilderFactory jobBuilderFactory; @Autowired StepBuilderFactory stepBuilderFactory; @Bean public Job importUserJob(JobCompletionNotificationListener listener, Step step1) { return jobBuilderFactory.get("importUserJob") .incrementer(new RunIdIncrementer()) .listener(listener) .flow(step1) .end() .build(); } @Bean public Step step1(JdbcBatchItemWriter<Person> writer) { return stepBuilderFactory.get("step1") .<Person, Person> chunk(10) .reader(reader()) .processor(processor()) .writer(writer) .build(); } @Bean public FlatFileItemReader<Person> reader() { return new FlatFileItemReaderBuilder<Person>() .name("personItemReader") .resource(new ClassPathResource("sample-data.csv")) .delimited() .names(new String[]{"firstName", "lastName"}) .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }}) .build(); } @Bean public PersonItemProcessor processor() { return new PersonItemProcessor(); } @Bean public JdbcBatchItemWriter<Person> writer(DataSource dataSource) { return new JdbcBatchItemWriterBuilder<Person>() .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()) .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)") .dataSource(dataSource) .build(); } }
job
incrementer
- 毎回異なるジョブ引数で実行するための設定。後述。JobExecutionListener
-job
のlistener
で実行前後に処理を差し込むリスナーを定義します。後述。flow
- Spring Batchではstep
を複数まとめるflow
でjob
を定義できます。ここではstep
1つだけのflow
ですが、順次実行の他に条件分岐などが可能です。詳細はSpring Batch - Reference Documentation - 5.3. Controlling Step Flowを参照してください。
step
<Person, Person> chunk(10)
- readerとwriterの入出力クラスをジェネリクスで指定します。このサンプルの場合ItemReader<Person>
とItemWriter<Person>
という意味です。ここでは両方とも同一クラスですが、processorで何らかの型変換がある場合などは入出力が異なるクラスになります。reader
- Spring Batchの組み込みクラスFlatFileItemReader
を使用します。このクラスはファイル読み込みの汎用readerです。読み込み元ファイルのパスやどの列をどのプロパティに割り当てるか、などを設定します。Spring Batchには典型的な動作用の組み込み実装があるため、ありがちな実装の場合にはjavadocを参照すると良いです。resource
- クラスパス下のsample-data.csv
(/src/main/resources/sample-data.csv
)delimited().names
- デリミタ有(デフォルト,
)、CSVデータ行(例:Jill,Doe
)をそれぞれfirstName
,lastName
のキー名で読み込みます。fieldSetMapper
- 読み込んだデータをJavaBeanの規則でインスタンス(Person
クラス)作成します。FlatFileItemReader
は各種用途に合うよう様々な拡張ポイントがあり、結構ややこしいため、詳細はTERASOLUNA Batch - 5.3.1.2. フラットファイルの入出力を行うコンポーネントを参照してください。
processor
- 後述。ちなみにprocessor
は不要であれば省略可能です。writer
- これもSpring Batchの組み込みクラスJdbcBatchItemWriter
です。このクラスはprocessorから*1渡されるデータで指定のSQLを実行します。sql
- 各データに適用するSQLを指定します。プレースホルダとPerson
をどう対応付けるかは下のitemSqlParameterSourceProvider
を参照してください。itemSqlParameterSourceProvider
-BeanPropertyItemSqlParameterSourceProvider
はプレースホルダとJavaBeanの対応付けを行う組み込みクラスです。dataSource
- Spring Bootの自動設定により、組み込みDBのDataSource
beanが自動的に作成されるので、それをここで設定しています。JdbcBatchItemWriter
は内部的にこのDataSource
を基にしたJdbcTemplate
でsql
を実行します。
/src/main/resources/sample-data.csv
reader
で指定するCSVファイルです。
Jill,Doe Joe,Doe Justin,Doe Jane,Doe John,Doe
PersonItemProcessor
processorはreaderで読み込んだデータの加工・変換を行います。このサンプルでは姓名を大文字にします。
package kagamihoge.springbatchsample; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.item.ItemProcessor; public class PersonItemProcessor implements ItemProcessor<Person, Person> { private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class); @Override public Person process(final Person person) throws Exception { final String firstName = person.getFirstName().toUpperCase(); final String lastName = person.getLastName().toUpperCase(); final Person transformedPerson = new Person(firstName, lastName); log.info("Converting (" + person + ") into (" + transformedPerson + ")"); return transformedPerson; } }
JobCompletionNotificationListener
ジョブ完了時のリスナーです。DBにインポートされたデータ行をすべて出力します。
package kagamihoge.springbatchsample; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.listener.JobExecutionListenerSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; @Component public class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution) { if(jobExecution.getStatus() == BatchStatus.COMPLETED) { log.info("!!! JOB FINISHED! Time to verify the results"); jdbcTemplate.query("SELECT first_name, last_name FROM people", (rs, row) -> new Person( rs.getString(1), rs.getString(2)) ).forEach(person -> log.info("Found <" + person + "> in the database.")); } } }
jobExecution.getStatus() == BatchStatus.COMPLETED
- この意味は、job
が正常終了した場合、を意味します。job
の正常or異常終了で後処理が変わる場合に利用します。
Person
package kagamihoge.springbatchsample; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @AllArgsConstructor @NoArgsConstructor public class Person { String lastName; String firstName; }
RunIdIncrementer - jobパラメータに連番を付与
Spring Batchのjob
はパラメータを取ることが出来、そのパラメータが同一なjob
は2回以上実行出来ません。これで二重起動防止が出来ますが、パラメータが無い場合や二重起動が問題無いケースは回避策が必要です。
その場合RunIdIncrementer
を使います。このクラスはjob
実行時に連番のidを割り振ります。具体的にはrun.id
というキーで連番のパラメータになります。以下の例はjob
実行時のパラメータの履歴です(このサンプルはDBを永続化しないため、下記は別のjob
の例です)。
なお、上記テーブルはSpring Batchのメタデータと呼ばれる機能ですが、本エントリではこれ以上触れません。詳細はSpring Batch - Appendix B: Meta-Data SchemaやTERASOLUNA Batch Framework - Architectureを参照してください。
ジョブの実行
gradlew bootRun
あるいはgradlew build
実行後にjava -jar build\libs\springbatchsample-0.0.1-SNAPSHOT.jar
で実行します。
Spring BootでSpring Batchを実行する場合、自動的にすべてのJob
型のbeanを実行します。指定のジョブだけ実行したい場合はspring.batch.job.names
プロパティを使用します。以下は指定例です。
spring.batch.job.names importUserJob
また、上記の自動実行の仕組みを使わない場合はプロパティspring.batch.job.enabled false
でオフになります。webアプリケーションなどから任意のタイミングでジョブを起動する、いわゆるオンラインバッチの場合にこのプロパティを使用します。その場合Job
の起動にはJobLauncher
を使います。
spring.batch.job.enabled false