kagamihogeの日記

kagamihogeの日記です。

spring-kafkaでoffset指定のconsumer

spring-kafkaでoffsetを明示的に指定してconsumerを起動する。ドキュメントとしては https://docs.spring.io/spring-kafka/docs/current/reference/html/#topicpartition-initial-offset が参考になる。

plugins {
    id 'org.springframework.boot' version '2.4.1'
    id 'io.spring.dependency-management' version '1.0.10.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
  implementation 'com.fasterxml.jackson.core:jackson-databind' 
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

test {
    useJUnitPlatform()
}

起動用のmainクラス。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerMain {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerMain.class, args);
    }
}

プロパティファイル。独自プロパティpartition2.offsetはあとで使用する。

spring.kafka.bootstrap-servers=localhost:32770
spring.kafka.consumer.group-id=sample-group
partition2.offset=100

offsetを指定して起動

アノテーションと指定インタフェース実装による方法がある。

TopicPartitionアノテーション

サンプルコードは以下の通り。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MyTopicPartitionListener {
    @KafkaListener(
            topicPartitions={@TopicPartition(topic="mypart3", partitionOffsets={
                    @PartitionOffset(partition="0",initialOffset="227"),
                    @PartitionOffset(partition="1",initialOffset="200"),
                    @PartitionOffset(partition="2",initialOffset="${partition2.offset}")
    })})
    public void processMessage(
            String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) int offset) {
        System.out.println("consumer:" + message + " " + partition + " " + offset);
    }
}

この例では3つのパーティションに対してそれぞれ異なるoffsetを指定している。また、partition="2"はSpELでoffsetを取得している。これにより、アプリケーション起動時にVM引数などでoffsetを指定できる。

ConsumerSeekAwareインタフェースのonPartitionsAssignedを実装。

サンプルコードは以下の通り。

import java.util.Map;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MyOnPartitionsAssigned implements ConsumerSeekAware {

    @KafkaListener(topics = "mypart3")
    public void processMessage(
            String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) int offset) {
        System.out.println("consumer:" + message + " " + partition + " " + offset);
    }

    @Override
    public void onPartitionsAssigned(
            Map<org.apache.kafka.common.TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        System.out.println("## onPartitionsAssigned");
        assignments.keySet().forEach(tp -> {
            System.out.println("topic=" + tp.topic() + " partition=" + tp.partition());
            callback.seek(tp.topic(), tp.partition(), 120);
        });
    }
}

onPartitionsAssignedはこのconsumerにパーティションが割り当てられた際に呼び出される。より正確に言うと、パーティション割り当てが変更された際に呼び出される。なので、起動時は当然ながらconsumerが追加されたり削除されたりした場合にも呼び出される。なので割り当てパーティションが変化した際に動的にoffsetを指定したい場合はこちらを使用する。

上記コードはoffsetに固定の120を指定しているが、DBなりファイルなりからoffsetを取得すれば、動的なoffset指定が実現できる。

consumerを追加・削除した際のonPartitionsAssignedのログはこんな感じ。なおパーティション割り当てアルゴリズムの設定値によって同じ結果にならないと思われる。

// 起動時 - 3パーティションが割り当て
sample-group: partitions assigned: [mypart3-2, mypart3-0, mypart3-1]
// consumer追加 - 2番だけになった。
sample-group: partitions assigned: [mypart3-2]
// 追加したconsumer削除 - 3つに戻った
sample-group: partitions assigned: [mypart3-2, mypart3-0, mypart3-1]

Spring Bootでjsonプロパティをenumにデシリアライズとvalidation

各種パターン

まず、動作確認用の適当なmainとcontrollerを作成する。

