現象
タイトル通りSpring Batch + MySQL + JTA(Atomikos)環境下で以下のような例外がスローされる。
Caused by: org.springframework.dao.DataAccessResourceFailureException: Unable to commit new sequence value changes for BATCH_JOB_SEQ at org.springframework.jdbc.support.incrementer.MySQLMaxValueIncrementer.getNextKey(MySQLMaxValueIncrementer.java:178) ~[spring-jdbc-5.2.5.RELEASE.jar:5.2.5.RELEASE] at org.springframework.jdbc.support.incrementer.AbstractDataFieldMaxValueIncrementer.nextLongValue(AbstractDataFieldMaxValueIncrementer.java:128) ~[spring-jdbc-5.2.5.RELEASE.jar:5.2.5.RELEASE] at org.springframework.batch.core.repository.dao.JdbcJobInstanceDao.createJobInstance(JdbcJobInstanceDao.java:113) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE] at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:140) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
該当箇所をデバッガで停止してみると、内部的には以下のような例外をスローしている。
com.atomikos.jdbc.AtomikosSQLException: Cannot call method 'commit' while a global transaction is running
検証用ソースコード
package springbatchmysql; import java.util.List; import javax.sql.DataSource; import javax.transaction.SystemException; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.BatchConfigurer; import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.support.ListItemReader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.jta.JtaTransactionManager; import com.atomikos.icatch.jta.UserTransactionManager; import com.atomikos.jdbc.AtomikosDataSourceBean; import com.mysql.cj.jdbc.MysqlXADataSource; @Configuration @EnableBatchProcessing public class BatchConfiguration { @Bean public DataSource ds() { MysqlXADataSource d = new MysqlXADataSource(); d.setUrl("jdbc:mysql://localhost/mysql"); d.setUser("root"); d.setPassword("mysql"); // PGXADataSource d = new PGXADataSource(); // d.setUrl("jdbc:postgresql://localhost/postgres"); // d.setUser("postgres"); // d.setPassword("mysecretpassword"); AtomikosDataSourceBean b = new AtomikosDataSourceBean(); b.setXaDataSource(d); b.setMaxPoolSize(10); b.setUniqueResourceName("resourceName"); return b; } @Bean public UserTransactionManager userTransactionManager() throws SystemException { UserTransactionManager ut = new UserTransactionManager(); return ut; } @Bean("myTransactionManager") public PlatformTransactionManager t(UserTransactionManager userTransactionManager) { JtaTransactionManager tm = new JtaTransactionManager(userTransactionManager, userTransactionManager); tm.setAllowCustomIsolationLevels(true); return tm; } @Autowired JobBuilderFactory jobBuilderFactory; @Autowired StepBuilderFactory stepBuilderFactory; @Bean public Job importUserJob(Step step1) { return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).start(step1) .build(); } @Autowired JdbcTemplate jdbc; @Bean public Step step1() { ListItemReader<Integer> r = new ListItemReader<Integer>(List.of(1, 2, 3, 4, 5)); return stepBuilderFactory.get("step1").chunk(1).reader(r).writer(items -> { items.forEach(i -> jdbc.update("insert into aaa(user_id) values (1)")); }).build(); } @Bean public BatchConfigurer batchConfigurer(@Qualifier("myTransactionManager") PlatformTransactionManager tm) { return new DefaultBatchConfigurer() { @Override public PlatformTransactionManager getTransactionManager() { return tm; } }; } }
原因
先に断り書きだが、俺のJTAとAtomikosに対する理解が浅いので推測が混じっている点は容赦願いたい。
まず、例外が起きているMySQLMaxValueIncrementer
について。これはクラス名の通りMySQL用のincrementerで、その挙動はインクリメントする際に別のコネクションを取得しようとする。
で、エラー内容がCannot call method 'commit' while a global transaction is running
を踏まえると、おそらくJTAのグルーバルトランザクションが既にある状態では別コネクションを取得できない、のだと思われる。
なので、JTAの仕様的には妥当な挙動に思える。
ただし、先に「インクリメントする際に別のコネクションを取得」と書いたが、これはMySQLのみの挙動。実際、データソースをPostgreSQLに変えたところ、例外は発生しない。
MySQLMaxValueIncrementer
でコネクション取得する箇所は以下になっている。useNewConnection
はtrue
が指定される。
if (this.useNewConnection) { con = getDataSource().getConnection(); if (con.getAutoCommit()) { mustRestoreAutoCommit = true; con.setAutoCommit(false); } }
一方でPostgeSQLの場合こうなっている。DataSourceUtils.getConnection
のjavadoc読む限りではJTA下では新規コネクションを生成はしない(たぶん)。
Connection con = DataSourceUtils.getConnection(getDataSource());
で、このケースでMySQLだけ特別扱いな理由は、たぶん以下のソースコード内のコメントと思われる。以下はMySQLMaxValueIncrementer
の抜粋。
/* * If useNewConnection is true, then we obtain a non-managed connection so our modifications * are handled in a separate transaction. If it is false, then we use the current transaction's * connection relying on the use of a non-transactional storage engine like MYISAM for the * incrementer table. We also use straight JDBC code because we need to make sure that the insert * and select are performed on the same connection (otherwise we can't be sure that last_insert_id() * returned the correct value). */