kagamihogeの日記

kagamihogeの日記です。

Spring Cloud Circuit Breakerさわる

Spring Cloud Circuit Breaker https://spring.io/projects/spring-cloud-circuitbreaker#overviewチュートリアルレベルのことをやる。

Spring Cloud Circuit Breaker自身は抽象APIを提供するだけで、使う際には具体的な実装を選択する、というタイプ。以前のJSUG勉強会でResilience4Jが良いとかなんとか聞いた記憶があるので、今回はこれを使う。

とりあえず使ってみる

plugins {
  id 'org.springframework.boot' version '2.2.7.RELEASE'
  id 'io.spring.dependency-management' version '1.0.9.RELEASE'
  id 'java'
}


sourceCompatibility = '11'
configurations {
  developmentOnly
  runtimeClasspath {
    extendsFrom developmentOnly
  }
}
repositories {
  mavenCentral()
}
ext {
  set('springCloudVersion', "Hoxton.SR4")
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-web'
  implementation 'org.springframework.cloud:spring-cloud-starter-circuitbreaker-resilience4j'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
}
dependencyManagement {
  imports {
    mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
  }
}
test {
  useJUnitPlatform()
}
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
public class Application {

  @Bean
  public RestTemplate restTemplate() {
    return new RestTemplate();
  }

  public static void main(String[] args) {
    new SpringApplicationBuilder(Application.class).web(WebApplicationType.SERVLET).run(args);
  }
}
import org.springframework.cloud.client.circuitbreaker.CircuitBreakerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
public class DemoController {
  private RestTemplate rest;
  private CircuitBreakerFactory cbFactory;

  public DemoController(RestTemplate rest, CircuitBreakerFactory cbFactory) {
    this.rest = rest;
    this.cbFactory = cbFactory;
  }

  @GetMapping("/sample")
  public String sample() {
    return cbFactory.create("sample").run(
        () -> rest.getForObject("http://localhost:8080/hoge", String.class), throwable -> "fallback");
  }

  @GetMapping("/hoge")
  public String slow() {
    return "hoge";
  }
}

これでhttp://localhost:8080/sampleにアクセスするとhogeと返ってくる。

で、以下のように意図的に実行時例外を発生させてみると、fallbackと返ってくる。

    return cbFactory.create("slow").run(
        () -> rest.getForObject("invalid-url", String.class), throwable -> "fallback"); //不正なURL文字列

設定変更

デフォルト設定変更

特に何も設定しない状態だと1秒でタイムアウトする。ので、デフォルトのタイムアウト設定を変更してみる。以下はほぼチュートリアルからコピペしてきたもの。5秒タイムアウトにしてある。

  @Bean
  public Customizer<Resilience4JCircuitBreakerFactory> defaultCustomizer() {
      return factory -> {factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
              .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(5)).build())
              .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
              .build());
      };
  }

デモ用に6秒スリープするエンドポイントを用意し、そこへアクセスする。こうすると5秒でタイムアウト、fallbackしてfallbackが返ってくる。

  @GetMapping("/sample")
  public String sample() {
    return cbFactory.create("sample").run(() -> rest.getForObject("http://localhost:8080/slow", String.class),
        throwable -> "fallback");
  }

  @GetMapping("/slow")
  public String slow() throws InterruptedException {
    TimeUnit.SECONDS.sleep(6L);
    return "slow";
  }

指定

次に、それぞれのCircuitBreakerごとに異なる設定をしてみる。

以下のように、デフォルトは10秒、idがtimeout-3secは3秒でタイムアウトに設定する。

  @Bean
  public Customizer<Resilience4JCircuitBreakerFactory> defaultCustomizer() {
    return factory -> {
      factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
          .timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(10)).build())
          .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults()).build());

      factory.configure(
          c -> c.timeLimiterConfig(TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(3)).build()).build(), "timeout-3sec");
    };
  }

