kagamihogeの日記

kagamihogeの日記です。

Spring Boot 2.2.x + Spring Batchうごかす

Spring Batchのチュートリアル https://spring.io/guides/gs/batch-processing/ を動かします。このサンプルはいわゆるデータインポート処理で、CSVからデータ読み込み・変換・保存、を行います。

プロジェクト作成

Spring Initializr https://start.spring.io/ で作ります。Project Metadataは適当に入力でOKで、ここでは以下のように入力したものとして進めます。Dependenciesには、Spring Batch, Lombok, hsql, を追加します。

f:id:kagamihoge:20200405132232p:plain

Spring Bootの組み込みDB自動設定

Spring Batchは基本的にDBが必須です。

一方、Spring Bootは依存性に組み込みDBを含めると自動設定する機能があるので、これを使います。ここでは組み込みDBにhsqlを用いるため、プロジェクト作成時の依存性にこれを追加します。この組み込みDBは、Spring Bootアプリケーション起動時に作成され、終了時に破棄されます。つまり、アプリケーション起動のたびにDBが再作成されます。サンプルの挙動確認に便利なのでこの機能を使います。

詳細はSpring Boot Reference Documentation - Embedded Database Supportを参照してください。

build.gradle

Spring Initializrが生成するbuild.gradleは以下の通りです。

plugins {
    id 'org.springframework.boot' version '2.2.6.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'kagamihoge'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    developmentOnly
    runtimeClasspath {
        extendsFrom developmentOnly
    }
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'org.hsqldb:hsqldb'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.batch:spring-batch-test'
}

test {
    useJUnitPlatform()
}

解説

SpringbatchsampleApplication

Spring Bootアプリケーションの実行ポイントとなるmainを持つクラスです。このクラスを実行することでSpring Batchのジョブが実行されます。ただしこの段階ではjob定義が無いので何も起きません。

package kagamihoge.springbatchsample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbatchsampleApplication {

  public static void main(String[] args) {
    SpringApplication.run(SpringbatchsampleApplication.class, args);
  }
}

スキーマ&データ初期化SQL - schema-all.sql

Spring Bootにはアプリケーション起動時にDDLとデータ投入SQLを実行して、自動的にDB初期化する機能があります。これを利用してサンプルスキーマを作成します。これを使うにはクラスパス下にschema-${platform}.sqldata-${platform}.sqlを配置します。

よって、以下のような/src/main/resources/schema-all.sqlを作成します。

DROP TABLE people IF EXISTS;

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);

前述の通り、hsqlがアプリケーションの起動のたびに再作成されます。また、そのたびにDB初期化も行われます。つまり、何回起動しても同一の状態でアプリケーションが起動できます。これもサンプルの挙動確認に便利です。詳細はSpring Boot Reference Documentation - 10.3. Initialize a Databaseを参照してください。

なお、DB初期化(peopleテーブルの作成)を確認したい場合、以下のようなSpringbatchsampleApplicationを作成して実行します。

package kagamihoge.springbatchsample;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jdbc.core.JdbcTemplate;

@SpringBootApplication
public class SpringbatchsampleApplication implements CommandLineRunner {

  public static void main(String[] args) {
    SpringApplication.run(SpringbatchsampleApplication.class, args);
  }

  @Autowired
  JdbcTemplate j;

  @Override
  public void run(String... args) throws Exception {
    System.out.println(j.queryForObject("select count(*) from people", Integer.class));
  }

}

BatchConfiguration - Spring Batchのconfig(最低限度版)

先述の通りSpring BatchのサンプルではDBインポートを行いますが、ここではまずSpring Batchが起動できるか確認するための最低限の設定を作ります。最低限の設定として、以下のジョブはListを5件ずつ表示します。

なお、説明簡易化のためすべてのクラスは同一packageに配置します。実際には、用途・役割別にpackageに分類すると思います。

package kagamihoge.springbatchsample;

import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
  @Autowired
  JobBuilderFactory jobBuilderFactory;

  @Autowired
  StepBuilderFactory stepBuilderFactory;
  
  @Bean
  public Job importUserJob(Step step1) {
    return jobBuilderFactory.get("importUserJob")
      .incrementer(new RunIdIncrementer())
      .start(step1)
      .build();
  }
  
  @Bean
  public Step step1() {
    ItemReader<Integer> reader = new ListItemReader<Integer>(
        IntStream.range(1, 20).boxed().collect(Collectors.toList()));
    
    return stepBuilderFactory.get("step1")
      .<Integer, Integer> chunk(5)
      .reader(reader)
      .writer(items -> System.out.println(items))
      .build();
  }
}

