kagamihogeの日記

kagamihogeの日記です。

Spring BootでApache ActiveMQの送受信

Spring Bootでhttp://activemq.apache.org/を使用しての送受信のhello worldレベルのことをやる。

手順など

Apache ActiveMQのインストール

http://activemq.apache.org/をインストールする。まずbin/artemis create 名前を実行する。以下はbin/artemis create mybrokerと実行したときの様子。

apache-artemis-2.10.1\bin>artemis.cmd create mybroker
Creating ActiveMQ Artemis instance at: C:\(中略)\apache-artemis-2.10.1\bin\mybroker

--user: is a mandatory property!
Please provide the default username:
kagami

--password: is mandatory with this configuration:
Please provide the default password:


--allow-anonymous | --require-login: is a mandatory property!
Allow anonymous access?, valid values are Y,N,True,False
Y

Auto tuning journal ...
done! Your system can make 0.95 writes per millisecond, your journal-buffer-timeout will be 1056000

You can now start the broker by executing:

   "(略)\apache-artemis-2.10.1\bin\mybroker\bin\artemis" run

Or you can setup the broker as Windows service and run it in the background:

   "(略)\apache-artemis-2.10.1\bin\mybroker\bin\artemis-service.exe" install
   "(略)\apache-artemis-2.10.1\bin\mybroker\bin\artemis-service.exe" start

   To stop the windows service:
      "(略)\apache-artemis-2.10.1\bin\mybroker\bin\artemis-service.exe" stop

   To uninstall the windows service
      "(略)\apache-artemis-2.10.1\bin\mybroker\bin\artemis-service.exe" uninstall

bin\mybroker\にファイル等が作られる。起動するにはmybroker\bin\artemis runする。実行時のログは以下。

apache-artemis-2.10.1\bin\mybroker\bin>artemis run
     _        _               _
    / \  ____| |_  ___ __  __(_) _____
   / _ \|  _ \ __|/ _ \  \/  | |/  __/
  / ___ \ | \/ |_/  __/ |\/| | |\___ \
 /_/   \_\|   \__\____|_|  |_|_|/___ /
 Apache ActiveMQ Artemis 2.10.1