動作確認用のcontrollerをつくる。/sample2のところはidをtimeout-3secにするのがポイント。

  @GetMapping("/sample")
  public String sample() {
    return cbFactory.create("sample").run(() -> rest.getForObject("http://localhost:8080/slow", String.class),
        throwable -> "fallback");
  }
  
  @GetMapping("/sample2")
  public String sample2() {
    return cbFactory.create("timeout-3sec").run(() -> rest.getForObject("http://localhost:8080/slow", String.class),
        throwable -> "fallback");
  }

  @GetMapping("/slow")
  public String slow() throws InterruptedException {
    TimeUnit.SECONDS.sleep(6L);
    return "slow";
  }

この状態で、

  • http://localhost:8080/sampleは、デフォルト10秒タイムアウトなので、6秒経過してslowが返る。
  • http://localhost:8080/sample2は、3秒タイムアウトなので、3秒経過するとfallbackが返る。

Spring Cloud Gatewayさわる

https://spring.io/projects/spring-cloud-gateway をさわる。はじめてさわるのでpredicateとかfilterとかをいくつか使ってみる。使い方わからなかったやつはさわってない。個人の日記レベルのさわってみた程度なんで、ちゃんとした情報はリファレンスを参照願います。

やってみる

build.gradle

https://start.spring.io/gatewayと入力すればorg.springframework.cloud:spring-cloud-starter-gatewayが入る。以下はそれで生成したもののコピペ。

plugins {
  id 'org.springframework.boot' version '2.2.6.RELEASE'
  id 'io.spring.dependency-management' version '1.0.9.RELEASE'
  id 'java'
}

repositories {
  mavenCentral()
}
ext {
  set('springCloudVersion', "Hoxton.SR4")
}
dependencies {
  implementation 'org.springframework.cloud:spring-cloud-starter-gateway'
  implementation 'org.springframework.boot:spring-boot-devtools'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
}
dependencyManagement {
  imports {
    mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
  }
}

gatewayのアプリケーション

gatewayのアプリケーションを作る。以下は2つのrouteを定義している。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class DemoApplication {
  @Bean
  public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("path_google", r -> r.path("/search")
            .uri("https://www.google.co.jp"))
        .route("host_yahoo", r -> r.host("localhost:8080").and().path("/hotentry/all")
            .uri("https://b.hatena.ne.jp"))
        .build();
  }
  
  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }
}

どちらもrouteの引数は識別子なので、適当な一意の値を振る。

  1. 1つ目はhttp://localhost:8080/searchにアクセスするとhttps://www.google.co.jp/searchに飛ぶ。http://localhost:8080/search?q=hogeとかをブラウザでアクセスするとそれっぽい検索結果が表示される*1。なお、本来の使い方はAPIのプロキシ的な使い方なので、https://www.google.co.jpに飛ばすのは意味のある例ではないが、適当なAPI用意するの面倒なのでこのようにしている。動作確認できれば良いや、という判断*2
  2. 2つ目はホストがlocalhost:8080かつリクエストパスが/hotentry/allであればhttps://b.hatena.ne.jpに飛ばす。http://localhost:8080/hotentry/allにブラウザでアクセスするとそれっぽいホットエントリが表示される。

上記は設定ファイル(application.yamlとか)で書くこともできる。が、このエントリではそちらはやらない。

predicate

before/after/between

あるZonedDateTimeの、以前・以降・期間、にマッチしたときだけルーティングする。

      .route("path_google", r -> r.before(ZonedDateTime.of(2020, 5, 10, 0, 0, 0, 0, ZoneId.systemDefault()))
          .uri("https://www.google.co.jp"))
      .route("path_google", r -> r.after(ZonedDateTime.of(2020, 5, 10, 0, 0, 0, 0, ZoneId.systemDefault()))
          .uri("https://www.google.co.jp"))
      .route("path_google", r -> r.between(
          ZonedDateTime.of(2020, 5, 1, 0, 0, 0, 0, ZoneId.systemDefault()),
          ZonedDateTime.of(2020, 5, 10, 0, 0, 0, 0, ZoneId.systemDefault()))
          .uri("https://www.google.co.jp"))

