kagamihogeの日記

kagamihogeの日記です。

spring-batchのPartitioningをためす

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#partitioning のPartitioningを試す。

ソースコード

// Gradle build for the partitioning sample.
plugins {
    // Spring Boot Gradle plugin, pinned to 2.2.2.RELEASE.
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    // Lets the dependencies below be declared without explicit versions.
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

dependencies {
    // Spring Batch starter used by the job configuration of this sample.
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    // H2 database, needed only at runtime (presumably for the batch job repository - confirm).
    runtimeOnly 'com.h2database:h2'
}
import java.io.IOException;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.util.NumberUtils;

@EnableBatchProcessing
@SpringBootApplication
public class Application {
    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) throws IOException {
        return jobBuilderFactory
                .get("sampleJob")
                .start(stepMaster(stepBuilderFactory))
                .build();
    }

    @Bean
    public Step stepMaster(StepBuilderFactory stepBuilderFactory) throws IOException {
        return stepBuilderFactory
                .get("step1.master")
                .partitioner("step1", partitioner())
                .step(step1(stepBuilderFactory))
                .gridSize(10)
                .taskExecutor(new SimpleAsyncTaskExecutor("spring_batch"))
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory) throws IOException {
        return stepBuilderFactory
                .get("step1")
                .<Integer, Integer>chunk(2)
                .reader(itemReader(null)).writer(itemWriter())
                .build();
    }

    @Bean
    public Partitioner partitioner() throws IOException {
        MultiResourcePartitioner p = new MultiResourcePartitioner();
        PathMatchingResourcePatternResolver resolover = new PathMatchingResourcePatternResolver();
        p.setResources(resolover.getResources("classpath:data/file*.csv"));
        return p;
    }

    @StepScope
    @Bean
    public FlatFileItemReader<Integer> itemReader(@Value("#{stepExecutionContext['fileName']}") Resource resource)
            throws IOException {
        FlatFileItemReader<Integer> reader = new FlatFileItemReader<Integer>();
        DefaultLineMapper<Integer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(new DelimitedLineTokenizer(","));
        lineMapper.setFieldSetMapper(fieldSet -> NumberUtils.parseNumber(fieldSet.getValues()[0],Integer.class));

        reader.setLineMapper(lineMapper);
        reader.setResource(resource);
        return reader;
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return (items) -> items.forEach(System.out::println);
    }

    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);
    }
}

クラスパス下にdata/file*.csvを作成する。具体的にはdata/file1.csv, data/file2.csv, data/file3.csvなど。中身は一列数値だけにしておく。例えば、以下のような感じ。

1
1
1

メモ

Partitioner partitioner()がキモ。stepをどういうアルゴリズムでパーティション化するか、をここで定義する。Partitionerインタフェースで実装する。たとえば、適当な範囲で区切られた値・主キー範囲・ユニークなファイル名リスト、など。

spring-batchにはそのインタフェース実装としてMultiResourcePartitionerがある。このクラスは、ファイルのリストを与えると各ファイルごとにパーティションを一つ割り当てる、という実装。また、各パーティションが処理すべきファイル名は、stepのExecutionContext(stepExecutionContext)にfileNameというキー名(変更可)で保存する。なので、後続処理はそのファイル名で処理を行うことになる。

reader定義であるitemReaderの引数は@Value("#{stepExecutionContext['fileName']}")となっており、上述のファイル名を取得している。この@Value(...)はspring-batchのlate bindingという機能によるもので、詳細は公式リファレンスのLate Bindingの節を参照。SpEL式でstepExecutionContext['fileName']と記述することで、spring-batchのstepのExecutionContextにアクセスしてキー名fileNameの値を取得している。なお、@StepScopeでスコープを変更する必要がある。以上により、パーティション化されたそれぞれのステップにそれぞれが処理すべきファイル名が渡される。

これを実行すると、ただ単にファイルから読みだした値を出力するだけだが、以下のようになる。なお、各ファイルの中身はすべて同じ数値のものを3つ用意した状態で実行している。すべて1・すべて2・すべて3、の3ファイルで実行している。

