kagamihogeの日記

kagamihogeの日記です。

Spring Cloud Gatewayさわる

https://spring.io/projects/spring-cloud-gateway をさわる。はじめてさわるのでpredicateとかfilterとかをいくつか使ってみる。使い方わからなかったやつはさわってない。個人の日記レベルのさわってみた程度なんで、ちゃんとした情報はリファレンスを参照願います。

やってみる

build.gradle

https://start.spring.io/gatewayと入力すればorg.springframework.cloud:spring-cloud-starter-gatewayが入る。以下はそれで生成したもののコピペ。

plugins {
  id 'org.springframework.boot' version '2.2.6.RELEASE'
  id 'io.spring.dependency-management' version '1.0.9.RELEASE'
  id 'java'
}

repositories {
  mavenCentral()
}
ext {
  set('springCloudVersion', "Hoxton.SR4")
}
dependencies {
  implementation 'org.springframework.cloud:spring-cloud-starter-gateway'
  implementation 'org.springframework.boot:spring-boot-devtools'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
}
dependencyManagement {
  imports {
    mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
  }
}

gatewayのアプリケーション

gatewayのアプリケーションを作る。以下は2つのrouteを定義している。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class DemoApplication {
  @Bean
  public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("path_google", r -> r.path("/search")
            .uri("https://www.google.co.jp"))
        .route("host_yahoo", r -> r.host("localhost:8080").and().path("/hotentry/all")
            .uri("https://b.hatena.ne.jp"))
        .build();
  }
  
  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }
}

どちらもrouteの引数は識別子なので、適当な一意の値を振る。

  1. 1つ目はhttp://localhost:8080/searchにアクセスするとhttps://www.google.co.jp/searchに飛ぶ。http://localhost:8080/search?q=hogeとかをブラウザでアクセスするとそれっぽい検索結果が表示される*1。なお、本来の使い方はAPIのプロキシ的な使い方なので、https://www.google.co.jpに飛ばすのは意味のある例ではないが、適当なAPI用意するの面倒なのでこのようにしている。動作確認できれば良いや、という判断*2
  2. 2つ目はホストがlocalhost:8080かつリクエストパスが/hotentry/allであればhttps://b.hatena.ne.jpに飛ばす。http://localhost:8080/hotentry/allにブラウザでアクセスするとそれっぽいホットエントリが表示される。

上記は設定ファイル(application.yamlとか)で書くこともできる。が、このエントリではそちらはやらない。

predicate

before/after/between

あるZonedDateTimeの、以前・以降・期間、にマッチしたときだけルーティングする。

      .route("path_google", r -> r.before(ZonedDateTime.of(2020, 5, 10, 0, 0, 0, 0, ZoneId.systemDefault()))
          .uri("https://www.google.co.jp"))
      .route("path_google", r -> r.after(ZonedDateTime.of(2020, 5, 10, 0, 0, 0, 0, ZoneId.systemDefault()))
          .uri("https://www.google.co.jp"))
      .route("path_google", r -> r.between(
          ZonedDateTime.of(2020, 5, 1, 0, 0, 0, 0, ZoneId.systemDefault()),
          ZonedDateTime.of(2020, 5, 10, 0, 0, 0, 0, ZoneId.systemDefault()))
          .uri("https://www.google.co.jp"))

cookie

cookieが指定の値を持ってるかどうか。正規表現使用可能なので下はvalueとかにマッチする。

      .route("path_google", r -> r.cookie("hogehoge", "va..e")
          .uri("https://www.google.co.jp"))

header

headerがあるか or headerが指定の値を持ってるかどうか、をチェックする。

      .route("path_google", r -> r.header("X-Hoge", "va..e")
          .uri("https://www.google.co.jp"))

host

hostが指定の値かどうか。正規表現使えるので、ドメイン名で何らかの分岐するときに使う感じか。

method

GETとかPOSTとかをチェックする。

      .route("path_google", r -> r.method(HttpMethod.GET)
          .uri("https://www.google.co.jp"))

path

上で触れたので省略。

query

上で触れたので省略。