cookie

cookieが指定の値を持ってるかどうか。正規表現使用可能なので下はvalueとかにマッチする。

      .route("path_google", r -> r.cookie("hogehoge", "va..e")
          .uri("https://www.google.co.jp"))

header

headerがあるか or headerが指定の値を持ってるかどうか、をチェックする。

      .route("path_google", r -> r.header("X-Hoge", "va..e")
          .uri("https://www.google.co.jp"))

host

hostが指定の値かどうか。正規表現使えるので、ドメイン名で何らかの分岐するときに使う感じか。

method

GETとかPOSTとかをチェックする。

      .route("path_google", r -> r.method(HttpMethod.GET)
          .uri("https://www.google.co.jp"))

path

上で触れたので省略。

query

上で触れたので省略。

remoteAddress

CIDRで条件をかける。

      .route("path_google", r -> r.remoteAddr("192.168.1.1/24")
          .uri("https://www.google.co.jp"))

weight

重み付け。たとえば、以下は8:2の重み付けでgoogletwitterにアクセスが飛ぶ。

        .route("path_route_high", r -> r.weight("group1", 8)
            .uri("https://www.google.com"))
        .route("path_route_low", r -> r.weight("group1", 2)
            .uri("https://twitter.com"))

filter

gatewayにリクエストが来たときと、ルーティング先から戻ってきたときに、何らかの処理を挟みたい場合にfilterを使う。

AddRequestHeader

gatewayで何らかのリクエストヘッダーを追加する。

        .route("path_google", r -> r.path("/{hoge}")
            .filters(f -> f.rewritePath("/.*", "/tools/request_headers.html").addRequestHeader("X-Hoge", "{hoge}"))
            .uri("https://uchy.me/"))

動作確認にはHTTPリクエストヘッダー表示ツール https://uchy.me/tools/request_headers.html を使わせて頂いた。http://localhost:8080/sample-valueとかするとX-Hoge sample-valueヘッダーが送信される。上記のように、pathのURLの変数を{hoge}にしてaddRequestHeaderでその値を使うことができる。

AddRequestParameter

gatewayで何らかのリクエストパラメータを追加する。

        .route("path_google", r -> r.path("/{hoge}")
            .filters(f -> f.rewritePath("/.*", "/search").addRequestParameter("q", "{hoge}"))
            .uri("https://www.google.com/"))

上はhttp://localhost:8080/search-wordにアクセスするとhttps://www.google.com/search?q=search-wordに行く。

AddResponseHeader

gatewayのクライアントに返すレスポンスにヘッダーを追加する。

        .route("path_google", r -> r.path("/search")
            .filters(f -> f.addResponseHeader("X-Hoge", "hoge"))
            .uri("https://www.google.com/"))

DedupeResponseHeader

試してない。

Hystrix GatewayFilter

リファレンスにこんなことが書いてある。なのでSpring Cloud CircuitBreaker GatewayFilterを使うのが良いと思われる。

Netflix has put Hystrix in maintenance mode. We suggest you use the Spring Cloud CircuitBreaker Gateway Filter with Resilience4J, as support for Hystrix will be removed in a future release.

Spring Cloud CircuitBreaker GatewayFilter

CircuitBreakerの詳細な説明は省略(俺自身ちゃんと理解してないので)して、とりあえず使ってみる。

まずCircuitBreakerの実体としてresilience4jを依存性に追加する。

  implementation 'org.springframework.cloud:spring-cloud-starter-circuitbreaker-reactor-resilience4j'

とりあえず使ってみる。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SpringBootApplication
public class DemoApplication {
  
  @GetMapping("/forward")
  public String forward() {
    return "forward";
  }

  @Bean
  public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
    return builder.routes()
        .route("path_google", r -> r.path("/")
            .filters(f -> f.circuitBreaker(
                c -> c.setName("myCircuitBreaker")
                .setFallbackUri("forward:/forward")))
            .uri("http://asdasdasdf:8080/"))
        .build();
  }
  
  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }
}