2020-01-07 16:28:31,509 INFO  [org.apache.activemq.artemis.integration.bootstrap] AMQ101000: Starting ActiveMQ Artemis Server
2020-01-07 16:28:31,558 INFO  [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/large-messages,pagingDirectory=data/paging)
2020-01-07 16:28:31,588 INFO  [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
2020-01-07 16:28:31,626 INFO  [org.apache.activemq.artemis.core.server] AMQ221057: Global Max Size is being adjusted to 1/2 of the JVM max size (-Xmx). being defined as 536,870,912
2020-01-07 16:28:31,655 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
2020-01-07 16:28:31,656 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
2020-01-07 16:28:31,657 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
2020-01-07 16:28:31,657 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
2020-01-07 16:28:31,658 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
2020-01-07 16:28:31,659 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
2020-01-07 16:28:31,743 INFO  [org.apache.activemq.artemis.core.server] AMQ221034: Waiting indefinitely to obtain live lock
2020-01-07 16:28:31,744 INFO  [org.apache.activemq.artemis.core.server] AMQ221035: Live Server Obtained live lock
2020-01-07 16:28:32,804 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address DLQ supporting [ANYCAST]
2020-01-07 16:28:32,804 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue DLQ on address DLQ
2020-01-07 16:28:32,805 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address ExpiryQueue supporting [ANYCAST]
2020-01-07 16:28:32,805 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue ExpiryQueue on address ExpiryQueue
2020-01-07 16:28:33,925 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:61616 for protocols [CORE,MQTT,AMQP,STOMP,HORNETQ,OPENWIRE]
2020-01-07 16:28:33,961 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:5445 for protocols [HORNETQ,STOMP]
2020-01-07 16:28:33,992 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:5672 for protocols [AMQP]
2020-01-07 16:28:34,024 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:1883 for protocols [MQTT]
2020-01-07 16:28:34,051 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:61613 for protocols [STOMP]
2020-01-07 16:28:34,058 INFO  [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2020-01-07 16:28:34,059 INFO  [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.10.1 [0.0.0.0, nodeID=100ac6af-3114-11ea-8aac-d89ef337aa82]
2020-01-07 16:28:34,517 INFO  [org.apache.activemq.hawtio.branding.PluginContextListener] Initialized activemq-branding plugin
2020-01-07 16:28:34,791 INFO  [org.apache.activemq.hawtio.plugin.PluginContextListener] Initialized artemis-plugin plugin
2020-01-07 16:28:37,100 INFO  [io.hawt.HawtioContextListener] Initialising hawtio services
2020-01-07 16:28:37,147 INFO  [io.hawt.system.ConfigManager] Configuration will be discovered via system properties
2020-01-07 16:28:37,155 INFO  [io.hawt.jmx.JmxTreeWatcher] Welcome to hawtio 1.5.5 : http://hawt.io/ : Don't cha wish your console was hawt like me? ;-)
2020-01-07 16:28:37,165 INFO  [io.hawt.jmx.UploadManager] Using file upload directory: (略)\apache-artemis-2.10.1\bin\mybroker\tmp\uploads
2020-01-07 16:28:37,248 INFO  [io.hawt.web.AuthenticationFilter] Starting hawtio authentication filter, JAAS realm: "activemq" authorized role(s): "amq" role principal classes: "org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal"
2020-01-07 16:28:37,366 INFO  [io.hawt.web.JolokiaConfiguredAgentServlet] Jolokia overridden property: [key=policyLocation, value=file:(略)/apache-artemis-2.10.1/bin/mybroker/etc/\jolokia-access.xml]
2020-01-07 16:28:37,856 INFO  [io.hawt.web.RBACMBeanInvoker] Using MBean [hawtio:type=security,area=jmx,rank=0,name=HawtioDummyJMXSecurity] for role based access control
2020-01-07 16:28:57,017 INFO  [io.hawt.system.ProxyWhitelist] Initial proxy whitelist: [localhost, 127.0.0.1, 10.48.52.34, p47187.intra.rakuten.co.jp, 172.17.231.161, P47187.mshome.net, 10.0.75.1, P47187.intra.rakuten.co.jp, 10.80.90.213, 192.168.56.1, kubernetes.docker.internal]
2020-01-07 16:28:57,968 INFO  [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://localhost:8161
2020-01-07 16:28:57,968 INFO  [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://localhost:8161/console/jolokia
2020-01-07 16:28:57,969 INFO  [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://localhost:8161/console
2020-01-07 16:29:02,076 INFO  [io.hawt.web.keycloak.KeycloakServlet] Keycloak integration is disabled
2020-01-07 16:29:12,886 INFO  [io.hawt.web.LoginServlet] hawtio login is using 1800 sec. HttpSession timeout
2020-01-07 16:29:46,430 WARN  [org.apache.activemq.artemis.core.server] AMQ222149: Message Reference[633]:RELIABLE:CoreMessage[messageID=633,durable=true,userID=cdb5084f-311c-11ea-8aac-d89ef337aa82,priority=4, timestamp=Tue Jan 07 16:10:36 JST 2020,expiration=0, durable=true, address=remotingQueue,size=1530,properties=TypedProperties[__HDR_dlqDeliveryFailureCause=java.lang.Throwable: Dispatch[7] to ID:P47187-64333-1578382186053-1:1:1:1 exceeds redelivery policy limit:RedeliveryPolicy {destination = null, collisionAvoidanceFactor = 0.15, maximumRedeliveries = 6, maximumRedeliveryDelay = -1, initialRedeliveryDelay = 1000, useCollisionAvoidance = false, useExponentialBackOff = false, backOffMultiplier = 5.0, redeliveryDelay = 1000, preDispatchCheck = true},__AMQ_CID=ID:P47187-63745-1578381036655-0:1,_AMQ_GROUP_SEQUENCE=0,__HDR_BROKER_IN_TIME=1578381036826,_AMQ_ROUTING_TYPE=1,__HDR_ARRIVAL=0,__HDR_COMMAND_ID=5,__HDR_PRODUCER_ID=[0000 0035 7B01 0021 4944 3A50 3437 3138 372D 3633 3734 352D 3135 3738 3338 3130 3336 3635 352D 313A 3100 0000 0000 0000 0100 0000 0000 0000 01),__HDR_MESSAGE_ID=[0000 0048 6E00 017B 0100 2149 443A 5034 3731 3837 2D36 3337 3435 2D31 3537  ...  0000 0000 0001 0000 0000 0000 0001 0000 0000 0000 0001 0000 0000 0000 0000),__HDR_DROPPABLE=false]]@828366005 has reached maximum delivery attempts, sending it to Dead Letter Address DLQ from remotingQueue
2020-01-07 16:30:08,654 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure to /127.0.0.1:64334 has been detected: An existing connection was forcibly closed by the remote host [code=GENERIC_EXCEPTION]
2020-01-07 16:30:38,355 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure to /127.0.0.1:64343 has been detected: An existing connection was forcibly closed by the remote host [code=GENERIC_EXCEPTION]
2020-01-07 16:44:50,527 WARN  [org.apache.activemq.artemis.core.client] AMQ212037: Connection failure to /127.0.0.1:64353 has been detected: An existing connection was forcibly closed by the remote host [code=GENERIC_EXCEPTION]

ログの最後の方にある通り http://localhost:8161/console で管理コンソールが開ける。id/passwordはartemis create xxxxx時に入力したものを使う。

これでhttp://activemq.apache.org/のインストールと起動は完了。

spring-bootからApache ActiveMQを使う

build.gradle

plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-artemis'
}

プロパティ

application.ymlなどに接続先urlを指定する。

spring:
  activemq:
    broker-url: tcp://localhost:61616

送信

とりあえず適当な文字列を送信する。

import javax.jms.Queue;

import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication
public class Application implements CommandLineRunner {

    @Bean
    Queue queue() {
        return new ActiveMQQueue("remotingQueue");
    }
    
    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);
    }
    
    @Autowired
    private Queue queue;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public void run(String... args) throws Exception {
        jmsTemplate.convertAndSend(queue, "message");
        System.exit(0);
    }
}

受信

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.jms.annotation.JmsListener;

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);
    }

    @JmsListener(destination = "remotingQueue")
    public void listener(String message){
        System.out.println("Message received " + message);;
    }
}