plugins {
  id 'org.springframework.boot' version '2.4.1'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-web'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  
  implementation 'io.springfox:springfox-boot-starter:3.0.0'
  implementation 'org.springframework.boot:spring-boot-starter-validation'
}
test {
  useJUnitPlatform()
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SampleController {
    @PostMapping("/sample")
    public void sample(@RequestBody @Validated SampleRequest request) {
        System.out.println(request);
    }
}

リクエスト格納クラスのプロパティの型を色々変えて試していく。

import lombok.Data;

@Data
public class SampleRequest {
    SimpleEnum simpleEnum;
}
public enum SimpleEnum {
    on, off;
}

単純なenum

上記のとおり、特に何もせずともenumにすればバインドする。上の例だと、"simpleEnum": "on"とか"simpleEnum": "off"が通る。また、"simpleEnum": nullとかjsonプロパティ自体が無い場合はnullになる。

Optionalのenum

Optional<SimpleEnum> optSimpleEnum = Optional.empty();

この場合、"optSimpleEnum": nullを渡すとOptional.emptyになる。また、上記例はデフォルト値を入れており、jsonプロパティ自体が無い場合Optional.emptyになる。デフォルト値が無いとnullになる。

独自マッピングenum

たとえば、onは"1"でoffは"0"のように、enumとは異なる値からマッピングしたい場合。これは@JsonCreatorマッピングを定義する。

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonCreator;

public enum ValueEnum {
    on("1"), off("0");
    
    String value;
    ValueEnum(String value) {
        this.value = value;
    }
    
    static Map<String, ValueEnum> BY_VALUE = new HashMap<>();
    static {
        for (ValueEnum e : values()) {
            BY_VALUE.put(e.value, e);
        }
    }
    
    @JsonCreator
    public static ValueEnum create(String value) {
        return BY_VALUE.getOrDefault(value, ValueEnum.off);
    }
}

上記例は、Mapにあらかじめマッピングを保持しておき、@JsonCreatorで文字列からenumへのマッピングを記述している。

なお、nulljsonプロパティ自体が無い場合はnullになる。

独自マッピングのOptionalのenum

Optional<ValueEnum> optValueEnum = Optional.empty();

この場合、nullを渡すとOptional.emptyになる。また、デフォルト値を入れているので、jsonプロパティ自体が無い場合もOptional.emptyになる。

独自マッピングenumのvalidation

妥当な値以外はvalidationエラーにしたい場合。上記例の場合、"0", "1"以外はvalidationエラーとしたい場合を考える。

@ValueEnumConstraint
ValidatedValueEnum validatedValueEnum;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import javax.validation.Constraint;
import javax.validation.Payload;

@Documented
@Constraint(validatedBy = ValueEnumValidator.class)
@Target( { ElementType.METHOD, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
public @interface ValueEnumConstraint {
    String message() default "Invalid";
    Class<?>[] groups() default {};
    Class<? extends Payload>[] payload() default {};
}
import java.util.Objects;

import javax.validation.ConstraintValidator;
import javax.validation.ConstraintValidatorContext;

public class ValueEnumValidator implements ConstraintValidator<ValueEnumConstraint, ValidatedValueEnum> {

    @Override
    public boolean isValid(ValidatedValueEnum value, ConstraintValidatorContext context) {
        return Objects.nonNull(value);
    }
}
public enum ValidatedValueEnum {
  //(ValueEnumと同一部分は省略)    
    @JsonCreator
    public static ValidatedValueEnum create(String value) {
        return BY_VALUE.getOrDefault(value, null);
    }
}

まず、@JsonCreatorで、妥当な値以外が来た場合はnullを返す。そして、validationはnullが来たらvalidationエラーにする。@JsonCreatorバインディングの後にConstraintValidatorが走る。

ただし、これはnullを許容したい場合にはvalidationエラーとなってしまう。

独自マッピングのOptionalのenumのvalidation

nullを許容するため、まずOptionalにする。

@OptValueEnumConstraint
Optional<OptValidatedValueEnum> optValidatedValueEnum = Optional.empty();

次に、enumにvalidationエラーを示すinvalidを追加する。@JsonCreatorマッピング時に、妥当な値以外はinvalidを返す。

public enum OptValidatedValueEnum {
    on("1"), off("0"), invalid("");
    //(省略) 
    @JsonCreator
    public static OptValidatedValueEnum create(String value) {
        return BY_VALUE.getOrDefault(value, invalid);
    }
}

そして、validationではinvalidが来たらエラーにし、それ以外はOKにする。

public class OptValueEnumValidator implements ConstraintValidator<OptValueEnumConstraint, Optional<OptValidatedValueEnum>> {

    @Override
    public boolean isValid(Optional<OptValidatedValueEnum> value, ConstraintValidatorContext context) {
        if (value.isPresent()) {
            return !(value.get() == invalid);
        }
        return true;
    }
}

これで、妥当な値、ここでは"0", "1", nullおよびjsonプロパティ自体が無い、以外はvalidationエラーになる。

Spring Boot + Apache Kafkaのチュートリアルレベルの事をやる

Spring BootからApache Kafkaにpublish/subscribeする。

環境

dockerでkafka起動

qiita.com

上記を参考に構築する。本エントリで関係する書き換え箇所は以下の通り。

  • KAFKA_ADVERTISED_HOST_NAME - ホストのローカルアドレスを指定。
  • "19092:9092" - 後々、ホストのSpring Bootからアクセスする際にポートを指定する。なので適当な値でポートフォワーディングしている。
  • KAFKA_CREATE_TOPICS - 起動時に動作確認用のtopicを作成。
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "19092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.10.210
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "Topic:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

CLIでpublish/subscribeが出来るか動作確認する。

まずkafkaのdockerに入る。

docker exec -it kafka-docker_kafka_1 bash

topicの一覧を取得する。KAFKA_CREATE_TOPICSで指定したtopicが作られているのが分かる。

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Topic

topicの新規作成。

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --topic=mytopic
Created topic mytopic.

topicにpublishする。

bash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic
>sad
>

topicからconsumeする。

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic=mytopic

Spring Boot

上記で構築したkafkaにSpring Bootからpublish/subscribeする。

docs.spring.io

基本的なことはSpring Bootのドキュメントに記載がある。

string

まず単純な文字列のpublish/subscribeをする。

gradleに関連する依存性などを追加する。Spring Initializr https://start.spring.io/ で必要なものを追加する。Dependenciesにkafkaと入力すると、Spring for Apache KafkaとSpring for Apache Kafka Streamsが出てくるが、とりあえずはSpring for Apache Kafkaだけで良い。

plugins {
  id 'org.springframework.boot' version '2.3.4.RELEASE'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation 'org.springframework.kafka:spring-kafka'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.kafka:spring-kafka-test'  
}
test {
  useJUnitPlatform()
}

application.propertiesで設定を行う。

spring.kafka.bootstrap-servers=localhost:19092
spring.kafka.consumer.group-id=myGroup

Spring Boot起動時にpublishを一回行うMainクラスを作成する。

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import lombok.AllArgsConstructor;

@SpringBootApplication
@AllArgsConstructor
public class DemoApplication implements CommandLineRunner {
    StringPublisher publisher;
    
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        publisher.publish();
    }
}

publishするクラス。

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import lombok.AllArgsConstructor;

@Component
@AllArgsConstructor
public class StringPublisher {
    KafkaTemplate<String, String> kafkaTemplate;

    void publish() {
        kafkaTemplate.send("mytopic", "message");
    }
}

consumeするクラス。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class StringConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(String content) {
        System.out.println(content);
    }
}

JSON

次にPOJOのpublish/subscribeをする。実際にはPOJOからJSONシリアライズ・デシリアライズをして通信を行う。なので、まずjackson-databindの依存性を追加する。

implementation 'com.fasterxml.jackson.core:jackson-databind'

application.propertiesJSONシリアライズ・デシリアライズの設定を追加する。デフォルトはStringにシリアライズ・デシリアライズするため、これをJSONに切り替える。また、そのJSONクラスは信頼されたpackageに入れる必要がある。今回は説明簡易化のため、すべて信頼するように*を指定している。

pring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

JSON用のMainクラスを作成する。

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import lombok.AllArgsConstructor;

@SpringBootApplication
@AllArgsConstructor
public class DemoApplication implements CommandLineRunner {
    JsonPublisher publisher;
    
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        publisher.publish();
    }
}