remoteAddress

CIDRで条件をかける。

      .route("path_google", r -> r.remoteAddr("192.168.1.1/24")
          .uri("https://www.google.co.jp"))

weight

重み付け。たとえば、以下は8:2の重み付けでgoogletwitterにアクセスが飛ぶ。

        .route("path_route_high", r -> r.weight("group1", 8)
            .uri("https://www.google.com"))
        .route("path_route_low", r -> r.weight("group1", 2)
            .uri("https://twitter.com"))

filter

gatewayにリクエストが来たときと、ルーティング先から戻ってきたときに、何らかの処理を挟みたい場合にfilterを使う。

AddRequestHeader

gatewayで何らかのリクエストヘッダーを追加する。

        .route("path_google", r -> r.path("/{hoge}")
            .filters(f -> f.rewritePath("/.*", "/tools/request_headers.html").addRequestHeader("X-Hoge", "{hoge}"))
            .uri("https://uchy.me/"))

動作確認にはHTTPリクエストヘッダー表示ツール https://uchy.me/tools/request_headers.html を使わせて頂いた。http://localhost:8080/sample-valueとかするとX-Hoge sample-valueヘッダーが送信される。上記のように、pathのURLの変数を{hoge}にしてaddRequestHeaderでその値を使うことができる。

AddRequestParameter

gatewayで何らかのリクエストパラメータを追加する。

        .route("path_google", r -> r.path("/{hoge}")
            .filters(f -> f.rewritePath("/.*", "/search").addRequestParameter("q", "{hoge}"))
            .uri("https://www.google.com/"))

上はhttp://localhost:8080/search-wordにアクセスするとhttps://www.google.com/search?q=search-wordに行く。

AddResponseHeader

gatewayのクライアントに返すレスポンスにヘッダーを追加する。

        .route("path_google", r -> r.path("/search")
            .filters(f -> f.addResponseHeader("X-Hoge", "hoge"))
            .uri("https://www.google.com/"))

DedupeResponseHeader

試してない。

Hystrix GatewayFilter

リファレンスにこんなことが書いてある。なのでSpring Cloud CircuitBreaker GatewayFilterを使うのが良いと思われる。

Netflix has put Hystrix in maintenance mode. We suggest you use the Spring Cloud CircuitBreaker Gateway Filter with Resilience4J, as support for Hystrix will be removed in a future release.

Spring Cloud CircuitBreaker GatewayFilter

CircuitBreakerの詳細な説明は省略(俺自身ちゃんと理解してないので)して、とりあえず使ってみる。

まずCircuitBreakerの実体としてresilience4jを依存性に追加する。

  implementation 'org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j'

とりあえず使ってみる。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SpringBootApplication
public class DemoApplication {
  
  @GetMapping("/forward")
  public String forward() {
    return "forward";
  }

  @Bean
  public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("path_google", r -> r.path("/")
            .filters(f -> f.circuitBreaker(
                c -> c.setName("myCircuitBreaker")
                .setFallbackUri("forward:/forward")))
            .uri("http://asdasdasdf:8080/"))
        .build();
  }
  
  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }
}

上のコードは、http://localhost:8080/にアクセスすると存在しないhttp://asdasdasdf:8080/に飛ぼうとするので、fallback-uriforward:/forwardに飛ぶ。

FallbackHeaders

CircuitBreakerと組み合わせて使うもののようだけど、よくわからない……

MapRequestHeader

あるリクエストヘッダーを別の名前としても送信する。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.rewritePath("/.*", "/tools/request_headers.html").mapRequestHeader("X-FROM", "X-TO"))
            .uri("https://uchy.me/"))

上記のようにすると、リクエストヘッダーX-FROMにプラスでX-TOも送信される。

PrefixPath

ルーティング前にリクエストパスにプレフィクスを追加する。

        .route("path_google", r -> r.path("/{date}")
            .filters(f -> f.prefixPath("/hotentry/all/"))
            .uri("https://b.hatena.ne.jp/"))

たとえばhttp://localhost:8080/20200504とかするとhttps://b.hatena.ne.jp/hotentry/all/20200504にアクセスが行く。