spring-batchのParallel Stepsをためす

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#scalabilityParallelSteps のパラレルstepを試す。http://kagamihoge.hatenablog.com/entry/2020/01/07/110847 は単一のstepをマルチスレッド化する方法だが、こちらは複数のstepをパラレルに実行する方法となる。

というわけで、とりあえず使ってみる。以下は、2つのパラレルなstepがあり、それぞれのstepで1-500・501-1000の合計を出し、最後にそれらを足し合わせる、というもの。あまり意味のあるサンプルではないが、その辺は勘弁願いたい。

@SpringBootApplication
@EnableBatchProcessing
public class App {
    static AtomicInteger sum = new AtomicInteger(0);
    
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
            .start(splitFlow(steps))
            .build()
            .listener(new JobExecutionListener() {
                @Override
                public void beforeJob(JobExecution jobExecution) {}
                
                @Override
                public void afterJob(JobExecution jobExecution) {
                    System.out.println("sum="+sum);
                }
            })
            .build();
    }
    
    @Bean
    public Flow splitFlow(StepBuilderFactory steps) {
        return new FlowBuilder<SimpleFlow>("splitFlow")
            .split(taskExecutor())
            .add(
              flow(steps, "flow1", 1, 501), 
              flow(steps, "flow2", 501, 1001))
            .build();
    }
    
    public Flow flow(StepBuilderFactory steps, String name, int start, int end) {
        return new FlowBuilder<SimpleFlow>(name)
            .start(step(steps, name, start, end))
            .build();
    }
    
    @Transactional
    public Step step(StepBuilderFactory steps, String flowname, int start, int end) {
        List<Integer> list = IntStream.range(start, end).boxed().collect(Collectors.toList());
        ItemReader<Integer> reader = new MyItemReader(new ListItemReader<Integer>(list));
        
        MyItemWriter writer = new MyItemWriter();
        TaskletStep step = steps
                .get(flowname + "step1")
                .<Integer ,Integer>chunk(10)
                .reader(reader)
                .writer(writer)
                .build();
        
        return step;
    }
        
    @Bean
    public TaskExecutor taskExecutor(){
        return new SimpleAsyncTaskExecutor("spring_batch");
    }

    public static void main(String[] args) {
        new SpringApplicationBuilder(App.class).run(args);
    }

}
public class MyItemWriter implements ItemWriter<Integer>, StepExecutionListener {

    int sum = 0;
    @Override
    public void write(List<? extends Integer> items) throws Exception {
        items.forEach(i -> {
            sum += i;
        });
    }
    @Override
    public void beforeStep(StepExecution stepExecution) { }
    
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("step sum=" + sum);
        App.sum.addAndGet(sum);
        
        return stepExecution.getExitStatus();
    }

}

これを実行すると以下のような実行結果が得られる。

Executing step: [flow2step1]
Executing step: [flow1step1]
step sum=125250
Step: [flow1step1] executed in 116ms
step sum=375250
Step: [flow2step1] executed in 116ms
sum=500500

spring-batchのTaskExecutorによるstepマルチスレッド化

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#scalability のMulti-threaded Stepを試す。

plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

sourceCompatibility = '11'
targetCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    runtimeOnly 'com.h2database:h2'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.batch:spring-batch-test'
}