これを実行すると以下の実行結果になります。

[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]
[11, 12, 13, 14, 15]
[16, 17, 18, 19]
  • @EnableBatchProcessing - Spring Batchはいくつかの必須beanの設定が必要です。このbean設定は面倒な上に定型的なため、アノテーションを付与するとデフォルト値で自動設定が行われます。自動設定されるbeanの詳細はEnableBatchProcessingのjavadocSpring Batch Reference Documentation - 4.2. Java Configを参照してください。
  • JobBuilderFactoryStepBuilderFactory - Spring Batchのjobstepを定義するためのbeanです。上述の@EnableBatchProcessing で自動的に使用可能になるbeanです。サンプルコードの通りbuilderスタイルで各種定義をします。
  • job - Spring Batchの実行単位です。このサンプルではjobは1つのstepを持ちます。
  • step - このstepは、List<Integer>から1つずつreadし、5個ずつのListをwriterでコンソールに表示します。単にSpring Batchの起動確認のコードに過ぎないため、大して意味のないコードです。後でCSVからreadしてDBにwriteするサンプルコードとの比較にしてください。
    • jobstepの関係はぐぐれば分かりやすい図が色々出てくる、例えばTERASOLUNA Batch Framework - Spring Batchの基本構造など、を適宜参照してください。
    • chunk - chunk(n)n回readしてwriteという挙動をするので、readやwriteの具体的な処理を実装します。図解が無いと理解しにくいのでTERASOLUNA Batch Framework - チャンクモデルなどを参照してください。
      • reader - ListItemReaderはSpring Batch組み込みクラスです。Listから1要素ずつreadします。
      • writer - chunk(5)なのでサイズ5のList<Integer>がwriterに渡されてコンソールに表示しています。
      • readerwriterには、それぞれ、ItemReaderItemWriterインタフェースの実装を渡します。インタフェースの実装であればよく、Springのマネージドbean(@Bean@Component)にしてもしなくても良いです。実装が十分に小さければ、上のようにラムダ式で書くことも可能です。

BatchConfiguration - Spring Batchサンプルの方のconfig

Spring Batchのサンプルをそのままコピペした版のconfigです。こちらは、CSVからデータ読み込み・変換・保存、を行います。

package kagamihoge.springbatchsample;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
  @Autowired
  JobBuilderFactory jobBuilderFactory;

  @Autowired
  StepBuilderFactory stepBuilderFactory;
  
  @Bean
  public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
    return jobBuilderFactory.get("importUserJob")
      .incrementer(new RunIdIncrementer())
      .listener(listener)
      .flow(step1)
      .end()
      .build();
  }

  @Bean
  public Step step1(JdbcBatchItemWriter<Person> writer) {
    return stepBuilderFactory.get("step1")
      .<Person, Person> chunk(10)
      .reader(reader())
      .processor(processor())
      .writer(writer)
      .build();
  }

  @Bean
  public FlatFileItemReader<Person> reader() {
    return new FlatFileItemReaderBuilder<Person>()
      .name("personItemReader")
      .resource(new ClassPathResource("sample-data.csv"))
      .delimited()
      .names(new String[]{"firstName", "lastName"})
      .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
        setTargetType(Person.class);
      }})
      .build();
  }

  @Bean
  public PersonItemProcessor processor() {
    return new PersonItemProcessor();
  }

  @Bean
  public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Person>()
      .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
      .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
      .dataSource(dataSource)
      .build();
  }
}

job

  • incrementer - 毎回異なるジョブ引数で実行するための設定。後述。
  • JobExecutionListener - joblistenerで実行前後に処理を差し込むリスナーを定義します。後述。
  • flow - Spring Batchではstepを複数まとめるflowjobを定義できます。ここではstep1つだけのflowですが、順次実行の他に条件分岐などが可能です。詳細はSpring Batch - Reference Documentation - 5.3. Controlling Step Flowを参照してください。

