kagamihogeの日記

kagamihogeの日記です。

spring-integrationでjob名渡してspring-batchを起動

spring-integration勉強中です。

spring-integrationに文字列でjob名を渡し、その先でspring-batchを起動、をつくる。https://github.com/pakmans/spring-batch-integration-example/blob/master/pom.xml を参考にした。

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor for the sample that starts a Spring Batch job by name through Spring Integration. -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>kagamihoge</groupId>
    <artifactId>integratesample2batch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>integratesample2batch</name>
    <url>http://maven.apache.org</url>

    <!-- Dependencies below declare no versions; they are inherited from this Spring Boot parent. -->
    <!-- The empty relativePath makes Maven resolve the parent from the repository, not from a local directory. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
        <!-- NOTE(review): pins maven-jar-plugin to 3.1.1 instead of the parent's managed version;
             presumably an IDE (Eclipse m2e) compatibility workaround - confirm before removing. -->
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>
    <dependencies>
        <!-- Spring Integration: messaging gateway and IntegrationFlow DSL. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <!-- Spring Batch: Job/Step builders, JobLauncher, JobRegistry. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- Bridge between the two: JobLaunchRequest and JobLaunchingMessageHandler. -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- NOTE(review): presumably supplies the embedded database that Spring Batch uses
             for its job metadata - confirm. -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
        </dependency>
    </dependencies>
</project>

src/main/resources/application.yml に以下を指定する。Spring Boot はデフォルトで起動時にすべての Job bean を自動実行するので、その機能をオフにしておく。

# Turn off Spring Boot's automatic execution of Job beans at startup;
# in this sample the job is launched explicitly through the integration flow.
spring:
  batch:
    job:
      enabled: false
package kagamihoge.integratesample2batch.sample1;

import java.util.Date;
import java.util.Optional;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.ReferenceJobFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.batch.integration.launch.JobLaunchingMessageHandler;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

/**
 * Boot entry point that launches a Spring Batch job by name through a Spring Integration flow.
 *
 * <p>At startup {@link #run(String...)} registers {@code sampleJob} in the {@link JobRegistry}
 * and sends the job name to {@code start.batch.channel}. The flow drops names that do not
 * resolve to a job, converts the rest into a {@link JobLaunchRequest}, hands that to the
 * {@link JobLaunchingMessageHandler} and prints the payload it produces.
 */
@SpringBootApplication
@EnableBatchProcessing
public class Application implements CommandLineRunner {

    /** Gateway used to push a job name into the flow. */
    @Autowired
    StartBatchGateway gateway;

    // ---------- Spring Batch related beans (the job definition is at the bottom) ----------
    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    JobRegistry jobRegistry;

    @Autowired
    JobLocator jobLocator;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    public static void main(String[] args) {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(Application.class);
        builder.web(WebApplicationType.NONE).run(args);
    }

    /** Registers the sample job so it can be looked up by name, then requests its launch. */
    @Override
    public void run(String... args) throws Exception {
        Job registered = sampleJob();
        jobRegistry.register(new ReferenceJobFactory(registered));
        gateway.go("sampleJob");
    }

    /** Messaging gateway whose single method sends its argument to {@code start.batch.channel}. */
    @MessagingGateway
    public interface StartBatchGateway {
        @Gateway(requestChannel = "start.batch.channel")
        void go(String jobName);
    }

