kagamihogeの日記

kagamihogeの日記です。

Spring Cloud Stream(RabbitMQ)でhello-worldレベルのconsumer

Spring Cloud StreamRabbitMQと組み合わせて、RabbitMQ管理画面から文字列をpublishしてspringアプリケーションでconsumeするだけのサンプルを作成する。

ソースコード・手順

build.gradle

https://start.spring.io/ でCloud Stream, Spring for RabbitMQと後はお好みの依存性を追加してbuild.gradleを生成する。

plugins {
  id 'java'
  id 'org.springframework.boot' version '3.1.4'
  id 'io.spring.dependency-management' version '1.1.3'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
  sourceCompatibility = '17'
}

configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}

repositories {
  mavenCentral()
}

ext {
  set('springCloudVersion', "2022.0.4")
}

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-amqp'
  implementation 'org.springframework.cloud:spring-cloud-stream'
  implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.amqp:spring-rabbit-test'
  testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
}

dependencyManagement {
  imports {
    mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
  }
}

tasks.named('test') {
  useJUnitPlatform()
}

dockerでRabbitMQを起動

手軽にpublishするために管理画面を使用したいのでrabbitmq:3ではなくrabbitmq:3-managementを使用する。username/passwordはデフォルト(guest / guest)のままで良い。

sudo docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

consumer

import java.util.function.Consumer;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SpringCloudStreamSample {
  public static void main(String[] args) {
    new SpringApplicationBuilder(SpringCloudStreamSample.class)
      .web(WebApplicationType.NONE)
      .run(args);
  }

  @Bean
  public Consumer<String> consume() {
    return payload -> System.out.println(payload);
  }
}

上記でconsumerを起動する。プロパティファイルが不要な点は後述。

管理画面からpublish

http://localhost:15672/ でRabbitMQ管理画面を開いてデフォルトのusername/passwordでログインする。上記のconsumeというメソッド名によりconsume-in-0というexchangeが自動生成されてるのでそこから適当な文字列をpublishすると、springのconsumerがそれを出力する。

デフォルトで色々自動にやってくれるのでプロパティファイルは無くても起動可能

以下は起動ログの一部だが、consume-in-0.anonymous.jtC4TT2IS_mRDG4XtPEquwというqueueを作ったとか consume-in-0にバインドしたとかlocal_rabbitとかいうバインダーを作っただとか、あれこれ自動生成しているのが分かる。

No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
LiveReload server is running on port 35729
Channel 'application.consume-in-0' has 1 subscriber(s).
Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
Channel 'application.errorChannel' has 1 subscriber(s).
started bean '_org.springframework.integration.errorLogger'
Creating binder: local_rabbit
Constructing binder child context for local_rabbit
Caching the binder: local_rabbit
declaring queue for inbound: consume-in-0.anonymous.jtC4TT2IS_mRDG4XtPEquw, bound to: consume-in-0
Attempting to connect to: [localhost:5672]
Created new connection: rabbitConnectionFactory#24ae5bd5:0/SimpleConnection@4370a547 [delegate=amqp://guest@127.0.0.1:5672/, localPort=51812]
Channel 'rabbit-62463784.consume-in-0.errors' has 1 subscriber(s).
Channel 'rabbit-62463784.consume-in-0.errors' has 2 subscriber(s).
started bean 'inbound.consume-in-0.anonymous.jtC4TT2IS_mRDG4XtPEquw'
Started SpringCloudStreamSample in 2.018 seconds (process running for 2.388)

また、RabbitProperties.javaのusername/passwordあたりのデフォルト値はRabbitMQのdocker runのサンプルと同一になっている。なので一切プロパティが無くても最低限のサンプルコード程度であれば起動する。

package org.springframework.boot.autoconfigure.amqp;

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
    private static final int DEFAULT_PORT = 5672;

    private String host = "localhost";
    private Integer port;
    private String username = "guest";
    private String password = "guest";

spring-integrationでcommons-ioを使用したファイルのtail

Spring Integrationでファイルのtailを実現する。デフォルトではOSのtailコマンドを実行するが、設定によりOS非依存のApache Commons IOに切り替えられる。今回はそのサンプルコードについて。

ソースコードなど

build.gradle

https://start.spring.io/ を使用してspring-initegration関連を含めた依存性を作成する。

plugins {
  id 'java'
  id 'org.springframework.boot' version '3.1.4'
  id 'io.spring.dependency-management' version '1.1.3'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'

java {
  sourceCompatibility = '17'
}

repositories {
  mavenCentral()
}

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-integration'
  implementation 'org.springframework.integration:spring-integration-file'
  
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.integration:spring-integration-test'
}

tasks.named('test') {
  useJUnitPlatform()
}

java

import java.nio.file.Path;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.dsl.TailAdapterSpec;