step

  • <Person, Person> chunk(10) - readerとwriterの入出力クラスをジェネリクスで指定します。このサンプルの場合ItemReader<Person>ItemWriter<Person>という意味です。ここでは両方とも同一クラスですが、processorで何らかの型変換がある場合などは入出力が異なるクラスになります。
  • reader - Spring Batchの組み込みクラスFlatFileItemReaderを使用します。このクラスはファイル読み込みの汎用readerです。読み込み元ファイルのパスやどの列をどのプロパティに割り当てるか、などを設定します。Spring Batchには典型的な動作用の組み込み実装があるため、ありがちな実装の場合にはjavadocを参照すると良いです。
    • resource - クラスパス下のsample-data.csv/src/main/resources/sample-data.csv
    • delimited().names - デリミタ有(デフォルト,)、CSVデータ行(例:Jill,Doe)をそれぞれfirstName, lastNameのキー名で読み込みます。
    • fieldSetMapper - 読み込んだデータをJavaBeanの規則でインスタンスPersonクラス)作成します。
    • FlatFileItemReaderは各種用途に合うよう様々な拡張ポイントがあり、結構ややこしいため、詳細はTERASOLUNA Batch - 5.3.1.2. フラットファイルの入出力を行うコンポーネントを参照してください。
  • processor - 後述。ちなみにprocessorは不要であれば省略可能です。
  • writer - これもSpring Batchの組み込みクラスJdbcBatchItemWriterです。このクラスはprocessorから*1渡されるデータで指定のSQLを実行します。
    • sql - 各データに適用するSQLを指定します。プレースホルダPersonをどう対応付けるかは下のitemSqlParameterSourceProviderを参照してください。
    • itemSqlParameterSourceProvider - BeanPropertyItemSqlParameterSourceProviderプレースホルダとJavaBeanの対応付けを行う組み込みクラスです。
    • dataSource - Spring Bootの自動設定により、組み込みDBのDataSource beanが自動的に作成されるので、それをここで設定しています。JdbcBatchItemWriterは内部的にこのDataSourceを基にしたJdbcTemplatesqlを実行します。

/src/main/resources/sample-data.csv

readerで指定するCSVファイルです。

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

PersonItemProcessor

processorはreaderで読み込んだデータの加工・変換を行います。このサンプルでは姓名を大文字にします。

package kagamihoge.springbatchsample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  @Override
  public Person process(final Person person) throws Exception {
    final String firstName = person.getFirstName().toUpperCase();
    final String lastName = person.getLastName().toUpperCase();

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting (" + person + ") into (" + transformedPerson + ")");

    return transformedPerson;
  }

}
  • ItemProcessor<Person, Person> - processorの入出力クラスをジェネリクスで指定します。今回はどちらもPersonですが、異なるクラスにすることも可能です。

JobCompletionNotificationListener

ジョブ完了時のリスナーです。DBにインポートされたデータ行をすべて出力します。

package kagamihoge.springbatchsample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

  private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  @Autowired
  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      jdbcTemplate.query("SELECT first_name, last_name FROM people",
        (rs, row) -> new Person(
          rs.getString(1),
          rs.getString(2))
      ).forEach(person -> log.info("Found <" + person + "> in the database."));
    }
  }
}
  • jobExecution.getStatus() == BatchStatus.COMPLETED - この意味は、jobが正常終了した場合、を意味します。jobの正常or異常終了で後処理が変わる場合に利用します。

Person

package kagamihoge.springbatchsample;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Person {
  String lastName;
  String firstName;
}

RunIdIncrementer - jobパラメータに連番を付与

Spring Batchのjobはパラメータを取ることが出来、そのパラメータが同一なjobは2回以上実行出来ません。これで二重起動防止が出来ますが、パラメータが無い場合や二重起動が問題無いケースは回避策が必要です。

その場合RunIdIncrementerを使います。このクラスはjob実行時に連番のidを割り振ります。具体的にはrun.idというキーで連番のパラメータになります。以下の例はjob実行時のパラメータの履歴です(このサンプルはDBを永続化しないため、下記は別のjobの例です)。

f:id:kagamihoge:20200406201410p:plain

なお、上記テーブルはSpring Batchのメタデータと呼ばれる機能ですが、本エントリではこれ以上触れません。詳細はSpring Batch - Appendix B: Meta-Data SchemaTERASOLUNA Batch Framework - Architectureを参照してください。

ジョブの実行

gradlew bootRunあるいはgradlew build実行後にjava -jar build\libs\springbatchsample-0.0.1-SNAPSHOT.jarで実行します。

Spring BootでSpring Batchを実行する場合、自動的にすべてのJob型のbeanを実行します。指定のジョブだけ実行したい場合はspring.batch.job.namesプロパティを使用します。以下は指定例です。

spring.batch.job.names importUserJob

また、上記の自動実行の仕組みを使わない場合はプロパティspring.batch.job.enabled falseでオフになります。webアプリケーションなどから任意のタイミングでジョブを起動する、いわゆるオンラインバッチの場合にこのプロパティを使用します。その場合Jobの起動にはJobLauncherを使います。

spring.batch.job.enabled false

参考情報

*1:processorが無ければreaderから