kagamihogeの日記

kagamihogeの日記です。

spring-integrationでjob名渡してspring-batchを起動

spring-integration勉強中です。

spring-integrationに文字列でjob名を渡し、その先でspring-batchのjobを起動するサンプルをつくる。pom.xmlは https://github.com/pakmans/spring-batch-integration-example/blob/master/pom.xml を参考にした。

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build descriptor for the sample: pulls in the Spring Boot starters for
         Integration and Batch plus the spring-batch-integration bridge module. -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>kagamihoge</groupId>
    <artifactId>integratesample2batch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>integratesample2batch</name>
    <url>http://maven.apache.org</url>

    <!-- Inherits from the Spring Boot 2.1.6.RELEASE parent; the dependencies below
         declare no <version> of their own and rely on the versions managed there. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <!-- Empty relativePath: the parent is resolved from the repository, not from a local path. -->
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
        <!-- Overrides the maven-jar-plugin version property; the reason is not stated in this file. -->
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>
    <dependencies>
        <!-- Spring Integration (messaging gateway and integration flow DSL). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <!-- Spring Batch (Job, Step, JobLauncher, JobRegistry). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- Provides JobLaunchRequest and JobLaunchingMessageHandler, which connect the two. -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- H2 database on the classpath; presumably the embedded store for the
             Spring Batch job repository (TODO confirm). -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
        </dependency>
    </dependencies>
</project>

/src/main/resources/application.ymlに以下を指定。spring-bootのデフォルト動作は起動時にJobのbeanをすべて自動起動するので、その機能をオフにしておく。

# Disables Spring Boot's automatic execution of every Job bean at application startup.
spring:
  batch:
    job:
      enabled: false
package kagamihoge.integratesample2batch.sample1;

import java.util.Date;
import java.util.Optional;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.ReferenceJobFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.batch.integration.launch.JobLaunchingMessageHandler;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

/**
 * Boot entry point that starts a Spring Batch job by name through a Spring Integration flow.
 *
 * <p>On startup {@link #run(String...)} registers {@code sampleJob} in the {@link JobRegistry}
 * and sends the string {@code "sampleJob"} through {@link StartBatchGateway}. The flow defined
 * in {@link #flow()} resolves that name to a {@link Job}, wraps it in a
 * {@link JobLaunchRequest} and hands it to a {@link JobLaunchingMessageHandler}.
 */
@SpringBootApplication
@EnableBatchProcessing
public class Application implements CommandLineRunner {

    // ---------- Spring Integration entry point ----------------
    @Autowired
    StartBatchGateway gateway;

    // ---------- Spring Batch infrastructure beans ----------------
    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    JobRegistry jobRegistry;

    @Autowired
    JobLocator jobLocator;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    /**
     * Starts the application without a web environment.
     *
     * @param args command line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(Application.class);
        builder.web(WebApplicationType.NONE).run(args);
    }

    /**
     * Registers the sample job under its own name and then requests its launch by name.
     *
     * @param args command line arguments (unused)
     * @throws Exception if registering the job or sending the request fails
     */
    @Override
    public void run(String... args) throws Exception {
        // The flow looks jobs up through the JobLocator, so the job is registered first.
        ReferenceJobFactory sampleJobFactory = new ReferenceJobFactory(sampleJob());
        jobRegistry.register(sampleJobFactory);
        gateway.go("sampleJob");
    }

    /** Messaging gateway that puts a job name on the {@code start.batch.channel} channel. */
    @MessagingGateway
    public interface StartBatchGateway {
        /**
         * Sends a job name into the flow.
         *
         * @param jobName name of the job to look up and launch
         */
        @Gateway(requestChannel = "start.batch.channel")
        void go(String jobName);
    }

    /**
     * Defines the flow: job name, then {@link JobLaunchRequest}, then the launching handler,
     * then printing of the handler's result.
     *
     * @return the flow reading from {@code start.batch.channel}
     */
    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from("start.batch.channel")
                // Names for which no job can be located do not pass this filter.
                .filter(payload -> job((String) payload).isPresent())
                .transform(payload -> launchRequestFor((String) payload))
                .handle(jobLaunchingMessageHandler())
                .handle(message -> {
                    Object execution = message.getPayload();
                    System.out.println("# jobExecution #" + execution);
                })
                .get();
    }

    /**
     * Builds the launch request for a job name that has already passed the filter.
     *
     * @param jobName name of a job known to the locator
     * @return request pairing the job with a single date parameter named {@code dummy}
     */
    private JobLaunchRequest launchRequestFor(String jobName) {
        Job target = job(jobName).get();
        // "dummy" carries the current time; presumably this makes the parameters differ
        // between launches (TODO confirm).
        JobParametersBuilder parameters = new JobParametersBuilder().addDate("dummy", new Date());
        return new JobLaunchRequest(target, parameters.toJobParameters());
    }

    /**
     * Looks a job up by name.
     *
     * @param name job name to resolve
     * @return the job, or an empty {@link Optional} when the locator reports no such job
     */
    Optional<Job> job(String name) {
        Job located;
        try {
            located = jobLocator.getJob(name);
        } catch (NoSuchJobException notRegistered) {
            return Optional.empty();
        }
        return Optional.of(located);
    }

    /**
     * Creates the handler that receives {@link JobLaunchRequest}s in the flow.
     *
     * @return handler constructed with the injected {@link JobLauncher}
     */
    @Bean
    JobLaunchingMessageHandler jobLaunchingMessageHandler() {
        return new JobLaunchingMessageHandler(jobLauncher);
    }

    /**
     * Defines {@code sampleJob}: one tasklet step that prints {@code tasklet} and finishes.
     *
     * @return the job definition
     */
    @Bean
    Job sampleJob() {
        TaskletStep sampleStep = stepBuilderFactory.get("sampleStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("tasklet");
                    return RepeatStatus.FINISHED;
                })
                .build();

        return jobBuilderFactory.get("sampleJob")
                .incrementer(new RunIdIncrementer())
                .start(sampleStep)
                .build();
    }
}