    /**
     * Flow: job name -> (known job?) -> launch request -> job launch -> print result payload.
     */
    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from("start.batch.channel")
                // Names that the locator cannot resolve are filtered out here.
                .filter(candidate -> job((String) candidate).isPresent())
                .transform(candidate -> launchRequestFor((String) candidate))
                .handle(jobLaunchingMessageHandler())
                .handle(message -> System.out.println("# jobExecution #" + message.getPayload()))
                .get();
    }

    /**
     * Builds the launch request for a job name that already passed the filter step.
     * A fresh date parameter named {@code dummy} is attached on every call.
     */
    private JobLaunchRequest launchRequestFor(String name) {
        Job target = job(name).get();
        return new JobLaunchRequest(target,
                new JobParametersBuilder().addDate("dummy", new Date()).toJobParameters());
    }

    /**
     * Looks up a job by name.
     *
     * @param name job name to resolve
     * @return the job, or an empty Optional when the locator throws {@link NoSuchJobException}
     */
    Optional<Job> job(String name) {
        Job found;
        try {
            found = jobLocator.getJob(name);
        } catch (NoSuchJobException missing) {
            return Optional.empty();
        }
        return Optional.of(found);
    }

    /** Handler that launches the job described by an incoming {@link JobLaunchRequest}. */
    @Bean
    JobLaunchingMessageHandler jobLaunchingMessageHandler() {
        return new JobLaunchingMessageHandler(jobLauncher);
    }

    /** Sample job: a single tasklet step that prints {@code tasklet} and finishes. */
    @Bean
    Job sampleJob() {
        TaskletStep printingStep = stepBuilderFactory.get("sampleStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("tasklet");
                    return RepeatStatus.FINISHED;
                })
                .build();

        return jobBuilderFactory.get("sampleJob")
                .incrementer(new RunIdIncrementer())
                .start(printingStep)
                .build();
    }
}

spring-integrationでメッセージ送信

spring-integration勉強中です。

非常にシンプルな、spring-integrationのチャネルに文字列メッセージ送信して表示するだけ、を作ってみる。

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor for the minimal "send a string message to a channel" Spring Integration sample. -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>kagamihoge</groupId>
    <artifactId>integratesample0</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>integratesample0</name>
    <url>http://maven.apache.org</url>

    <!-- Dependencies below declare no versions; they are inherited from this Spring Boot parent. -->
    <!-- The empty relativePath makes Maven resolve the parent from the repository, not from a local directory. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
        <!-- NOTE(review): pins maven-jar-plugin to 3.1.1 instead of the parent's managed version;
             presumably an IDE (Eclipse m2e) compatibility workaround - confirm before removing. -->
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>

    <dependencies>
        <!-- Spring Integration: messaging gateway and IntegrationFlow DSL. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

IntegrationFlow をつかう

package kagamihoge.integratesample0;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

/**
 * Smallest possible Spring Integration sample: a gateway sends one string to
 * {@code simple.channel} and a flow subscribed to that channel prints the payload.
 */
@SpringBootApplication
public class Application implements CommandLineRunner {

    /** Gateway used to publish a message onto {@code simple.channel}. */
    @Autowired
    SimpleMessageGateway gateway;

    public static void main(String[] args) {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(Application.class);
        builder.web(WebApplicationType.NONE).run(args);
    }

    /** Sends a single fixed message once the application context is up. */
    @Override
    public void run(String... args) throws Exception {
        gateway.go("simple message.");
    }

    /** Messaging gateway whose single method sends its argument to {@code simple.channel}. */
    @MessagingGateway
    public interface SimpleMessageGateway {
        @Gateway(requestChannel = "simple.channel")
        void go(String payload);
    }

    /** Flow that consumes {@code simple.channel} and prints each payload to stdout. */
    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from("simple.channel")
                .handle(message -> {
                    System.out.println("handle:" + message.getPayload());
                })
                .get();
    }
}

アノテーションでなんかする

詳しいことはまだ全然分かってないけど、単純なメッセージ送信・受信だけなら、以下のようにアノテーションをつけたbeanを用意すれば出来る。

package kagamihoge.integratesample1simple;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;

/**
 * Annotation-driven variant of the simple messaging sample: a gateway sends one string to
 * {@code sample.channel} and a {@link ServiceActivator}-annotated bean method prints it.
 */
@SpringBootApplication
public class Application implements CommandLineRunner {

    /** Gateway used to publish a message onto {@code sample.channel}. */
    @Autowired
    SampleGateway gateway;

    public static void main(String[] args) {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(Application.class);
        builder.web(WebApplicationType.NONE).run(args);
    }