Job: [SimpleJob: [name=sampleJob]] launched with the following parameters: [{}]
Executing step: [step1.master]
3
3
2
2
1
1
1
1
3
3
2
2
1
2
Step: [step1:partition0] executed in 91ms
3
Step: [step1:partition1] executed in 98ms
Step: [step1:partition2] executed in 98ms
Step: [step1.master] executed in 131ms

3ファイルに対応してパーティション化された3つのstepが実行されているのがわかる。また、マルチスレッドなので出力はバラバラになる。

はまりどころ

Reader must be open before it can be read

spring-batchのRemote Chunkingをためす

https://docs.spring.io/spring-batch/docs/4.1.x/reference/html/spring-batch-integration.html#remote-chunking のRemote Chunkingをためす。

このサンプルは、Spring Integration -> Active MQを介してMasterからworkerのprocessor + writerを呼び出す。

// Gradle build for the remote chunking sample (used by both master and worker).
plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    // Required so the version-less dependencies below resolve through the Spring Boot BOM.
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    // Provides the `implementation` configuration used in the dependencies block.
    id 'java'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-integration'
    // JMS channel adapters (Jms.outboundAdapter / Jms.messageDrivenChannelAdapter).
    implementation 'org.springframework.integration:spring-integration-jms'
    // ChunkMessageChannelItemWriter / ChunkProcessorChunkHandler.
    implementation 'org.springframework.batch:spring-batch-integration'
    // ActiveMQ Artemis JMS client.
    implementation 'org.springframework.boot:spring-boot-starter-artemis'
}

以下はMaster側。

package springbatchsample.remote.master;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.jms.JMSException;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

/**
 * Master side of the remote chunking sample.
 *
 * <p>The job reads a list of number strings locally and hands every chunk to
 * a {@link ChunkMessageChannelItemWriter}. Chunks leave through the
 * {@code requests} channel to the JMS destination {@code requests}; replies
 * arrive from the JMS destination {@code replies} on the {@code replies}
 * channel.
 */
@EnableBatchProcessing
@SpringBootApplication
public class Application {

    /**
     * JMS connection factory pointing at the broker on
     * {@code tcp://localhost:61616}.
     *
     * @return the configured connection factory
     * @throws JMSException if the broker URL is rejected
     */
    @Bean
    ActiveMQConnectionFactory connectionFactory() throws JMSException {
        ActiveMQConnectionFactory brokerConnections = new ActiveMQConnectionFactory();
        brokerConnections.setBrokerURL("tcp://localhost:61616");
        return brokerConnections;
    }

    /**
     * Channel the item writer sends chunk requests to.
     *
     * @return a new direct channel
     */
    @Bean
    public DirectChannel requests() {
        DirectChannel outgoing = new DirectChannel();
        return outgoing;
    }

    /**
     * Flow that forwards everything on {@link #requests()} to the JMS
     * destination {@code requests}.
     *
     * @param connectionFactory connection factory used by the JMS adapter
     * @return the outbound integration flow
     */
    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        DirectChannel source = requests();
        return IntegrationFlows
                .from(source)
                .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
                .get();
    }

    /**
     * Channel on which worker replies are queued for the item writer.
     *
     * @return a new queue channel
     */
    @Bean
    public QueueChannel replies() {
        QueueChannel incoming = new QueueChannel();
        return incoming;
    }

    /**
     * Flow that moves messages from the JMS destination {@code replies} onto
     * {@link #replies()}.
     *
     * @param connectionFactory connection factory used by the JMS adapter
     * @return the inbound integration flow
     */
    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        QueueChannel target = replies();
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
                .channel(target)
                .get();
    }

    /**
     * Writer that sends each chunk to the {@code requests} channel and reads
     * the answers from the {@code replies} channel.
     *
     * @return the remote chunking writer
     */
    @Bean
    public ItemWriter<String> itemWriter() {
        ChunkMessageChannelItemWriter<String> remoteWriter = new ChunkMessageChannelItemWriter<>();

        // Template used for sending; its receive timeout is set to 2000.
        MessagingTemplate gateway = new MessagingTemplate();
        gateway.setReceiveTimeout(2000);
        gateway.setDefaultChannel(requests());

        remoteWriter.setReplyChannel(replies());
        remoteWriter.setMessagingOperations(gateway);
        return remoteWriter;
    }

    /**
     * Defines the job {@code personJob} with a single step {@code step1}
     * that reads the strings "0" through "1000" in chunks of 200 and passes
     * them to {@link #itemWriter()}.
     *
     * @param jobBuilderFactory  factory used to create the job builder
     * @param stepBuilderFactory factory used to create the step builder
     * @return the configured job
     */
    @Bean
    public Job chunkJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        List<String> numbers = IntStream.range(0, 1001)
                .mapToObj(value -> Integer.toString(value))
                .collect(Collectors.toList());
        ListItemReader<String> numberReader = new ListItemReader<>(numbers);

        return jobBuilderFactory
                .get("personJob")
                .start(stepBuilderFactory
                        .get("step1")
                        .<String, String>chunk(200)
                        .reader(numberReader)
                        .writer(itemWriter())
                        .build())
                .build();
    }

    /**
     * Boots the master application.
     *
     * @param args command line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplicationBuilder app = new SpringApplicationBuilder(Application.class);
        app.run(args);
    }
}

次がworker側。

package springbatchsample.remote.worker;

import javax.jms.JMSException;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.step.item.ChunkProcessor;
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

/**
 * Worker side of the remote chunking sample.
 *
 * <p>Chunk requests arrive from the JMS destination {@code requests}, are
 * processed and written by a {@link ChunkProcessorChunkHandler}, and the
 * handler's result is sent back to the JMS destination {@code replies}.
 */