PreserveHostHeader

試してない。

RequestRateLimiter

なんかむつかしそうなので試してない。

RedirectTo

リダイレクトする。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.redirect(302, "https://b.hatena.ne.jp/hotentry/all"))
            .uri("forward:/forward"))

RemoveRequestHeader

任意のリクエストヘッダーを削除する。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.removeRequestHeader("X-Hoge"))
            .uri("forward:/forward"))

RemoveResponseHeader

なんかエラーになる。追加で設定が必要ぽい?

java.lang.UnsupportedOperationException: null
    at org.springframework.http.ReadOnlyHttpHeaders.remove(ReadOnlyHttpHeaders.java:131) ~[spring-web-5.2.5.RELEASE.jar:5.2.5.RELEASE]
 Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 

RemoveRequestParameter

リクエストパラメータを削除する。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.removeRequestParameter("hoge"))
            .uri("forward:/forward"))

たとえばhttp://localhost:8080/?hoge=hogeとかするとhogeパラメータが削除される。

RewritePath

        .route("path_google", r -> r.path("/**")
            .filters(f -> f.rewritePath("path-is-(?<segment>.*)", "${segment}"))
            .uri("https://www.google.com"))

http://localhost:8080/path-is-searchとかするとhttps://www.google.com/searchにアクセスがいく。

Spring Cloud Gatewayとは直接関係ないが(?<name>X)という記法は https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/regex/Pattern.html にある通り「名前付きの前方参照を行う正規表現グループ」というもの。

RewriteLocationResponseHeader

試してない。

RewriteResponseHeader

なんかうまく動いてくれない。

        .route("path_google", r -> r.path("/**")
            .filters(f -> f.rewriteResponseHeader("X-Hoge", ".*", "foo"))
            .uri("http://localhost:8080/"))

SaveSession

Spring SessionとかSpring Securityとかが云々って書いてあるので試してない。

SecureHeaders

試してない。

SetPath

        .route("path_google", r -> r.path("/{date}")
            .filters(f -> f.setPath("/hotentry/all/{date}"))
            .uri("https://b.hatena.ne.jp/"))

http://localhost:8080/20200501とかするとhttps://b.hatena.ne.jp/hotentry/all/20200501にアクセスがいく。

SetRequestHeader

addでなくてreplaceする、とリファレンスに書いてあるけどイマイチ違いがわからん。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.setRequestHeader("X-Hoge", "hogeValue"))
            .uri("forward:/forward"))

SetResponseHeader

なんか実行時例外になる。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.setResponseHeader("X-Hoge-Hoge", "hogeValue"))
            .uri("forward:/forward"))

SetStatus

gatewayクライアントに返すときにステータスを変更する。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.setStatus(999))
            .uri("https://b.hatena.ne.jp/"))

上にするとステータスコード999が返ってくる。

StripPrefix

リクエストのパスを前から指定数分削除する。

        .route("path_google", r -> r.path("/**")
            .filters(f -> f.stripPrefix(2))
            .uri("https://b.hatena.ne.jp/"))

http://localhost:8080/strip1/strip2/hotentry/allとするとhttps://b.hatena.ne.jp/hotentry/allに行く。

Retry

リトライ。

        .route("path_google", r -> r.path("/**")
            .filters(f -> f.retry(3))
            .uri("http://localhost:8081/"))

動作確認用に、以下のような適当な5xxを返すエンドポイントを用意して(ポートは8081)、

@RestController
@SpringBootApplication
public class Main {
  
  @ResponseStatus(value = HttpStatus.BAD_GATEWAY)
  @GetMapping("/return-5xx")
  public String return5xx(@RequestHeader Map<String, String> map) {
    System.out.println("5xx");
    return "";
  }

  public static void main(String[] args) {
    SpringApplication.run(Main.class, args);
  }

}

http://localhost:8080/return-5xxとすると3回リトライする。

GatewayFilterSpec.retry(int retries)javadocによると、デフォルトでは5xxかつGETのときリトライ、と書いてある。細かく挙動を指定したい場合は別のメソッドを使う。他に指定可能な条件としては、例外とかbackoffとかがある。