上のコードは、http://localhost:8080/にアクセスすると存在しないhttp://asdasdasdf:8080/に飛ぼうとするので、fallback-uriforward:/forwardに飛ぶ。

FallbackHeaders

CircuitBreakerと組み合わせて使うもののようだけど、よくわからない……

MapRequestHeader

あるリクエストヘッダーを別の名前としても送信する。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.rewritePath("/.*", "/tools/request_headers.html").mapRequestHeader("X-FROM", "X-TO"))
            .uri("https://uchy.me/"))

上記のようにすると、リクエストヘッダーX-FROMにプラスでX-TOも送信される。

PrefixPath

ルーティング前にリクエストパスにプレフィクスを追加する。

        .route("path_google", r -> r.path("/{date}")
            .filters(f -> f.prefixPath("/hotentry/all/"))
            .uri("https://b.hatena.ne.jp/"))

たとえばhttp://localhost:8080/20200504とかするとhttps://b.hatena.ne.jp/hotentry/all/20200504にアクセスが行く。

PreserveHostHeader

試してない。

RequestRateLimiter

なんかむつかしそうなので試してない。

RedirectTo

リダイレクトする。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.redirect(302, "https://b.hatena.ne.jp/hotentry/all"))
            .uri("forward:/forward"))

RemoveRequestHeader

任意のリクエストヘッダーを削除する。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.removeRequestHeader("X-Hoge"))
            .uri("forward:/forward"))

RemoveResponseHeader

なんかエラーになる。追加で設定が必要ぽい?

java.lang.UnsupportedOperationException: null
    at org.springframework.http.ReadOnlyHttpHeaders.remove(ReadOnlyHttpHeaders.java:131) ~[spring-web-5.2.5.RELEASE.jar:5.2.5.RELEASE]
 Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 

RemoveRequestParameter

リクエストパラメータを削除する。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.removeRequestParameter("hoge"))
            .uri("forward:/forward"))

たとえばhttp://localhost:8080/?hoge=hogeとかするとhogeパラメータが削除される。

RewritePath

        .route("path_google", r -> r.path("/**")
            .filters(f -> f.rewritePath("path-is-(?<segment>.*)", "${segment}"))
            .uri("https://www.google.com"))

http://localhost:8080/path-is-searchとかするとhttps://www.google.com/searchにアクセスがいく。

Spring Cloud Gatewayとは直接関係ないが(?<name>X)という記法は https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/regex/Pattern.html にある通り「名前付きの前方参照を行う正規表現グループ」というもの。

RewriteLocationResponseHeader

試してない。

RewriteResponseHeader

なんかうまく動いてくれない。

        .route("path_google", r -> r.path("/**")
            .filters(f -> f.rewriteResponseHeader("X-Hoge", ".*", "foo"))
            .uri("http://localhost:8080/"))

SaveSession

Spring SessionとかSpring Securityとかが云々って書いてあるので試してない。

SecureHeaders

試してない。

SetPath

        .route("path_google", r -> r.path("/{date}")
            .filters(f -> f.setPath("/hotentry/all/{date}"))
            .uri("https://b.hatena.ne.jp/"))

http://localhost:8080/20200501とかするとhttps://b.hatena.ne.jp/hotentry/all/20200501にアクセスがいく。

SetRequestHeader

addでなくてreplaceする、とリファレンスに書いてあるけどイマイチ違いがわからん。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.setRequestHeader("X-Hoge", "hogeValue"))
            .uri("forward:/forward"))

SetResponseHeader

なんか実行時例外になる。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.setResponseHeader("X-Hoge-Hoge", "hogeValue"))
            .uri("forward:/forward"))

SetStatus

gatewayクライアントに返すときにステータスを変更する。

        .route("path_google", r -> r.path("/")
            .filters(f -> f.setStatus(999))
            .uri("https://b.hatena.ne.jp/"))