@EnableBatchProcessing
@SpringBootApplication
public class Application {

    /**
     * JMS connection factory pointing at the broker on
     * {@code tcp://localhost:61616}.
     *
     * @return the configured connection factory
     * @throws JMSException if the broker URL is rejected
     */
    @Bean
    ActiveMQConnectionFactory connectionFactory() throws JMSException {
        ActiveMQConnectionFactory brokerConnections = new ActiveMQConnectionFactory();
        brokerConnections.setBrokerURL("tcp://localhost:61616");
        return brokerConnections;
    }

    /**
     * Channel carrying incoming chunk requests to the chunk handler.
     *
     * @return a new direct channel
     */
    @Bean
    public DirectChannel requests() {
        DirectChannel incoming = new DirectChannel();
        return incoming;
    }

    /**
     * Flow that moves messages from the JMS destination {@code requests}
     * onto {@link #requests()}.
     *
     * @param connectionFactory connection factory used by the JMS adapter
     * @return the inbound integration flow
     */
    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        DirectChannel target = requests();
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
                .channel(target)
                .get();
    }

    /**
     * Channel carrying the chunk handler's replies.
     *
     * @return a new direct channel
     */
    @Bean
    public DirectChannel replies() {
        DirectChannel outgoing = new DirectChannel();
        return outgoing;
    }

    /**
     * Flow that forwards everything on {@link #replies()} to the JMS
     * destination {@code replies}.
     *
     * @param connectionFactory connection factory used by the JMS adapter
     * @return the outbound integration flow
     */
    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        DirectChannel source = replies();
        return IntegrationFlows
                .from(source)
                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
                .get();
    }

    /**
     * Service activator wired between the {@code requests} and
     * {@code replies} channels; it runs {@link #itemProcessor()} and
     * {@link #itemWriter()} on every received chunk.
     *
     * @return the chunk handler
     */
    @Bean
    @ServiceActivator(inputChannel = "requests", outputChannel = "replies")
    public ChunkProcessorChunkHandler<String> chunkProcessorChunkHandler() {
        ChunkProcessorChunkHandler<String> handler = new ChunkProcessorChunkHandler<>();
        ChunkProcessor<String> processorAndWriter
                = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
        handler.setChunkProcessor(processorAndWriter);
        return handler;
    }

    /**
     * Pass-through processor that logs each item to standard output.
     *
     * @return the logging processor
     */
    ItemProcessor<String, String> itemProcessor() {
        return received -> {
            String logLine = "## processor:" + received;
            System.out.println(logLine);
            return received;
        };
    }

    /**
     * Writer that prints the items of a chunk on one line, each followed by
     * a comma, and then ends the line.
     *
     * @return the printing writer
     */
    ItemWriter<String> itemWriter() {
        return chunk -> {
            chunk.forEach(element -> System.out.print(element + ","));
            System.out.println("");
        };
    }

    /**
     * Boots the worker application.
     *
     * @param args command line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplicationBuilder app = new SpringApplicationBuilder(Application.class);
        app.run(args);
    }

}

実行前にはActive MQを起動しておく

master側は0〜1000(IntStream.range(0, 1001)で生成した1001件)をreaderで読み込みchunk(200)ごとにリモート呼び出しをする。workerは特に何もしておらず、チャンク(最大200件)ごとにwriterが一度呼ばれる。

Spring BootでApache ActiveMQの送受信

Spring Bootでhttp://activemq.apache.org/を使用しての送受信のhello worldレベルのことをやる。

手順など

Apache ActiveMQのインストール

Apache ActiveMQ Artemis(http://activemq.apache.org/)をインストールする。まずbin/artemis create 名前を実行してブローカーのインスタンスを作成する。以下はbin/artemis create mybrokerと実行したときの様子。

apache-artemis-2.10.1\bin>artemis.cmd create mybroker
Creating ActiveMQ Artemis instance at: C:\(中略)\apache-artemis-2.10.1\bin\mybroker

--user: is a mandatory property!
Please provide the default username:
kagami

--password: is mandatory with this configuration:
Please provide the default password:


--allow-anonymous | --require-login: is a mandatory property!
Allow anonymous access?, valid values are Y,N,True,False
Y

Auto tuning journal ...
done! Your system can make 0.95 writes per millisecond, your journal-buffer-timeout will be 1056000

You can now start the broker by executing:

   "(略)\apache-artemis-2.10.1\bin\mybroker\bin\artemis" run

Or you can setup the broker as Windows service and run it in the background:

   "(略)\apache-artemis-2.10.1\bin\mybroker\bin\artemis-service.exe" install
   "(略)\apache-artemis-2.10.1\bin\mybroker\bin\artemis-service.exe" start

   To stop the windows service:
      "(略)\apache-artemis-2.10.1\bin\mybroker\bin\artemis-service.exe" stop

   To uninstall the windows service
      "(略)\apache-artemis-2.10.1\bin\mybroker\bin\artemis-service.exe" uninstall

bin\mybroker\にファイル等が作られる。起動するにはmybroker\bin\artemis runする。実行時のログは以下。

apache-artemis-2.10.1\bin\mybroker\bin>artemis run
     _        _               _
    / \  ____| |_  ___ __  __(_) _____
   / _ \|  _ \ __|/ _ \  \/  | |/  __/
  / ___ \ | \/ |_/  __/ |\/| | |\___ \
 /_/   \_\|   \__\____|_|  |_|_|/___ /
 Apache ActiveMQ Artemis 2.10.1


2020-01-07 16:28:31,509 INFO  [org.apache.activemq.artemis.integration.bootstrap] AMQ101000: Starting ActiveMQ Artemis Server
2020-01-07 16:28:31,558 INFO  [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/large-messages,pagingDirectory=data/paging)
2020-01-07 16:28:31,588 INFO  [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
2020-01-07 16:28:31,626 INFO  [org.apache.activemq.artemis.core.server] AMQ221057: Global Max Size is being adjusted to 1/2 of the JVM max size (-Xmx). being defined as 536,870,912
2020-01-07 16:28:31,655 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
2020-01-07 16:28:31,656 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
2020-01-07 16:28:31,657 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
2020-01-07 16:28:31,657 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
2020-01-07 16:28:31,658 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
2020-01-07 16:28:31,659 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
2020-01-07 16:28:31,743 INFO  [org.apache.activemq.artemis.core.server] AMQ221034: Waiting indefinitely to obtain live lock
2020-01-07 16:28:31,744 INFO  [org.apache.activemq.artemis.core.server] AMQ221035: Live Server Obtained live lock
2020-01-07 16:28:32,804 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address DLQ supporting [ANYCAST]
2020-01-07 16:28:32,804 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue DLQ on address DLQ
2020-01-07 16:28:32,805 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address ExpiryQueue supporting [ANYCAST]
2020-01-07 16:28:32,805 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue ExpiryQueue on address ExpiryQueue
2020-01-07 16:28:33,925 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:61616 for protocols [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]
2020-01-07 16:28:33,961 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:5445 for protocols [HORNETQ,STOMP]
2020-01-07 16:28:33,992 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:5672 for protocols [AMQP]
2020-01-07 16:28:34,024 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:1883 for protocols [MQTT]
2020-01-07 16:28:34,051 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:61613 for protocols [STOMP]
2020-01-07 16:28:34,058 INFO  [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2020-01-07 16:28:34,059 INFO  [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.10.1 [0.0.0.0, nodeID=100ac6af-3114-11ea-8aac-d89ef337aa82]
2020-01-07 16:28:34,517 INFO  [org.apache.activemq.hawtio.branding.PluginContextListener] Initialized activemq-branding plugin
2020-01-07 16:28:34,791 INFO  [org.apache.activemq.hawtio.plugin.PluginContextListener] Initialized artemis-plugin plugin
2020-01-07 16:28:37,100 INFO  [io.hawt.HawtioContextListener] Initialising hawtio services
2020-01-07 16:28:37,147 INFO  [io.hawt.system.ConfigManager] Configuration will be discovered via system properties
2020-01-07 16:28:37,155 INFO  [io.hawt.jmx.JmxTreeWatcher] Welcome to hawtio 1.5.5 : http://hawt.io/ : Don't cha wish your console was hawt like me? ;-)
2020-01-07 16:28:37,165 INFO  [io.hawt.jmx.UploadManager] Using file upload directory: (略)\apache-artemis-2.10.1\bin\mybroker\tmp\uploads
2020-01-07 16:28:37,248 INFO  [io.hawt.web.AuthenticationFilter] Starting hawtio authentication filter, JAAS realm: "activemq" authorized role(s): "amq" role principal classes: "org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal"
2020-01-07 16:28:37,366 INFO  [io.hawt.web.JolokiaConfiguredAgentServlet] Jolokia overridden property: [key=policyLocation, value=file:(略)/apache-artemis-2.10.1/bin/mybroker/etc/\jolokia-access.xml]
2020-01-07 16:28:37,856 INFO  [io.hawt.web.RBACMBeanInvoker] Using MBean [hawtio:type=security,area=jmx,rank=0,name=HawtioDummyJMXSecurity] for role based access control
2020-01-07 16:28:57,017 INFO  [io.hawt.system.ProxyWhitelist] Initial proxy whitelist: [localhost, 127.0.0.1, 10.48.52.34, p47187.intra.rakuten.co.jp, 172.17.231.161, P47187.mshome.net, 10.0.75.1, P47187.intra.rakuten.co.jp, 10.80.90.213, 192.168.56.1, kubernetes.docker.internal]
2020-01-07 16:28:57,968 INFO  [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://localhost:8161
2020-01-07 16:28:57,968 INFO  [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://localhost:8161/console/jolokia
2020-01-07 16:28:57,969 INFO  [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://localhost:8161/console
2020-01-07 16:29:02,076 INFO  [io.hawt.web.keycloak.KeycloakServlet] Keycloak integration is disabled
2020-01-07 16:29:12,886 INFO  [io.hawt.web.LoginServlet] hawtio login is using 1800 sec. HttpSession timeout
2020-01-07 16:29:46,430 WARN  [org.apache.activemq.artemis.core.server] AMQ222149: Message Reference[633]:RELIABLE:CoreMessage[messageID=633,durable=true,userID=cdb5084f-311c-11ea-8aac-d89ef337aa82,priority=4, timestamp=Tue Jan 07 16:10:36 JST 2020,expiration=0, durable=true, address=remotingQueue,size=1530,properties=TypedProperties[__HDR_dlqDeliveryFailureCause=java.lang.Throwable: Dispatch[7] to ID:P47187-64333-1578382186053-1:1:1:1 exceeds redelivery policy limit:RedeliveryPolicy {destination = null, collisionAvoidanceFactor = 0.15, maximumRedeliveries = 6, maximumRedeliveryDelay = -1, initialRedeliveryDelay = 1000, useCollisionAvoidance = false, useExponentialBackOff = false, backOffMultiplier = 5.0, redeliveryDelay = 1000, preDispatchCheck = true},__AMQ_CID=ID:P47187-63745-1578381036655-0:1,_AMQ_GROUP_SEQUENCE=0,__HDR_BROKER_IN_TIME=1578381036826,_AMQ_ROUTING_TYPE=1,__HDR_ARRIVAL=0,__HDR_COMMAND_ID=5,__HDR_PRODUCER_ID=[0000 0035 7B01 0021 4944 3A50 3437 3138 372D 3633 3734 352D 3135 3738 3338 3130 3336 3635 352D 313A 3100 0000 0000 0000 0100 0000 0000 0000 01),__HDR_MESSAGE_ID=[0000 0048 6E00 017B 0100 2149 443A 5034 3731 3837 2D36 3337 3435 2D31 3537  ...  0000 0000 0001 0000 0000 0000 0001 0000 0000 0000 0001 0000 0000 0000 0000),__HDR_DROPPABLE=false]]@828366005 has reached maximum delivery attempts, sending it to Dead Letter Address DLQ from remotingQueue
2020-01-07 16:30:08,654 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure to /127.0.0.1:64334 has been detected: An existing connection was forcibly closed by the remote host [code=GENERIC_EXCEPTION]
2020-01-07 16:30:38,355 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure to /127.0.0.1:64343 has been detected: An existing connection was forcibly closed by the remote host [code=GENERIC_EXCEPTION]
2020-01-07 16:44:50,527 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure to /127.0.0.1:64353 has been detected: An existing connection was forcibly closed by the remote host [code=GENERIC_EXCEPTION]

ログの最後の方にある通り http://localhost:8161/console で管理コンソールが開ける。id/passwordはartemis create xxxxx時に入力したものを使う。

これでhttp://activemq.apache.org/のインストールと起動は完了。

spring-bootからApache ActiveMQを使う

build.gradle

// Gradle build for the JMS send/receive sample.
plugins {
    // Spring Boot Gradle plugin, pinned to 2.2.2.RELEASE.
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    // Lets the dependency below be declared without an explicit version.
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

dependencies {
    // Spring Boot starter for the ActiveMQ Artemis JMS client.
    implementation 'org.springframework.boot:spring-boot-starter-artemis'
}

プロパティ

application.ymlなどに接続先urlを指定する。

spring:
  # spring-boot-starter-artemis is configured through spring.artemis.*;
  # spring.activemq.broker-url only applies to the ActiveMQ "classic" starter.
  artemis:
    mode: native
    host: localhost
    port: 61616

送信

とりあえず適当な文字列を送信する。

import javax.jms.Queue;

import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;

/**
 * Sender side of the JMS sample: on startup it sends the text
 * {@code "message"} to the queue {@code remotingQueue} and then terminates
 * the JVM.
 */