RequestSize

リクエストのサイズに制限をかける。

        .route("path_google", r -> r.path("/**").and().method(HttpMethod.POST)
            .filters(f -> f.setRequestSize(DataSize.ofBytes(1)))
            .uri("https://b.hatena.ne.jp/"))

上のようにするとmax1byte制限になるので、適当なサイズのPOSTをすると413Request Entity Too Largeが返される。

ModifyRequestBody

リクエストボディを書き換える。

        .route("path_google", r -> r.path("/**").and().method(HttpMethod.POST)
            .filters(f -> f.modifyRequestBody(String.class, String.class, (exchange, s) -> Mono.just(s.toUpperCase())))
            .uri("http://httpbin.org/"))

上はPOSTの中身を大文字に書き換えている。

ModifyResponseBody

レスポンスボディを書き換える。

        .route("path_google", r -> r.path("/**").and().method(HttpMethod.POST)
            .filters(f -> f.modifyResponseBody(String.class, String.class, (exchange, s) -> Mono.just(s.toUpperCase())))
            .uri("http://httpbin.org/"))

上はgateway先から返ってきたレスポンスボディを大文字に書き換えている。

*1:gatewaygoogleにアクセスした結果のhtmlを返すのでブラウザで表示すると色々表示がおかしくなる

*2:http://httpbin.org の存在はこのエントリの後半に気づいた……

Spring Batch + MySQL + JTA(Atomikos)のUnable to commit new sequence value changes for BATCH_JOB_SEQ

現象

タイトル通りSpring Batch + MySQL + JTA(Atomikos)環境下で以下のような例外がスローされる。

Caused by: org.springframework.dao.DataAccessResourceFailureException: Unable to commit new sequence value changes for BATCH_JOB_SEQ
    at org.springframework.jdbc.support.incrementer.MySQLMaxValueIncrementer.getNextKey(MySQLMaxValueIncrementer.java:178) ~[spring-jdbc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer.nextLongValue(AbstractDataFieldMaxValueIncrementer.java:128) ~[spring-jdbc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.repository.dao.JdbcJobInstanceDao.createJobInstance(JdbcJobInstanceDao.java:113) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:140) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]

該当箇所をデバッガで停止してみると、内部的には以下のような例外をスローしている。

com.atomikos.jdbc.AtomikosSQLException: Cannot call method 'commit' while a global transaction is running

検証用ソースコード

package springbatchmysql;

import java.util.List;

import javax.sql.DataSource;
import javax.transaction.SystemException;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
  @Bean
  public DataSource ds() {
    MysqlXADataSource d = new MysqlXADataSource();
    d.setUrl("jdbc:mysql://localhost/mysql");
    d.setUser("root");
    d.setPassword("mysql");
    
//    PGXADataSource d = new PGXADataSource();
//    d.setUrl("jdbc:postgresql://localhost/postgres");
//    d.setUser("postgres");
//    d.setPassword("mysecretpassword");
    
    AtomikosDataSourceBean b = new AtomikosDataSourceBean();
    b.setXaDataSource(d);
    b.setMaxPoolSize(10);
    b.setUniqueResourceName("resourceName");
    
    return b;
  }
  
  @Bean
  public UserTransactionManager userTransactionManager() throws SystemException {
    UserTransactionManager ut = new UserTransactionManager();
    return ut;
  }

  @Bean("myTransactionManager")
  public PlatformTransactionManager t(UserTransactionManager userTransactionManager) {
    JtaTransactionManager tm = new JtaTransactionManager(userTransactionManager, userTransactionManager);
    tm.setAllowCustomIsolationLevels(true);
    return tm;
  }

  @Autowired
  JobBuilderFactory jobBuilderFactory;

  @Autowired
  StepBuilderFactory stepBuilderFactory;

  @Bean
  public Job importUserJob(Step step1) {
    return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).start(step1)
        .build();
  }

  @Autowired
  JdbcTemplate jdbc;

  @Bean
  public Step step1() {
    ListItemReader<Integer> r = new ListItemReader<Integer>(List.of(1, 2, 3, 4, 5));
    
    return stepBuilderFactory.get("step1").chunk(1).reader(r).writer(items -> {
      items.forEach(i -> jdbc.update("insert into  aaa(user_id) values (1)"));
      
    }).build();
  }

  @Bean
  public BatchConfigurer batchConfigurer(@Qualifier("myTransactionManager") PlatformTransactionManager tm) {
    return new DefaultBatchConfigurer() {
      @Override
      public PlatformTransactionManager getTransactionManager() {
        return tm;
      }
    };
  }
}