上にするとステータスコード999が返ってくる。

StripPrefix

リクエストのパスを前から指定数分削除する。

        .route("path_google", r -> r.path("/**")
            .filters(f -> f.stripPrefix(2))
            .uri("https://b.hatena.ne.jp/"))

http://localhost:8080/strip1/strip2/hotentry/allとするとhttps://b.hatena.ne.jp/hotentry/allに行く。

Retry

リトライ。

        .route("path_google", r -> r.path("/**")
            .filters(f -> f.retry(3))
            .uri("http://localhost:8081/"))

動作確認用に、以下のような適当な5xxを返すエンドポイントを用意して(ポートは8081)、

@RestController
@SpringBootApplication
public class Main {
  
  @ResponseStatus(value = HttpStatus.BAD_GATEWAY)
  @GetMapping("/return-5xx")
  public String return5xx(@RequestHeader Map<String, String> map) {
    System.out.println("5xx");
    return "";
  }

  public static void main(String[] args) {
    SpringApplication.run(Main.class, args);
  }

}

http://localhost:8080/return-5xxとすると3回リトライする。

GatewayFilterSpec.retry(int retries)javadocによると、デフォルトでは5xxかつGETのときリトライ、と書いてある。細かく挙動を指定したい場合は別のメソッドを使う。他に指定可能な条件としては、例外とかbackoffとかがある。

RequestSize

リクエストのサイズに制限をかける。

        .route("path_google", r -> r.path("/**").and().method(HttpMethod.POST)
            .filters(f -> f.setRequestSize(DataSize.ofBytes(1)))
            .uri("https://b.hatena.ne.jp/"))

上のようにするとmax1byte制限になるので、適当なサイズのPOSTをすると413Request Entity Too Largeが返される。

ModifyRequestBody

リクエストボディを書き換える。

        .route("path_google", r -> r.path("/**").and().method(HttpMethod.POST)
            .filters(f -> f.modifyRequestBody(String.class, String.class, (exchange, s) -> Mono.just(s.toUpperCase())))
            .uri("http://httpbin.org/"))

上はPOSTの中身を大文字に書き換えている。

ModifyResponseBody

レスポンスボディを書き換える。

        .route("path_google", r -> r.path("/**").and().method(HttpMethod.POST)
            .filters(f -> f.modifyResponseBody(String.class, String.class, (exchange, s) -> Mono.just(s.toUpperCase())))
            .uri("http://httpbin.org/"))

上はgateway先から返ってきたレスポンスボディを大文字に書き換えている。

*1:gatewaygoogleにアクセスした結果のhtmlを返すのでブラウザで表示すると色々表示がおかしくなる

*2:http://httpbin.org の存在はこのエントリの後半に気づいた……

Spring Batch + MySQL + JTA(Atomikos)のUnable to commit new sequence value changes for BATCH_JOB_SEQ

現象

タイトル通りSpring Batch + MySQL + JTA(Atomikos)環境下で以下のような例外がスローされる。

Caused by: org.springframework.dao.DataAccessResourceFailureException: Unable to commit new sequence value changes for BATCH_JOB_SEQ
    at org.springframework.jdbc.support.incrementer.MySQLMaxValueIncrementer.getNextKey(MySQLMaxValueIncrementer.java:178) ~[spring-jdbc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer.nextLongValue(AbstractDataFieldMaxValueIncrementer.java:128) ~[spring-jdbc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.repository.dao.JdbcJobInstanceDao.createJobInstance(JdbcJobInstanceDao.java:113) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:140) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]

該当箇所をデバッガで停止してみると、内部的には以下のような例外をスローしている。

com.atomikos.jdbc.AtomikosSQLException: Cannot call method 'commit' while a global transaction is running

検証用ソースコード

package springbatchmysql;

import java.util.List;