@SpringBootApplication
public class Application implements CommandLineRunner {

    /** Template used to send the message. */
    @Autowired
    private JmsTemplate jmsTemplate;

    /** Destination queue; supplied by {@link #queue()}. */
    @Autowired
    private Queue queue;

    /**
     * Declares the destination queue {@code remotingQueue}.
     *
     * @return the queue the message is sent to
     */
    @Bean
    Queue queue() {
        Queue target = new ActiveMQQueue("remotingQueue");
        return target;
    }

    /**
     * Sends one message and exits with status 0.
     *
     * @param args command line arguments (unused)
     * @throws Exception declared by {@link CommandLineRunner#run(String...)}
     */
    @Override
    public void run(String... args) throws Exception {
        String payload = "message";
        jmsTemplate.convertAndSend(queue, payload);
        // Exit explicitly once the message has been handed to the template.
        System.exit(0);
    }

    /**
     * Boots the sender application.
     *
     * @param args command line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplicationBuilder app = new SpringApplicationBuilder(Application.class);
        app.run(args);
    }
}

受信

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.jms.annotation.JmsListener;

/**
 * Receiver side of the JMS sample: listens on the queue
 * {@code remotingQueue} and prints every received text message.
 */
@SpringBootApplication
public class Application {

    /**
     * Prints a received message to standard output.
     *
     * @param message payload of the message taken from {@code remotingQueue}
     */
    @JmsListener(destination = "remotingQueue")
    public void listener(String message) {
        String line = "Message received " + message;
        System.out.println(line);
    }

    /**
     * Boots the receiver application.
     *
     * @param args command line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplicationBuilder app = new SpringApplicationBuilder(Application.class);
        app.run(args);
    }
}