kagamihogeの日記

kagamihogeの日記です。

spring-batchのMapJobRepositoryFactoryBeanでメタテーブルをDBに保存せず大量にstepを実行するとスローダウン

現象

ほぼタイトルで言い切っているけど。spring-batchでメタデータ保存でDBを使わなないようインメモリに格納するMapJobRepositoryFactoryBeanを使う場合が稀に良くある。この状況下で大量のstepを含むjobを実行すると徐々に実行速度が低下していく、という現象に遭遇した。

再現

大量のstepを手軽に再現するため、以下のように無限ループでstepを実行し続ける。つまりJobExecutionDeciderが常に同一値を返し、同一のstepに遷移する。こんな感じ。

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
package kagamihoge.springbatchslowdown;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.BatchConfigurationException;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.MapJobExplorerFactoryBean;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Bean
    public Job job(JobBuilderFactory jobs, @Qualifier("s1") Step s1, JobExecutionDecider decider) {
        return jobs
                .get("myJob")
                .incrementer(new RunIdIncrementer())
                .start(s1)
                .next(decider)
                  .on("CONTINUE").to(s1)
                 .end()
                .build();
    }

    @Bean(name = "s1")
    public Step step1(StepBuilderFactory steps) {
        return steps.get("step1").tasklet((stepContribution, chunkContext) -> {
            return RepeatStatus.FINISHED;
        }).build();
    }

    @Bean
    public JobExecutionDecider decider() {
        return new JobExecutionDecider() {
            @Override
            public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
                return new FlowExecutionStatus("CONTINUE");
            }
        };
    }

    @Bean
    DefaultBatchConfigurer batchConfigurer() {
        return new DefaultBatchConfigurer() {
            private final JobRepository jobRepository;
            private final JobExplorer jobExplorer;
            private final JobLauncher jobLauncher;

            {
                MapJobRepositoryFactoryBean jobRepositoryFactory = new MapJobRepositoryFactoryBean();
                try {
                    this.jobRepository = jobRepositoryFactory.getObject();
                    MapJobExplorerFactoryBean jobExplorerFactory = new MapJobExplorerFactoryBean(jobRepositoryFactory);
                    this.jobExplorer = jobExplorerFactory.getObject();
                    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
                    jobLauncher.setJobRepository(jobRepository);
                    jobLauncher.afterPropertiesSet();
                    this.jobLauncher = jobLauncher;
                } catch (Exception e) {
                    throw new BatchConfigurationException(e);
                }
            }

            @Override
            public JobRepository getJobRepository() {
                return jobRepository;
            }

            @Override
            public JobExplorer getJobExplorer() {
                return jobExplorer;
            }

            @Override
            public JobLauncher getJobLauncher() {
                return jobLauncher;
            }
        };
    }
}

実行時の様子

以下にjvisualvmのキャプチャを示す。

f:id:kagamihoge:20190706205250j:plain

上記はだいたい5分くらい実行したところだが、ヒープサイズがどんどん膨らんでいく。使用済みヒープも徐々に増えてはいるようだが、どうも急激に増えて減ってを繰り返している、ように見える。

f:id:kagamihoge:20190706205526j:plain

f:id:kagamihoge:20190706205539j:plain

サンプラを見るとbyte[]が多数を占め、CPUを見るとリフレクションが実行時間のうち大半を占めている。

怪しそうなのは、上位にあるStepExecutionObjectとかConcurrentHashMapとか、シリアライズ関連のObjectInputStreamとからへん。なので原因調べるときはその辺から調べていった。

原因

概要

MapJobRepositoryFactoryBeanはjob-executionの格納にMapJobExecutionDaoを使用する。これは内部的にはConcurrentHashMap<Long, JobExecution>()で保存し、また、job-executionに関連付くstep-executionのコレクションを保持している。次に、そのマップ保存時にオリジナルのJobExecutionをコピーするが、その実装がシリアライズandデシリアライズになっている。

詳細

MapJobExecutionDaoは以下のようなプロパティがあり、ここでjobExecution idJobExecutionのマップを持っている。また、JobExecutionはstep-executionを保持するstepExecutionsのコレクションがある。

public class MapJobExecutionDao implements JobExecutionDao {
    private final ConcurrentMap<Long, JobExecution> executionsById = new ConcurrentHashMap<Long, JobExecution>();
@SuppressWarnings("serial")
public class JobExecution extends Entity {
        ....
    private volatile Collection<StepExecution> stepExecutions = Collections.synchronizedSet(new LinkedHashSet<>());

step実行時は以下のMapStepExecutionDaoがstep-executionの追加処理を行う。以下のように、job-exectuion -> step-executionのコレクションに追加する。なのでstepを大量に実行すれば無限にstep-executionが追加されていく。

public class MapStepExecutionDao implements StepExecutionDao {
        ...
    @Override
    public void updateStepExecution(StepExecution stepExecution) {
Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId());
         ....
         executions.put(stepExecution.getId(), copy);

ヒープサイズが拡大する点についてはこれで一応説明がつく。

次に、スローダウンの原因について。

MapJobExecutionDaoに話は戻るが、マップ保存の際にJobExecutionインスタンスをコピーする。で、コピーは以下のようにシリアライズ -> デシリアライズで実装している。