原因

先に断り書きだが、俺のJTAとAtomikosに対する理解が浅いので推測が混じっている点は容赦願いたい。

まず、例外が起きているMySQLMaxValueIncrementerについて。これはクラス名の通りMySQL用のincrementerで、その挙動はインクリメントする際に別のコネクションを取得しようとする。

で、エラー内容がCannot call method 'commit' while a global transaction is runningを踏まえると、おそらくJTAのグルーバルトランザクションが既にある状態では別コネクションを取得できない、のだと思われる。

なので、JTAの仕様的には妥当な挙動に思える。

ただし、先に「インクリメントする際に別のコネクションを取得」と書いたが、これはMySQLのみの挙動。実際、データソースをPostgreSQLに変えたところ、例外は発生しない。

MySQLMaxValueIncrementerでコネクション取得する箇所は以下になっている。useNewConnectiontrueが指定される。

               if (this.useNewConnection) {
                    con = getDataSource().getConnection();
                    if (con.getAutoCommit()) {
                        mustRestoreAutoCommit = true;
                        con.setAutoCommit(false);
                    }
                }

一方でPostgeSQLの場合こうなっている。DataSourceUtils.getConnectionjavadoc読む限りではJTA下では新規コネクションを生成はしない(たぶん)。

Connection con = DataSourceUtils.getConnection(getDataSource());

で、このケースでMySQLだけ特別扱いな理由は、たぶん以下のソースコード内のコメントと思われる。以下はMySQLMaxValueIncrementerの抜粋。

           /*
           * If useNewConnection is true, then we obtain a non-managed connection so our modifications
           * are handled in a separate transaction. If it is false, then we use the current transaction's
           * connection relying on the use of a non-transactional storage engine like MYISAM for the
           * incrementer table. We also use straight JDBC code because we need to make sure that the insert
           * and select are performed on the same connection (otherwise we can't be sure that last_insert_id()
           * returned the correct value).
           */

Spring Boot 2.2.x + Spring Batchうごかす

Spring Batchのチュートリアル https://spring.io/guides/gs/batch-processing/ を動かします。このサンプルはいわゆるデータインポート処理で、CSVからデータ読み込み・変換・保存、を行います。

プロジェクト作成

Spring Initializr https://start.spring.io/ で作ります。Project Metadataは適当に入力でOKで、ここでは以下のように入力したものとして進めます。Dependenciesには、Spring Batch, Lombok, hsql, を追加します。

f:id:kagamihoge:20200405132232p:plain

Spring Bootの組み込みDB自動設定

Spring Batchは基本的にDBが必須です。

一方、Spring Bootは依存性に組み込みDBを含めると自動設定する機能があるので、これを使います。ここでは組み込みDBにhsqlを用いるため、プロジェクト作成時の依存性にこれを追加します。この組み込みDBは、Spring Bootアプリケーション起動時に作成され、終了時に破棄されます。つまり、アプリケーション起動のたびにDBが再作成されます。サンプルの挙動確認に便利なのでこの機能を使います。

詳細はSpring Boot Reference Documentation - Embedded Database Supportを参照してください。

build.gradle

Spring Initializrが生成するbuild.gradleは以下の通りです。

plugins {
    id 'org.springframework.boot' version '2.2.6.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'kagamihoge'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    developmentOnly
    runtimeClasspath {
        extendsFrom developmentOnly
    }
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'org.hsqldb:hsqldb'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.batch:spring-batch-test'
}