    /** Sends a single fixed message once the application context is up. */
    @Override
    public void run(String... args) throws Exception {
        gateway.go("sample message send.");
    }

    /** Messaging gateway whose single method sends its argument to {@code sample.channel}. */
    @MessagingGateway
    interface SampleGateway {
        @Gateway(requestChannel = "sample.channel")
        void go(String payload);
    }

    /** Receiving side: its annotated method is bound to {@code sample.channel}. */
    static class SampleActivator {
        /** Prints the received message to stdout. */
        @ServiceActivator(inputChannel = "sample.channel")
        public void handle(String message) {
            System.out.println("handle message: " + message);
        }
    }

    /** Exposes the activator as a bean. */
    @Bean
    SampleActivator receiver() {
        SampleActivator activator = new SampleActivator();
        return activator;
    }
}

spring-integrationでファイル作成を検出するサンプル

spring-integration勉強中です。

https://github.com/pakmans/spring-batch-integration-example のコードを参考に、監視先ディレクトリにファイルが作られたらメッセージハンドラが呼び出される、というだけのコードを動かした。詳しいことはまだよく分かっていない。

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor for the Spring Integration sample that detects file creation in a watched directory. -->
    <modelVersion>4.0.0</modelVersion>

    <groupId>kagamihoge</groupId>
    <artifactId>integratesample1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>integratesample1</name>
    <url>http://maven.apache.org</url>

    <!-- Dependencies below declare no versions; they are inherited from this Spring Boot parent. -->
    <!-- The empty relativePath makes Maven resolve the parent from the repository, not from a local directory. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
        <!-- NOTE(review): pins maven-jar-plugin to 3.1.1 instead of the parent's managed version;
             presumably an IDE (Eclipse m2e) compatibility workaround - confirm before removing. -->
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>

    <dependencies>
        <!-- Spring Integration core: IntegrationFlow DSL, channels, pollers. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <!-- File support: FileReadingMessageSource and the file list filters. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
package kagamihoge.integratesample1;

import java.io.File;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileReadingMessageSource.WatchEventType;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;

/**
 * Sample that watches the {@code dropfolder} directory and prints every newly created
 * {@code *.txt} file that the polled message source reports.
 */
@SpringBootApplication
public class App {
    public static void main(String[] args) {
        SpringApplicationBuilder builder = new SpringApplicationBuilder(App.class);
        builder.web(WebApplicationType.NONE).run(args);
    }

    /**
     * Flow: polled file source (fixed delay of 5000 ms) -> direct channel -> print the file.
     */
    @Bean
    public IntegrationFlow sampleFlow() {
        return IntegrationFlows
                .from(fileReadingMessageSource(), spec -> spec.poller(Pollers.fixedDelay(5000)))
                .channel(new DirectChannel())
                .handle(message -> {
                    // The source is typed MessageSource<File>, so the payload is cast to File.
                    File created = (File) message.getPayload();
                    System.out.println(created);
                })
                .get();
    }

    /**
     * Message source over {@code dropfolder}, restricted to {@code *.txt} files, using the
     * watch service with CREATE events only.
     */
    @Bean
    public MessageSource<File> fileReadingMessageSource() {
        FileReadingMessageSource watcher = new FileReadingMessageSource();
        watcher.setDirectory(new File("dropfolder"));
        watcher.setFilter(new SimplePatternFileListFilter("*.txt"));
        watcher.setUseWatchService(true);
        watcher.setWatchEvents(WatchEventType.CREATE);
        return watcher;
    }
}

上記を起動して監視先ディレクトリに適当にファイルを作る。そうすると以下のように出力される。

2019-06-26 22:04:10.636  INFO 5224 --- [           main] kagamihoge.integratesample1.App          : Started App in 1.366 seconds (JVM running for 1.679)
dropfolder\sdf - コピー (2).txt
dropfolder\sdf - コピー (3).txt
dropfolder\sdf - コピー.txt
dropfolder\sdf.txt

これでとりあえず、ファイル作成を検出してメッセージハンドラに通知が飛ぶ、というのが出来た。