POJOのクラス。

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SampleData {
    String id;
    int value;
}

pubishをするクラス。

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import lombok.AllArgsConstructor;

@Component
@AllArgsConstructor
public class JsonPublisher {
    KafkaTemplate<String, SampleData> kafkaTemplate;

    void publish() {
        kafkaTemplate.send("mytopic", new SampleData("id001", 334));
    }
}

consumeをするクラス。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class JsonConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(SampleData content) {
        System.out.println(content);
    }
}

ハマったところ

/bin/sh: illegal option -

docker-compose up -d --buildの際に以下のエラーが発生した。

OK: 349 MiB in 74 packages
/bin/sh: illegal option -
ERROR: Service 'kafka' failed to build : The command '/bin/sh -c apk add --no-cache bash curl jq docker  && chmod a+x /tmp/*.sh  && mv /tmp/start-kafka.sh /tmp/broker-list.sh /tmp/create-topics.sh /tmp/versions.sh /usr/bin  && sync && /tmp/download-kafka.sh  && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt  && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz  && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME}  && rm /tmp/*  && wget https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VERSION}/glibc-${GLIBC_VERSION}.apk  && apk add --no-cache --allow-untrusted glibc-${GLIBC_VERSION}.apk  && rm glibc-${GLIBC_VERSION}.apk' returned a non-zero code: 2

原因は*.shの改行コードがCRLFのため発生するらしい。windowsの場合はgit cloneの際にcore.autocrlf falseだと改行コードが自動的にCRLFになる。なのでcore.autocrlf trueで再度cloneしなおせばよい。詳しくは以下を参照。

github.com

Invalid url in bootstrap.servers:

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost --list
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:499)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:63)
        at kafka.admin.TopicCommand$AdminClientTopicService$.createAdminClient(TopicCommand.scala:216)
        at kafka.admin.TopicCommand$AdminClientTopicService$.apply(TopicCommand.scala:220)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: localhost
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:59)
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:455)
        ... 5 more

-bootstrap-serverにはポート番号も必要。

× --bootstrap-server localhost
〇 --bootstrap-server localhost:9092

Can't convert value of class xxxx to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class kafka.sample.SampleData to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class kafka.sample.SampleData cannot be cast to class java.lang.String (kafka.sample.SampleData is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @43180ffa; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-2.5.1.jar:na]

POJOからJSONシリアライズする場合、プロパティでそのための指定をする必要がある。

java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper

Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582) ~[na:na]

JSONを使用する場合、依存性にcom.fasterxml.jackson.core:jackson-databindを追加する必要がある。

Cannot convert from [java.lang.String] to [xxxx]

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [kafka.sample.SampleData] for GenericMessage [payload={"id":"id001","value":334}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@32f00752, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=mytopic, kafka_receivedTimestamp=1603606820608, __TypeId__=[B@1a73802f, kafka_groupId=myGroup}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.9.RELEASE.jar:5.2.9.RELEASE]

JSONからPOJOにデシリアライズする場合、プロパティでそのための指定をする必要がある。

The class 'xxxx' is not in the trusted packages

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition mytopic-0 at offset 10. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'kafka.sample.SampleData' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.5.6.RELEASE.jar:2.5.6.RELEASE]

シリアライズの際に対象クラスは信頼されたpackageに含める必要がある。

参考文献