   private static JobExecution copy(JobExecution original) {
        JobExecution copy = (JobExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original));
        return copy;
    }

前述のようにstep-executionコレクションはどんどんデカくなっていくので、当然、そのマップをシリアライズ -> デシリアライズする処理もどんどん遅くなっていく。この挙動は徐々にスローダウンする事象と一致する。クッソでかいコレクションをシリアライズ -> デシリアライズするのでゴミもクッソでかくなる。使用済みヒープが急激に増えては減ってはを繰り返すのはこの辺にある、と推測される。

対策

インメモリDBにメタデータを保存させる。

f:id:kagamihoge:20190706220204j:plain

H2でやってみたがスローダウンはしなかった。もっと長時間起動し続けたらどうなるか分からんけど、そんなにもstep大量に実行するような状況とは最早別問題であろう。

spring-integrationでjob名渡してspring-batchを起動

spring-integration勉強中です。

spring-integrationに文字列でjob名を渡し、その先でspring-batchを起動、をつくる。https://github.com/pakmans/spring-batch-integration-example/blob/master/pom.xml を参考にした。

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kagamihoge</groupId>
    <artifactId>integratesample2batch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>integratesample2batch</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
        </dependency>
    </dependencies>
</project>

/src/main/resources/application.ymlに以下を指定。spring-bootのデフォルト動作は起動時にJobのbeanをすべて自動起動するので、その機能をオフにしておく。

spring:
  batch:
    job:
      enabled: false
package kagamihoge.integratesample2batch.sample1;

import java.util.Date;
import java.util.Optional;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.ReferenceJobFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.batch.integration.launch.JobLaunchingMessageHandler;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@SpringBootApplication
@EnableBatchProcessing
public class Application implements CommandLineRunner {
    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).web(WebApplicationType.NONE).run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        jobRegistry.register(new ReferenceJobFactory(sampleJob()));
        gateway.go("sampleJob");
    }

    @Autowired
    StartBatchGateway gateway;

    @MessagingGateway
    public interface StartBatchGateway {
        @Gateway(requestChannel = "start.batch.channel")
        void go(String jobName);
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from("start.batch.channel") //
                .filter(name -> job((String) name).isPresent()) //
                .transform(name -> {
                    Job job = job((String) name).get();
                    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
                    jobParametersBuilder.addDate("dummy", new Date());
                    return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
                }) //
                .handle(jobLaunchingMessageHandler()) //
                .handle(jobExecution -> {
                    System.out.println("# jobExecution #" + jobExecution.getPayload());
                }).get();
    }

    Optional<Job> job(String name) {
        try {
            return Optional.of(jobLocator.getJob(name));
        } catch (NoSuchJobException e) {
            return Optional.empty();
        }
    }

    @Bean
    JobLaunchingMessageHandler jobLaunchingMessageHandler() {
        JobLaunchingMessageHandler handler = new JobLaunchingMessageHandler(jobLauncher);
        return handler;
    }

    // ---------- 以下spring-batch関連beanとjob定義 ----------------
    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    JobRegistry jobRegistry;

    @Autowired
    JobLocator jobLocator;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Bean
    Job sampleJob() {
        TaskletStep step = stepBuilderFactory.get("sampleStep")//
                .tasklet((contribution, chunkContext) -> {//
                    System.out.println("tasklet");//
                    return RepeatStatus.FINISHED;//
                }).build();

        return jobBuilderFactory.get("sampleJob") //
                .incrementer(new RunIdIncrementer()) //
                .start(step) //
                .build();
    }
}

spring-integrationでメッセージ送信

spring-integration勉強中です。

非常にシンプルな、spring-integrationのチャネルに文字列メッセージ送信して表示するだけ、を作ってみる。

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kagamihoge</groupId>
    <artifactId>integratesample0</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>integratesample0</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

IntegrationFlow をつかう

package kagamihoge.integratesample0;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@SpringBootApplication
public class Application implements CommandLineRunner {
    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).web(WebApplicationType.NONE).run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        gateway.go("simple message.");
    }

    @Autowired
    SimpleMessageGateway gateway;

    @MessagingGateway
    public interface SimpleMessageGateway {
        @Gateway(requestChannel = "simple.channel")
        void go(String payload);
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows //
                .from("simple.channel") //
                .handle(m -> System.out.println("handle:" + m.getPayload())) //
                .get();
    }
}

アノテーションでなんかする

詳しいことはまだ全然分かってないけど。単純なメッセージ送信・受信だけなら、以下のようにアノテーションをつけたbean用意すれば出来る。

package kagamihoge.integratesample1simple;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
public class Application implements CommandLineRunner {
    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).web(WebApplicationType.NONE).run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        gateway.go("sample message send.");
    }

    @Autowired
    SampleGateway gateway;

    @MessagingGateway
    interface SampleGateway {
        @Gateway(requestChannel = "sample.channel")
        void go(String payload);
    }

    static class SampleActivator {
        @ServiceActivator(inputChannel = "sample.channel")
        public void handle(String message) {
            System.out.println("handle message: " + message);
        }
    };

    @Bean
    SampleActivator receiver() {
        return new SampleActivator();
    }
}