test {
    useJUnitPlatform()
}

taskExecutorを指定することでそのstepはマルチスレッドで実行される。以下のコードは1-1000の合計数を出すサンプルコードだが、意図通りには動作しない。その点は後述。

@SpringBootApplication
@EnableBatchProcessing
public class App {
    
    @Bean
    public Job job(JobBuilderFactory jobs, @Qualifier("s1") Step s1) {
        return jobs
                .get("myJob")
                .incrementer(new RunIdIncrementer())
                .start(s1)
                .listener(new JobExecutionListener() {
                    @Override
                    public void beforeJob(JobExecution jobExecution) {}
                    
                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        System.out.println(sum);
                    }
                })
                .build();
    }
    
    int sum = 0;
    
    @Bean(name = "s1")
    public Step step1(StepBuilderFactory steps) {
        List<Integer> list = IntStream.range(1, 1001).boxed().collect(Collectors.toList());
        ItemReader<Integer> reader = new ListItemReader<Integer>(list);
        
        TaskletStep step = steps
                .get("step1")
                .<Integer ,Integer>chunk(10)
                .reader(reader)
                .writer(l -> {
                    l.forEach(i -> {
                        System.out.print(i+",");
                        sum += i;
                    });
                    System.out.println();
                })
                .taskExecutor(taskExecutor())
                .build();
        
        return step;
    }
    
    @Bean
    public TaskExecutor taskExecutor(){
        return new SimpleAsyncTaskExecutor("spring_batch");
    }

    public static void main(String[] args) {
        new SpringApplicationBuilder(App.class).run(args);
    }

}

これの実行時の様子は以下の通り。

4,9,17,22,24,29,31,34,36,1,3,9,14,17,22,24,29,31,34,36,
2,5,6,7,8,10,12,15,19,21,23,27,37,
28,30,32,
(省略)
sum=501421

期待される合計値は500500だがそうはならない。これの原因は、マルチスレッドでstepを実行する際に複数スレッドでreader-processor-writerインスタンスを共有するため。プログラム内容に応じて適切な同期化を施さないと期待通りには動かない。上記の通り、readの同期化が無いために9が2回出てきちゃったりしている。また、上記実行状況の通り処理順序がバラバラになるため、順序が重要な場合にはこの方法は適さない。

同期化で改善するには、まず、合計を保持する変数をAtomicIntegerにする。

次に、ItemReaderを同期化対応版にする。spring-batchが提供する各種実装の大半はスレッドセーフではない。javadocに何らか回避策について記述が有る場合もあれば無い場合もある。非スレッドセーフの場合SynchronizedItemStreamReaderでラッピングしたり、自前のラッピングクラス作ったりで対応する。今回はとりあえず自前のクラス作る方向にしてみる。

public class MyItemReader implements ItemReader<Integer> {
    ItemReader<Integer> reader;
    public MyItemReader(ItemReader<Integer> reader) {
        this.reader = reader;
    }
    
    public synchronized Integer read() throws UnexpectedInputException, ParseException, NonTransientResourceException, Exception {
        return reader.read();
    }

}

単純にListItemReaderをラッピングしてreadsynchronizedで行う。

改善後のソースコードは以下の通り。

@SpringBootApplication
@EnableBatchProcessing
public class App {
    
    @Bean
    public Job job(JobBuilderFactory jobs, @Qualifier("s1") Step s1) {/*省略*/}
    
    static AtomicInteger sum = new AtomicInteger(0);
    
    @Bean(name = "s1")
    public Step step1(StepBuilderFactory steps) {
        List<Integer> list = IntStream.range(1, 1001).boxed().collect(Collectors.toList());
        ItemReader<Integer> reader = new MyItemReader(new ListItemReader<Integer>(list));
        
        TaskletStep step = steps
                .get("step1")
                .<Integer ,Integer>chunk(10)
                .reader(reader)
                .writer(l -> {
                    l.forEach(i -> {
                        sum.addAndGet(i);
                        System.out.print(i+",");
                    });
                    System.out.println();
                })
                .taskExecutor(taskExecutor())
                .build();
        
        return step;
    }
    
    @Bean
    public TaskExecutor taskExecutor(){
        return new SimpleAsyncTaskExecutor("spring_batch");
    }

    public static void main(String[] args) {
        new SpringApplicationBuilder(App.class).run(args);
    }

}

感想とか

お手軽にstepをマルチスレッド化できる。ただし、処理順序であるとかreader-processor-writerに同期化の考慮が必要とか、若干のクセがあるのでそこは注意が必要。