@SpringBootApplication
public class ApacheCommonsFileTailingApplication {
  public static void main(String[] args) {
    new SpringApplicationBuilder(ApacheCommonsFileTailingApplication.class)
      .web(WebApplicationType.NONE)
      .run(args);
  }

  @Bean
  public IntegrationFlow fileReadingFlow() {
    TailAdapterSpec tailAdapter = Files.tailAdapter(Path.of("sample.txt").toFile())
        .delay(1000)
        .end(true)
        .reopen(false);

    return IntegrationFlow.from(tailAdapter).handle("sampleBean", "sampleHandle").get();
  }
}
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class SampleBean {
  public void sampleHandle(Message<String> e) {
    System.out.println(e);
  }
}

これを実行してsample.txtを修正すると以下のような出力が得られる。

GenericMessage [payload=hogehoge, headers={file_originalFile=sample.txt, id=97e015a2-ffdf-28b5-2b3e-b0f9e8119d08, file_name=sample.txt, timestamp=1696850274578}]

基本的にはFiles.tailAdapterを使用するだけ。FileTailInboundChannelAdapterFactoryBean.java#L243 を見ると if (this.delay == null && this.end == null && this.reopen == null) のelseの場合にApacheCommonsFileTailingMessageProducerに切り替わるのが分かる。

ハマった点

windowsでデフォルト設定のFiles.tailAdapterを使用するとtailコマンドが無くてエラーになる

以下のエラーのように、デフォルト設定でwindowsだとtailコマンドが見つからなくて実行時エラーになる。

Exception in thread "SimpleAsyncTaskExecutor-1" org.springframework.messaging.MessagingException: Failed to exec tail command: 'tail -F -n 0 C:\java\workspaces\e202212\d\sp\integfilepolling\sample.txt'
    at org.springframework.integration.file.tail.OSDelegatingFileTailingMessageProducer.runExec(OSDelegatingFileTailingMessageProducer.java:136)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Cannot run program "tail": CreateProcess error=2, 指定されたファイルが見つかりません。
    at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1143)
    at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1073)
    at java.base/java.lang.Runtime.exec(Runtime.java:594)
    at java.base/java.lang.Runtime.exec(Runtime.java:453)

tailはWSLでwindowsでも実行可能。ただOSDelegatingFileTailingMessageProducer.java#L97を見るとtailとべた書きしてあるのでwindowsでどうにかこうにか動かす方法は分からなかった。

commons-ioのTailerが2回実行される

こちらはspring-integrationと直接の関係は無い。ApacheCommonsFileTailingMessageProducerが裏で使用するライブラリを直接使用する場合、javadocを見てなんとなく以下のように書くとhandleが2回呼ばれてしまう。

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.input.Tailer;
import org.apache.commons.io.input.TailerListener;
import org.apache.commons.io.input.TailerListenerAdapter;

public class SampleTailer extends TailerListenerAdapter {
  @Override
  public void handle(String line) {
    System.out.println(line);
  }
  
  @Override
  public void handle(Exception ex) {
    System.out.println(ex);
  }
  
  public static void main(String[] args) throws InterruptedException {
    TailerListener listener = new SampleTailer();
    Tailer tailer = Tailer.create(new File("sample.txt"), listener, 1000, true); 

    ExecutorService ex = Executors.newFixedThreadPool(1);
    ex.submit(tailer); 
  }
}

この原因はTailer.createの中でThread#startするため。TailerRunnableをimplementsしているので、単にこのクラスを生成して適当なExecutorServiceにsubmitで良い。

Tailer tailer = new Tailer(new File("sample.txt"), listener, 1000, true);
ExecutorService ex = Executors.newFixedThreadPool(1);
ex.submit(tailer);

gradleのversion catalogでsubprojectで共通のバージョンを定義

JJUG CCC 2023 Spring で最近のGradleにはversion catalogというmultiple projectでバージョン定義を共有する仕組みがあるのを知った。その時に講演していた方のプレゼン資料は Gradleと仲良くなる第一歩 ~小規模PJから大規模PJへ~ にある。

https://docs.gradle.org/current/userguide/platforms.html 公式ドキュメントとしてはこのあたりを参照している。

ソースコードなど

以下の記述においてdependencyResolutionManagementsettings.gradleに記述する。

version

  • java 17
  • gradle 8.1.1

library - バージョン定義のエイリアス

settings.gradle にライブラリのバージョン定義にエイリアスをつけられる。各multiple projectのbuild.gradleはそのエイリアスでバージョン定義を参照できる。

以下はgroovyとcommon-lang3のエイリアスを作成している。

dependencyResolutionManagement {
    versionCatalogs {
        libs {
            library('groovy-core', 'org.apache.groovy:groovy:4.0.12')
            library('commons-lang3', 'org.apache.commons', 'commons-lang3').version('3.12.0')
        }
    }
}

