Spring BootからApache Kafkaにpublish/subscribeする。
環境
dockerでkafka起動
上記を参考に構築する。本エントリで関係する書き換え箇所は以下の通り。
KAFKA_ADVERTISED_HOST_NAME
- ホストのローカルアドレスを指定。"19092:9092"
- 後々、ホストのSpring Bootからアクセスする際にポートを指定する。なので適当な値でポートフォワーディングしている。KAFKA_CREATE_TOPICS
- 起動時に動作確認用のtopicを作成。
version: '2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: build: . ports: - "19092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 192.168.10.210 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS: "Topic:1:1" volumes: - /var/run/docker.sock:/var/run/docker.sock
CLIでpublish/subscribeが出来るか動作確認する。
まずkafkaのdockerに入る。
docker exec -it kafka-docker_kafka_1 bash
topicの一覧を取得する。KAFKA_CREATE_TOPICS
で指定したtopicが作られているのが分かる。
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list Topic
topicの新規作成。
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --topic=mytopic Created topic mytopic.
topicにpublishする。
bash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic >sad >
topicからconsumeする。
bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic=mytopic
Spring Boot
上記で構築したkafkaにSpring Bootからpublish/subscribeする。
基本的なことはSpring Bootのドキュメントに記載がある。
string
まず単純な文字列のpublish/subscribeをする。
gradleに関連する依存性などを追加する。Spring Initializr https://start.spring.io/ で必要なものを追加する。Dependenciesにkafka
と入力すると、Spring for Apache KafkaとSpring for Apache Kafka Streamsが出てくるが、とりあえずはSpring for Apache Kafkaだけで良い。
plugins { id 'org.springframework.boot' version '2.3.4.RELEASE' id 'io.spring.dependency-management' version '1.0.10.RELEASE' id 'java' } group = 'com.example' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.kafka:spring-kafka' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' annotationProcessor 'org.projectlombok:lombok' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'org.springframework.kafka:spring-kafka-test' } test { useJUnitPlatform() }
application.properties
で設定を行う。
spring.kafka.bootstrap-servers=localhost:19092 spring.kafka.consumer.group-id=myGroup
Spring Boot起動時にpublishを一回行うMainクラスを作成する。
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import lombok.AllArgsConstructor; @SpringBootApplication @AllArgsConstructor public class DemoApplication implements CommandLineRunner { StringPublisher publisher; public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @Override public void run(String... args) throws Exception { publisher.publish(); } }
publishするクラス。
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import lombok.AllArgsConstructor; @Component @AllArgsConstructor public class StringPublisher { KafkaTemplate<String, String> kafkaTemplate; void publish() { kafkaTemplate.send("mytopic", "message"); } }
consumeするクラス。
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class StringConsumer { @KafkaListener(topics = "mytopic") public void processMessage(String content) { System.out.println(content); } }
JSON
次にPOJOのpublish/subscribeをする。実際にはPOJOからJSONにシリアライズ・デシリアライズをして通信を行う。なので、まずjackson-databindの依存性を追加する。
implementation 'com.fasterxml.jackson.core:jackson-databind'
application.properties
にJSONシリアライズ・デシリアライズの設定を追加する。デフォルトはStringにシリアライズ・デシリアライズするため、これをJSONに切り替える。また、そのJSONクラスは信頼されたpackageに入れる必要がある。今回は説明簡易化のため、すべて信頼するように*
を指定している。
pring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=*
JSON用のMainクラスを作成する。
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import lombok.AllArgsConstructor; @SpringBootApplication @AllArgsConstructor public class DemoApplication implements CommandLineRunner { JsonPublisher publisher; public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @Override public void run(String... args) throws Exception { publisher.publish(); } }
POJOのクラス。
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @AllArgsConstructor @NoArgsConstructor public class SampleData { String id; int value; }
pubishをするクラス。
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import lombok.AllArgsConstructor; @Component @AllArgsConstructor public class JsonPublisher { KafkaTemplate<String, SampleData> kafkaTemplate; void publish() { kafkaTemplate.send("mytopic", new SampleData("id001", 334)); } }
consumeをするクラス。
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class JsonConsumer { @KafkaListener(topics = "mytopic") public void processMessage(SampleData content) { System.out.println(content); } }
ハマったところ
/bin/sh: illegal option -
docker-compose up -d --build
の際に以下のエラーが発生した。
OK: 349 MiB in 74 packages /bin/sh: illegal option - ERROR: Service 'kafka' failed to build : The command '/bin/sh -c apk add --no-cache bash curl jq docker && chmod a+x /tmp/*.sh && mv /tmp/start-kafka.sh /tmp/broker-list.sh /tmp/create-topics.sh /tmp/versions.sh /usr/bin && sync && /tmp/download-kafka.sh && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME} && rm /tmp/* && wget https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VERSION}/glibc-${GLIBC_VERSION}.apk && apk add --no-cache --allow-untrusted glibc-${GLIBC_VERSION}.apk && rm glibc-${GLIBC_VERSION}.apk' returned a non-zero code: 2
原因は*.sh
の改行コードがCRLF
のため発生するらしい。windowsの場合はgit clone
の際にcore.autocrlf false
だと改行コードが自動的にCRLF
になる。なのでcore.autocrlf true
で再度clone
しなおせばよい。詳しくは以下を参照。
Invalid url in bootstrap.servers:
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost --list Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:499) at org.apache.kafka.clients.admin.Admin.create(Admin.java:63) at kafka.admin.TopicCommand$AdminClientTopicService$.createAdminClient(TopicCommand.scala:216) at kafka.admin.TopicCommand$AdminClientTopicService$.apply(TopicCommand.scala:220) at kafka.admin.TopicCommand$.main(TopicCommand.scala:57) at kafka.admin.TopicCommand.main(TopicCommand.scala) Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: localhost at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:59) at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:455) ... 5 more
-bootstrap-server
にはポート番号も必要。
× --bootstrap-server localhost 〇 --bootstrap-server localhost:9092
Can't convert value of class xxxx to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class kafka.sample.SampleData to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer Caused by: java.lang.ClassCastException: class kafka.sample.SampleData cannot be cast to class java.lang.String (kafka.sample.SampleData is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @43180ffa; java.lang.String is in module java.base of loader 'bootstrap') at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.5.1.jar:na] at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.5.1.jar:na]
POJOからJSONにシリアライズする場合、プロパティでそのための指定をする必要がある。
java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582) ~[na:na]
JSONを使用する場合、依存性にcom.fasterxml.jackson.core:jackson-databind
を追加する必要がある。
Cannot convert from [java.lang.String] to [xxxx]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [kafka.sample.SampleData] for GenericMessage [payload={"id":"id001","value":334}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32f00752, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=mytopic, kafka_receivedTimestamp=1603606820608, __TypeId__=[B@1a73802f, kafka_groupId=myGroup}] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]
JSONからPOJOにデシリアライズする場合、プロパティでそのための指定をする必要がある。
The class 'xxxx' is not in the trusted packages
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mytopic-0 at offset 10. If needed, please seek past the record to continue consumption. Caused by: java.lang.IllegalArgumentException: The class 'kafka.sample.SampleData' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*). at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.5.6.RELEASE.jar:2.5.6.RELEASE]
デシリアライズの際に対象クラスは信頼されたpackageに含める必要がある。