test {
    useJUnitPlatform()
}

解説

SpringbatchsampleApplication

Spring Bootアプリケーションの実行ポイントとなるmainを持つクラスです。このクラスを実行することでSpring Batchのジョブが実行されます。ただしこの段階ではjob定義が無いので何も起きません。

package kagamihoge.springbatchsample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbatchsampleApplication {

  public static void main(String[] args) {
    SpringApplication.run(SpringbatchsampleApplication.class, args);
  }
}

スキーマ&データ初期化SQL - schema-all.sql

Spring Bootにはアプリケーション起動時にDDLとデータ投入SQLを実行して、自動的にDB初期化する機能があります。これを利用してサンプルスキーマを作成します。これを使うにはクラスパス下にschema-${platform}.sqldata-${platform}.sqlを配置します。

よって、以下のような/src/main/resources/schema-all.sqlを作成します。

DROP TABLE people IF EXISTS;

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);

前述の通り、hsqlがアプリケーションの起動のたびに再作成されます。また、そのたびにDB初期化も行われます。つまり、何回起動しても同一の状態でアプリケーションが起動できます。これもサンプルの挙動確認に便利です。詳細はSpring Boot Reference Documentation - 10.3. Initialize a Databaseを参照してください。

なお、DB初期化(peopleテーブルの作成)を確認したい場合、以下のようなSpringbatchsampleApplicationを作成して実行します。

package kagamihoge.springbatchsample;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jdbc.core.JdbcTemplate;

@SpringBootApplication
public class SpringbatchsampleApplication implements CommandLineRunner {

  public static void main(String[] args) {
    SpringApplication.run(SpringbatchsampleApplication.class, args);
  }

  @Autowired
  JdbcTemplate j;

  @Override
  public void run(String... args) throws Exception {
    System.out.println(j.queryForObject("select count(*) from people", Integer.class));
  }

}

BatchConfiguration - Spring Batchのconfig(最低限度版)

先述の通りSpring BatchのサンプルではDBインポートを行いますが、ここではまずSpring Batchが起動できるか確認するための最低限の設定を作ります。最低限の設定として、以下のジョブはListを5件ずつ表示します。

なお、説明簡易化のためすべてのクラスは同一packageに配置します。実際には、用途・役割別にpackageに分類すると思います。

package kagamihoge.springbatchsample;

import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
  @Autowired
  JobBuilderFactory jobBuilderFactory;

  @Autowired
  StepBuilderFactory stepBuilderFactory;
  
  @Bean
  public Job importUserJob(Step step1) {
    return jobBuilderFactory.get("importUserJob")
      .incrementer(new RunIdIncrementer())
      .start(step1)
      .build();
  }
  
  @Bean
  public Step step1() {
    ItemReader<Integer> reader = new ListItemReader<Integer>(
        IntStream.range(1, 20).boxed().collect(Collectors.toList()));
    
    return stepBuilderFactory.get("step1")
      .<Integer, Integer> chunk(5)
      .reader(reader)
      .writer(items -> System.out.println(items))
      .build();
  }
}

これを実行すると以下の実行結果になります。

