kagamihogeの日記

kagamihogeの日記です。

Spring Boot + Apache Kafkaのチュートリアルレベルの事をやる

Spring BootからApache Kafkaにpublish/subscribeする。

環境

dockerでkafka起動

qiita.com

上記を参考に構築する。本エントリで関係する書き換え箇所は以下の通り。

  • KAFKA_ADVERTISED_HOST_NAME - ホストのローカルアドレスを指定。
  • "19092:9092" - 後々、ホストのSpring Bootからアクセスする際にポートを指定する。なので適当な値でポートフォワーディングしている。
  • KAFKA_CREATE_TOPICS - 起動時に動作確認用のtopicを作成。
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "19092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.10.210
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "Topic:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

CLIでpublish/subscribeが出来るか動作確認する。

まずkafkaのdockerに入る。

docker exec -it kafka-docker_kafka_1 bash

topicの一覧を取得する。KAFKA_CREATE_TOPICSで指定したtopicが作られているのが分かる。

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Topic

topicの新規作成。

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --topic=mytopic
Created topic mytopic.

topicにpublishする。

bash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic
>sad
>

topicからconsumeする。

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic=mytopic

Spring Boot

上記で構築したkafkaにSpring Bootからpublish/subscribeする。

docs.spring.io

基本的なことはSpring Bootのドキュメントに記載がある。

string

まず単純な文字列のpublish/subscribeをする。

gradleに関連する依存性などを追加する。Spring Initializr https://start.spring.io/ で必要なものを追加する。Dependenciesにkafkaと入力すると、Spring for Apache KafkaとSpring for Apache Kafka Streamsが出てくるが、とりあえずはSpring for Apache Kafkaだけで良い。

plugins {
  id 'org.springframework.boot' version '2.3.4.RELEASE'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation 'org.springframework.kafka:spring-kafka'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.kafka:spring-kafka-test'  
}
test {
  useJUnitPlatform()
}

application.propertiesで設定を行う。

spring.kafka.bootstrap-servers=localhost:19092
spring.kafka.consumer.group-id=myGroup

Spring Boot起動時にpublishを一回行うMainクラスを作成する。

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import lombok.AllArgsConstructor;

@SpringBootApplication
@AllArgsConstructor
public class DemoApplication implements CommandLineRunner {
    StringPublisher publisher;
    
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        publisher.publish();
    }
}

publishするクラス。

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import lombok.AllArgsConstructor;

@Component
@AllArgsConstructor
public class StringPublisher {
    KafkaTemplate<String, String> kafkaTemplate;

    void publish() {
        kafkaTemplate.send("mytopic", "message");
    }
}

consumeするクラス。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class StringConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(String content) {
        System.out.println(content);
    }
}

JSON

次にPOJOのpublish/subscribeをする。実際にはPOJOからJSONシリアライズ・デシリアライズをして通信を行う。なので、まずjackson-databindの依存性を追加する。

implementation 'com.fasterxml.jackson.core:jackson-databind'

application.propertiesJSONシリアライズ・デシリアライズの設定を追加する。デフォルトはStringにシリアライズ・デシリアライズするため、これをJSONに切り替える。また、そのJSONクラスは信頼されたpackageに入れる必要がある。今回は説明簡易化のため、すべて信頼するように*を指定している。

pring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

JSON用のMainクラスを作成する。

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import lombok.AllArgsConstructor;

@SpringBootApplication
@AllArgsConstructor
public class DemoApplication implements CommandLineRunner {
    JsonPublisher publisher;
    
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        publisher.publish();
    }
}

POJOのクラス。

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleData {
    String id;
    int value;
}

pubishをするクラス。

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import lombok.AllArgsConstructor;

@Component
@AllArgsConstructor
public class JsonPublisher {
    KafkaTemplate<String, SampleData> kafkaTemplate;

    void publish() {
        kafkaTemplate.send("mytopic", new SampleData("id001", 334));
    }
}

consumeをするクラス。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class JsonConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(SampleData content) {
        System.out.println(content);
    }
}

ハマったところ

/bin/sh: illegal option -

docker-compose up -d --buildの際に以下のエラーが発生した。

OK: 349 MiB in 74 packages
/bin/sh: illegal option -
ERROR: Service 'kafka' failed to build : The command '/bin/sh -c apk add --no-cache bash curl jq docker  && chmod a+x /tmp/*.sh  && mv /tmp/start-kafka.sh /tmp/broker-list.sh /tmp/create-topics.sh /tmp/versions.sh /usr/bin  && sync && /tmp/download-kafka.sh  && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt  && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz  && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME}  && rm /tmp/*  && wget https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VERSION}/glibc-${GLIBC_VERSION}.apk  && apk add --no-cache --allow-untrusted glibc-${GLIBC_VERSION}.apk  && rm glibc-${GLIBC_VERSION}.apk' returned a non-zero code: 2

原因は*.shの改行コードがCRLFのため発生するらしい。windowsの場合はgit cloneの際にcore.autocrlf falseだと改行コードが自動的にCRLFになる。なのでcore.autocrlf trueで再度cloneしなおせばよい。詳しくは以下を参照。

github.com

Invalid url in bootstrap.servers:

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost --list
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:499)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:63)
        at kafka.admin.TopicCommand$AdminClientTopicService$.createAdminClient(TopicCommand.scala:216)
        at kafka.admin.TopicCommand$AdminClientTopicService$.apply(TopicCommand.scala:220)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: localhost
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:59)
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:455)
        ... 5 more

-bootstrap-serverにはポート番号も必要。

× --bootstrap-server localhost
〇 --bootstrap-server localhost:9092

Can't convert value of class xxxx to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class kafka.sample.SampleData to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class kafka.sample.SampleData cannot be cast to class java.lang.String (kafka.sample.SampleData is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @43180ffa; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.5.1.jar:na]

POJOからJSONシリアライズする場合、プロパティでそのための指定をする必要がある。

java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper

Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582) ~[na:na]

JSONを使用する場合、依存性にcom.fasterxml.jackson.core:jackson-databindを追加する必要がある。

Cannot convert from [java.lang.String] to [xxxx]

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [kafka.sample.SampleData] for GenericMessage [payload={"id":"id001","value":334}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32f00752, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=mytopic, kafka_receivedTimestamp=1603606820608, __TypeId__=[B@1a73802f, kafka_groupId=myGroup}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]

JSONからPOJOにデシリアライズする場合、プロパティでそのための指定をする必要がある。

The class 'xxxx' is not in the trusted packages

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mytopic-0 at offset 10. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'kafka.sample.SampleData' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.5.6.RELEASE.jar:2.5.6.RELEASE]

シリアライズの際に対象クラスは信頼されたpackageに含める必要がある。

参考文献