import javax.sql.DataSource;
import javax.transaction.SystemException;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
  @Bean
  public DataSource ds() {
    MysqlXADataSource d = new MysqlXADataSource();
    d.setUrl("jdbc:mysql://localhost/mysql");
    d.setUser("root");
    d.setPassword("mysql");
    
//    PGXADataSource d = new PGXADataSource();
//    d.setUrl("jdbc:postgresql://localhost/postgres");
//    d.setUser("postgres");
//    d.setPassword("mysecretpassword");
    
    AtomikosDataSourceBean b = new AtomikosDataSourceBean();
    b.setXaDataSource(d);
    b.setMaxPoolSize(10);
    b.setUniqueResourceName("resourceName");
    
    return b;
  }
  
  @Bean
  public UserTransactionManager userTransactionManager() throws SystemException {
    UserTransactionManager ut = new UserTransactionManager();
    return ut;
  }

  @Bean("myTransactionManager")
  public PlatformTransactionManager t(UserTransactionManager userTransactionManager) {
    JtaTransactionManager tm = new JtaTransactionManager(userTransactionManager, userTransactionManager);
    tm.setAllowCustomIsolationLevels(true);
    return tm;
  }

  @Autowired
  JobBuilderFactory jobBuilderFactory;

  @Autowired
  StepBuilderFactory stepBuilderFactory;

  @Bean
  public Job importUserJob(Step step1) {
    return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).start(step1)
        .build();
  }

  @Autowired
  JdbcTemplate jdbc;

  @Bean
  public Step step1() {
    ListItemReader<Integer> r = new ListItemReader<Integer>(List.of(1, 2, 3, 4, 5));
    
    return stepBuilderFactory.get("step1").chunk(1).reader(r).writer(items -> {
      items.forEach(i -> jdbc.update("insert into  aaa(user_id) values (1)"));
      
    }).build();
  }

  @Bean
  public BatchConfigurer batchConfigurer(@Qualifier("myTransactionManager") PlatformTransactionManager tm) {
    return new DefaultBatchConfigurer() {
      @Override
      public PlatformTransactionManager getTransactionManager() {
        return tm;
      }
    };
  }
}

原因

先に断り書きだが、俺のJTAとAtomikosに対する理解が浅いので推測が混じっている点は容赦願いたい。

まず、例外が起きているMySQLMaxValueIncrementerについて。これはクラス名の通りMySQL用のincrementerで、その挙動はインクリメントする際に別のコネクションを取得しようとする。

で、エラー内容がCannot call method 'commit' while a global transaction is runningを踏まえると、おそらくJTAのグルーバルトランザクションが既にある状態では別コネクションを取得できない、のだと思われる。

なので、JTAの仕様的には妥当な挙動に思える。

ただし、先に「インクリメントする際に別のコネクションを取得」と書いたが、これはMySQLのみの挙動。実際、データソースをPostgreSQLに変えたところ、例外は発生しない。

MySQLMaxValueIncrementerでコネクション取得する箇所は以下になっている。useNewConnectiontrueが指定される。

               if (this.useNewConnection) {
                    con = getDataSource().getConnection();
                    if (con.getAutoCommit()) {
                        mustRestoreAutoCommit = true;
                        con.setAutoCommit(false);
                    }
                }

一方でPostgeSQLの場合こうなっている。DataSourceUtils.getConnectionjavadoc読む限りではJTA下では新規コネクションを生成はしない(たぶん)。

Connection con = DataSourceUtils.getConnection(getDataSource());

で、このケースでMySQLだけ特別扱いな理由は、たぶん以下のソースコード内のコメントと思われる。以下はMySQLMaxValueIncrementerの抜粋。

           /*
           * If useNewConnection is true, then we obtain a non-managed connection so our modifications
           * are handled in a separate transaction. If it is false, then we use the current transaction's
           * connection relying on the use of a non-transactional storage engine like MYISAM for the
           * incrementer table. We also use straight JDBC code because we need to make sure that the insert
           * and select are performed on the same connection (otherwise we can't be sure that last_insert_id()
           * returned the correct value).
           */