[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]
[11, 12, 13, 14, 15]
[16, 17, 18, 19]
  • @EnableBatchProcessing - Spring Batchはいくつかの必須beanの設定が必要です。このbean設定は面倒な上に定型的なため、アノテーションを付与するとデフォルト値で自動設定が行われます。自動設定されるbeanの詳細はEnableBatchProcessingのjavadocSpring Batch Reference Documentation - 4.2. Java Configを参照してください。
  • JobBuilderFactoryStepBuilderFactory - Spring Batchのjobstepを定義するためのbeanです。上述の@EnableBatchProcessing で自動的に使用可能になるbeanです。サンプルコードの通りbuilderスタイルで各種定義をします。
  • job - Spring Batchの実行単位です。このサンプルではjobは1つのstepを持ちます。
  • step - このstepは、List<Integer>から1つずつreadし、5個ずつのListをwriterでコンソールに表示します。単にSpring Batchの起動確認のコードに過ぎないため、大して意味のないコードです。後でCSVからreadしてDBにwriteするサンプルコードとの比較にしてください。
    • jobstepの関係はぐぐれば分かりやすい図が色々出てくる、例えばTERASOLUNA Batch Framework - Spring Batchの基本構造など、を適宜参照してください。
    • chunk - chunk(n)n回readしてwriteという挙動をするので、readやwriteの具体的な処理を実装します。図解が無いと理解しにくいのでTERASOLUNA Batch Framework - チャンクモデルなどを参照してください。
      • reader - ListItemReaderはSpring Batch組み込みクラスです。Listから1要素ずつreadします。
      • writer - chunk(5)なのでサイズ5のList<Integer>がwriterに渡されてコンソールに表示しています。
      • readerwriterには、それぞれ、ItemReaderItemWriterインタフェースの実装を渡します。インタフェースの実装であればよく、Springのマネージドbean(@Bean@Component)にしてもしなくても良いです。実装が十分に小さければ、上のようにラムダ式で書くことも可能です。

BatchConfiguration - Spring Batchサンプルの方のconfig

Spring Batchのサンプルをそのままコピペした版のconfigです。こちらは、CSVからデータ読み込み・変換・保存、を行います。

package kagamihoge.springbatchsample;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
  @Autowired
  JobBuilderFactory jobBuilderFactory;

  @Autowired
  StepBuilderFactory stepBuilderFactory;
  
  @Bean
  public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
    return jobBuilderFactory.get("importUserJob")
      .incrementer(new RunIdIncrementer())
      .listener(listener)
      .flow(step1)
      .end()
      .build();
  }

  @Bean
  public Step step1(JdbcBatchItemWriter<Person> writer) {
    return stepBuilderFactory.get("step1")
      .<Person, Person> chunk(10)
      .reader(reader())
      .processor(processor())
      .writer(writer)
      .build();
  }

  @Bean
  public FlatFileItemReader<Person> reader() {
    return new FlatFileItemReaderBuilder<Person>()
      .name("personItemReader")
      .resource(new ClassPathResource("sample-data.csv"))
      .delimited()
      .names(new String[]{"firstName", "lastName"})
      .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
        setTargetType(Person.class);
      }})
      .build();
  }

  @Bean
  public PersonItemProcessor processor() {
    return new PersonItemProcessor();
  }

  @Bean
  public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Person>()
      .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
      .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
      .dataSource(dataSource)
      .build();
  }
}

job

  • incrementer - 毎回異なるジョブ引数で実行するための設定。後述。
  • JobExecutionListener - joblistenerで実行前後に処理を差し込むリスナーを定義します。後述。
  • flow - Spring Batchではstepを複数まとめるflowjobを定義できます。ここではstep1つだけのflowですが、順次実行の他に条件分岐などが可能です。詳細はSpring Batch - Reference Documentation - 5.3. Controlling Step Flowを参照してください。

step

  • <Person, Person> chunk(10) - readerとwriterの入出力クラスをジェネリクスで指定します。このサンプルの場合ItemReader<Person>ItemWriter<Person>という意味です。ここでは両方とも同一クラスですが、processorで何らかの型変換がある場合などは入出力が異なるクラスになります。
  • reader - Spring Batchの組み込みクラスFlatFileItemReaderを使用します。このクラスはファイル読み込みの汎用readerです。読み込み元ファイルのパスやどの列をどのプロパティに割り当てるか、などを設定します。Spring Batchには典型的な動作用の組み込み実装があるため、ありがちな実装の場合にはjavadocを参照すると良いです。
    • resource - クラスパス下のsample-data.csv/src/main/resources/sample-data.csv
    • delimited().names - デリミタ有(デフォルト,)、CSVデータ行(例:Jill,Doe)をそれぞれfirstName, lastNameのキー名で読み込みます。
    • fieldSetMapper - 読み込んだデータをJavaBeanの規則でインスタンスPersonクラス)作成します。
    • FlatFileItemReaderは各種用途に合うよう様々な拡張ポイントがあり、結構ややこしいため、詳細はTERASOLUNA Batch - 5.3.1.2. フラットファイルの入出力を行うコンポーネントを参照してください。
  • processor - 後述。ちなみにprocessorは不要であれば省略可能です。
  • writer - これもSpring Batchの組み込みクラスJdbcBatchItemWriterです。このクラスはprocessorから*1渡されるデータで指定のSQLを実行します。
    • sql - 各データに適用するSQLを指定します。プレースホルダPersonをどう対応付けるかは下のitemSqlParameterSourceProviderを参照してください。
    • itemSqlParameterSourceProvider - BeanPropertyItemSqlParameterSourceProviderプレースホルダとJavaBeanの対応付けを行う組み込みクラスです。
    • dataSource - Spring Bootの自動設定により、組み込みDBのDataSource beanが自動的に作成されるので、それをここで設定しています。JdbcBatchItemWriterは内部的にこのDataSourceを基にしたJdbcTemplatesqlを実行します。

