https://docs.spring.io/spring-batch/docs/4.1.x/reference/html/spring-batch-integration.html#remote-chunking のRemote Chunkingをためす。
このサンプルは、Spring Integegration -> Active MQを介してMasterからworkerおprocessor + writerを呼び出す。
plugins { id 'org.springframework.boot' version '2.2.2.RELEASE' } dependencies { implementation 'org.springframework.boot:spring-boot-starter-batch' implementation 'org.springframework.boot:spring-boot-starter-integration' implementation 'org.springframework.integration:spring-integration-jms' implementation 'org.springframework.batch:spring-batch-integration' implementation 'org.springframework.boot:spring-boot-starter-artemis' }
以下はMaster側。
package springbatchsample.remote.master; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.jms.JMSException; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.ListItemReader; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.Bean; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.jms.dsl.Jms; @EnableBatchProcessing @SpringBootApplication public class Application { @Bean ActiveMQConnectionFactory connectionFactory() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); factory.setBrokerURL("tcp://localhost:61616"); return factory; } @Bean public DirectChannel requests() { return new DirectChannel(); } @Bean public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlows .from(requests()) .handle(Jms.outboundAdapter(connectionFactory).destination("requests")) .get(); } @Bean public QueueChannel replies() { return new QueueChannel(); } @Bean public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlows .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies")) .channel(replies()) .get(); } @Bean public ItemWriter<String> itemWriter() { MessagingTemplate messagingTemplate = new MessagingTemplate(); messagingTemplate.setDefaultChannel(requests()); messagingTemplate.setReceiveTimeout(2000); ChunkMessageChannelItemWriter<String> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>(); chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate); chunkMessageChannelItemWriter.setReplyChannel(replies()); return chunkMessageChannelItemWriter; } @Bean public Job chunkJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) { List<String> list = IntStream.range(0, 1001).mapToObj(Integer::toString).collect(Collectors.toList()); ListItemReader<String> itemReader = new ListItemReader<String>(list); return jobBuilderFactory.get("personJob") .start(stepBuilderFactory.get("step1") .<String, String>chunk(200) .reader(itemReader) .writer(itemWriter()) .build()) .build(); } public static void main(String[] args) { new SpringApplicationBuilder(Application.class).run(args); } }
次がworker側。
package springbatchsample.remote.worker; import javax.jms.JMSException; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.step.item.ChunkProcessor; import org.springframework.batch.core.step.item.SimpleChunkProcessor; import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.jms.dsl.Jms; @EnableBatchProcessing @SpringBootApplication public class Application { @Bean ActiveMQConnectionFactory connectionFactory() throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); factory.setBrokerURL("tcp://localhost:61616"); return factory; } @Bean public DirectChannel requests() { return new DirectChannel(); } @Bean public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlows .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests")) .channel(requests()) .get(); } @Bean public DirectChannel replies() { return new DirectChannel(); } @Bean public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlows .from(replies()) .handle(Jms.outboundAdapter(connectionFactory).destination("replies")) .get(); } @Bean @ServiceActivator(inputChannel = "requests", outputChannel = "replies") public ChunkProcessorChunkHandler<String> chunkProcessorChunkHandler() { ChunkProcessor<String> chunkProcessor = new SimpleChunkProcessor<>(itemProcessor(), itemWriter()); ChunkProcessorChunkHandler<String> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>(); chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor); return chunkProcessorChunkHandler; } ItemProcessor<String, String> itemProcessor() { return item -> { System.out.println("## processor:" + item); return item; }; } ItemWriter<String> itemWriter() { return items -> { for (String string : items) { System.out.print(string + ","); } System.out.println(""); }; } public static void main(String[] args) { new SpringApplicationBuilder(Application.class).run(args); } }
実行前にはActive MQを起動しておく
master側は1-1000をrederで読み込みchunk(200)
ごとにリモート呼び出しをする。workerは特に何もしておらず、200件ごとにwriterが一度呼ばれる。