kagamihogeの日記

kagamihogeの日記です。

Spring Batch + MySQL + JTA(Atomikos)のUnable to commit new sequence value changes for BATCH_JOB_SEQ

現象

タイトル通りSpring Batch + MySQL + JTA(Atomikos)環境下で以下のような例外がスローされる。

Caused by: org.springframework.dao.DataAccessResourceFailureException: Unable to commit new sequence value changes for BATCH_JOB_SEQ
    at org.springframework.jdbc.support.incrementer.MySQLMaxValueIncrementer.getNextKey(MySQLMaxValueIncrementer.java:178) ~[spring-jdbc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer.nextLongValue(AbstractDataFieldMaxValueIncrementer.java:128) ~[spring-jdbc-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.repository.dao.JdbcJobInstanceDao.createJobInstance(JdbcJobInstanceDao.java:113) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:140) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]

該当箇所をデバッガで停止してみると、内部的には以下のような例外をスローしている。

com.atomikos.jdbc.AtomikosSQLException: Cannot call method 'commit' while a global transaction is running

検証用ソースコード

package springbatchmysql;

import java.util.List;

import javax.sql.DataSource;
import javax.transaction.SystemException;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
  @Bean
  public DataSource ds() {
    MysqlXADataSource d = new MysqlXADataSource();
    d.setUrl("jdbc:mysql://localhost/mysql");
    d.setUser("root");
    d.setPassword("mysql");
    
//    PGXADataSource d = new PGXADataSource();
//    d.setUrl("jdbc:postgresql://localhost/postgres");
//    d.setUser("postgres");
//    d.setPassword("mysecretpassword");
    
    AtomikosDataSourceBean b = new AtomikosDataSourceBean();
    b.setXaDataSource(d);
    b.setMaxPoolSize(10);
    b.setUniqueResourceName("resourceName");
    
    return b;
  }
  
  @Bean
  public UserTransactionManager userTransactionManager() throws SystemException {
    UserTransactionManager ut = new UserTransactionManager();
    return ut;
  }

  @Bean("myTransactionManager")
  public PlatformTransactionManager t(UserTransactionManager userTransactionManager) {
    JtaTransactionManager tm = new JtaTransactionManager(userTransactionManager, userTransactionManager);
    tm.setAllowCustomIsolationLevels(true);
    return tm;
  }

  @Autowired
  JobBuilderFactory jobBuilderFactory;

  @Autowired
  StepBuilderFactory stepBuilderFactory;

  @Bean
  public Job importUserJob(Step step1) {
    return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).start(step1)
        .build();
  }

  @Autowired
  JdbcTemplate jdbc;

  @Bean
  public Step step1() {
    ListItemReader<Integer> r = new ListItemReader<Integer>(List.of(1, 2, 3, 4, 5));
    
    return stepBuilderFactory.get("step1").chunk(1).reader(r).writer(items -> {
      items.forEach(i -> jdbc.update("insert into  aaa(user_id) values (1)"));
      
    }).build();
  }

  @Bean
  public BatchConfigurer batchConfigurer(@Qualifier("myTransactionManager") PlatformTransactionManager tm) {
    return new DefaultBatchConfigurer() {
      @Override
      public PlatformTransactionManager getTransactionManager() {
        return tm;
      }
    };
  }
}

原因

先に断り書きだが、俺のJTAとAtomikosに対する理解が浅いので推測が混じっている点は容赦願いたい。

まず、例外が起きているMySQLMaxValueIncrementerについて。これはクラス名の通りMySQL用のincrementerで、その挙動はインクリメントする際に別のコネクションを取得しようとする。

で、エラー内容がCannot call method 'commit' while a global transaction is runningを踏まえると、おそらくJTAのグルーバルトランザクションが既にある状態では別コネクションを取得できない、のだと思われる。

なので、JTAの仕様的には妥当な挙動に思える。

ただし、先に「インクリメントする際に別のコネクションを取得」と書いたが、これはMySQLのみの挙動。実際、データソースをPostgreSQLに変えたところ、例外は発生しない。

MySQLMaxValueIncrementerでコネクション取得する箇所は以下になっている。useNewConnectiontrueが指定される。

               if (this.useNewConnection) {
                    con = getDataSource().getConnection();
                    if (con.getAutoCommit()) {
                        mustRestoreAutoCommit = true;
                        con.setAutoCommit(false);
                    }
                }

一方でPostgeSQLの場合こうなっている。DataSourceUtils.getConnectionjavadoc読む限りではJTA下では新規コネクションを生成はしない(たぶん)。

Connection con = DataSourceUtils.getConnection(getDataSource());

で、このケースでMySQLだけ特別扱いな理由は、たぶん以下のソースコード内のコメントと思われる。以下はMySQLMaxValueIncrementerの抜粋。

           /*
           * If useNewConnection is true, then we obtain a non-managed connection so our modifications
           * are handled in a separate transaction. If it is false, then we use the current transaction's
           * connection relying on the use of a non-transactional storage engine like MYISAM for the
           * incrementer table. We also use straight JDBC code because we need to make sure that the insert
           * and select are performed on the same connection (otherwise we can't be sure that last_insert_id()
           * returned the correct value).
           */