kagamihogeの日記

kagamihogeの日記です。

spring-batchのRemote Chunkingをためす

https://docs.spring.io/spring-batch/docs/4.1.x/reference/html/spring-batch-integration.html#remote-chunking のRemote Chunkingをためす。

このサンプルは、Spring Integegration -> Active MQを介してMasterからworkerおprocessor + writerを呼び出す。

plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-jms'
    implementation 'org.springframework.batch:spring-batch-integration'
    implementation 'org.springframework.boot:spring-boot-starter-artemis'
}

以下はMaster側。

package springbatchsample.remote.master;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.jms.JMSException;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@EnableBatchProcessing
@SpringBootApplication
public class Application {    
    @Bean
    ActiveMQConnectionFactory connectionFactory() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        return factory;
    }
    
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(requests())
                .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
                .get();
    }
    
    @Bean
    public QueueChannel replies() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
                .channel(replies())
                .get();
    }

    @Bean
    public ItemWriter<String> itemWriter() {
        MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setDefaultChannel(requests());
        messagingTemplate.setReceiveTimeout(2000);
        ChunkMessageChannelItemWriter<String> chunkMessageChannelItemWriter
                = new ChunkMessageChannelItemWriter<>();
        chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
        chunkMessageChannelItemWriter.setReplyChannel(replies());

        return chunkMessageChannelItemWriter;
    }
    
    @Bean
    public Job chunkJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        List<String> list = IntStream.range(0, 1001).mapToObj(Integer::toString).collect(Collectors.toList());
        ListItemReader<String> itemReader = new ListItemReader<String>(list);
        
        return jobBuilderFactory.get("personJob")
                 .start(stepBuilderFactory.get("step1")
                         .<String, String>chunk(200)
                         .reader(itemReader)
                         .writer(itemWriter())
                         .build())
                 .build();
    }
    
    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);
    }
}

次がworker側。

package springbatchsample.remote.worker;

import javax.jms.JMSException;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.step.item.ChunkProcessor;
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@EnableBatchProcessing
@SpringBootApplication
public class Application {
    @Bean
    ActiveMQConnectionFactory connectionFactory() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        return factory;
    }
    
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
                .channel(requests())
                .get();
    }
    
    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(replies())
                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
                .get();
    }
    
    @Bean
    @ServiceActivator(inputChannel = "requests", outputChannel = "replies")
    public ChunkProcessorChunkHandler<String> chunkProcessorChunkHandler() {
        ChunkProcessor<String> chunkProcessor
                = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
        ChunkProcessorChunkHandler<String> chunkProcessorChunkHandler
                = new ChunkProcessorChunkHandler<>();
        chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
        return chunkProcessorChunkHandler;
    }
    
    ItemProcessor<String, String> itemProcessor() {
        return item -> {
            System.out.println("## processor:" + item);
            return item;
            };
    }
    
    ItemWriter<String> itemWriter() {
        return items -> {
            for (String string : items) {
                System.out.print(string + ",");
            }
            System.out.println("");
        };
    }
    
    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);
    }

}

実行前にはActive MQを起動しておく

master側は1-1000をrederで読み込みchunk(200)ごとにリモート呼び出しをする。workerは特に何もしておらず、200件ごとにwriterが一度呼ばれる。