/src/main/resources/sample-data.csv

readerで指定するCSVファイルです。

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

PersonItemProcessor

processorはreaderで読み込んだデータの加工・変換を行います。このサンプルでは姓名を大文字にします。

package kagamihoge.springbatchsample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  @Override
  public Person process(final Person person) throws Exception {
    final String firstName = person.getFirstName().toUpperCase();
    final String lastName = person.getLastName().toUpperCase();

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting (" + person + ") into (" + transformedPerson + ")");

    return transformedPerson;
  }

}
  • ItemProcessor<Person, Person> - processorの入出力クラスをジェネリクスで指定します。今回はどちらもPersonですが、異なるクラスにすることも可能です。

JobCompletionNotificationListener

ジョブ完了時のリスナーです。DBにインポートされたデータ行をすべて出力します。

package kagamihoge.springbatchsample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

  private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  @Autowired
  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      jdbcTemplate.query("SELECT first_name, last_name FROM people",
        (rs, row) -> new Person(
          rs.getString(1),
          rs.getString(2))
      ).forEach(person -> log.info("Found <" + person + "> in the database."));
    }
  }
}
  • jobExecution.getStatus() == BatchStatus.COMPLETED - この意味は、jobが正常終了した場合、を意味します。jobの正常or異常終了で後処理が変わる場合に利用します。

Person

package kagamihoge.springbatchsample;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Person {
  String lastName;
  String firstName;
}

RunIdIncrementer - jobパラメータに連番を付与

Spring Batchのjobはパラメータを取ることが出来、そのパラメータが同一なjobは2回以上実行出来ません。これで二重起動防止が出来ますが、パラメータが無い場合や二重起動が問題無いケースは回避策が必要です。

その場合RunIdIncrementerを使います。このクラスはjob実行時に連番のidを割り振ります。具体的にはrun.idというキーで連番のパラメータになります。以下の例はjob実行時のパラメータの履歴です(このサンプルはDBを永続化しないため、下記は別のjobの例です)。

f:id:kagamihoge:20200406201410p:plain

なお、上記テーブルはSpring Batchのメタデータと呼ばれる機能ですが、本エントリではこれ以上触れません。詳細はSpring Batch - Appendix B: Meta-Data SchemaTERASOLUNA Batch Framework - Architectureを参照してください。

ジョブの実行

gradlew bootRunあるいはgradlew build実行後にjava -jar build\libs\springbatchsample-0.0.1-SNAPSHOT.jarで実行します。

Spring BootでSpring Batchを実行する場合、自動的にすべてのJob型のbeanを実行します。指定のジョブだけ実行したい場合はspring.batch.job.namesプロパティを使用します。以下は指定例です。

spring.batch.job.names importUserJob

また、上記の自動実行の仕組みを使わない場合はプロパティspring.batch.job.enabled falseでオフになります。webアプリケーションなどから任意のタイミングでジョブを起動する、いわゆるオンラインバッチの場合にこのプロパティを使用します。その場合Jobの起動にはJobLauncherを使います。

spring.batch.job.enabled false

参考情報

*1:processorが無ければreaderから