build.gradleでは以下のようにエイリアスを使用する。ハイフン("-"), アンダースコア(".")はドットに読み替えの必要がある。公式ドキュメント的にはセパレータはハイフンを推奨している。

  implementation(libs.groovy.core)
  implementation libs.commons.lang3

セパレータで分かりやすく区切るかどうかは任意で、お気に召さなければgroovyCoreでも良いよ、と公式ドキュメントには書いてある。

version, versionRef - バージョン番号のエイリアス

バージョン番号のみのエイリアスversionを使用する。

以下のようにgroovy関連がすべて同一のバージョン番号の場合はversionでそのバージョン番号を定義してversionRefで参照する。

dependencyResolutionManagement {
    versionCatalogs {
        libs {
            version('groovy', '4.0.12')
            version('java-version', '17')
            
            library('groovy-core', 'org.apache.groovy', 'groovy').versionRef('groovy')
            library('groovy-json', 'org.apache.groovy', 'groovy-json').versionRef('groovy')
            library('groovy-nio', 'org.apache.groovy', 'groovy-nio').versionRef('groovy')
        }
    }
}

build.gradleでの使用方法はlibraryと変わらない。

  implementation(libs.groovy.core)
  implementation(libs.groovy.json)
  implementation(libs.groovy.nio)

また、dependencies以外でもversionは参照できる。以下はsettings.gradle に定義したjavaのversionをbuild.gradleで参照している。

sourceCompatibility = libs.versions.java.version.get()

bundle - 複数の定義をまとめる

複数のバージョン定義をまとめたグループに対してエイリアスを作れる。以下はspring-bootのjdbcとwebをspring-bundleというbundleにまとめている。この場合バージョンは不要なのでwithoutVersion()を付与している。

dependencyResolutionManagement {
    versionCatalogs {
        libs {
            library('spring-boot-starter-jdbc', 'org.springframework.boot', 'spring-boot-starter-jdbc').withoutVersion()
            library('spring-boot-starter-web', 'org.springframework.boot', 'spring-boot-starter-web').withoutVersion()
            
            bundle('spring-bundle', ['spring-boot-starter-jdbc', 'spring-boot-starter-web'])
        }
    }
}

build.gradleでは以下で参照する。

implementation libs.bundles.spring.bundle

pluginのエイリアス

pluginは以下のようにエイリアスを作成する。以下はpluginでspring-bootのバージョンのエイリアスを作成している。

dependencyResolutionManagement {
    versionCatalogs {
        libs {
            plugin('spring-boot-version', 'org.springframework.boot').version('3.1.0')
        }
    }
}

build.gradleでは以下のように使用する。

plugins {
  id 'java'
  // id 'org.springframework.boot' version '3.1.0' // これと同じ結果になる。
  alias(libs.plugins.spring.boot.version)
  id 'io.spring.dependency-management' version '1.1.0'
}

複数カタログ

基本的にはlibsを使用するが自前定義のカタログを作成できる。以下はspringLibsという自前定義のカタログを追加している。

dependencyResolutionManagement {
    versionCatalogs {
        libs {
          // (省略)
        }
        springLibs {
            library('spring-boot-starter-jdbc', 'org.springframework.boot', 'spring-boot-starter-jdbc').withoutVersion()
            library('spring-boot-starter-web', 'org.springframework.boot', 'spring-boot-starter-web').withoutVersion()
            
            bundle('spring-bundle', ['spring-boot-starter-jdbc', 'spring-boot-starter-web'])
        }
    }
}

自前定義のカタログは以下のように使用する。

implementation springLibs.bundles.spring.bundle

tomlファイル

ちゃんと試してないのだけど以下のような形式のファイルでversion catalogを定義できるらしい。別ファイルに切り出して、別プロジェクトでもバージョン定義を共有したい、場合に有用らしい。

[versions]
groovy = "3.0.5"
checkstyle = "8.37"

[libraries]
groovy-core = { module = "org.codehaus.groovy:groovy", version.ref = "groovy" }
groovy-json = { module = "org.codehaus.groovy:groovy-json", version.ref = "groovy" }
groovy-nio = { module = "org.codehaus.groovy:groovy-nio", version.ref = "groovy" }
commons-lang3 = { group = "org.apache.commons", name = "commons-lang3", version = { strictly = "[3.8, 4.0[", prefer="3.9" } }

[bundles]
groovy = ["groovy-core", "groovy-json", "groovy-nio"]

[plugins]
versions = { id = "com.github.ben-manes.versions", version = "0.45.0" }

プロジェクト間のversion catalog共有の方法については https://docs.gradle.org/current/userguide/platforms.html#sec:version-catalog-plugin に自前のpluginを書いてgitにアップという方法も紹介している。