kagamihogeの日記

kagamihogeの日記です。

spring-cloud-stream-binder-kafkaのdead-letter-queue

リファレンスとしては https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#kafka-dlq-processing のあたり。

plugins {
  id 'org.springframework.boot' version '2.4.0'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation 'org.springframework.kafka:spring-kafka'
//  implementation 'com.fasterxml.jackson.core:jackson-databind'
  implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.0'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.kafka:spring-kafka-test'
}
test {
  useJUnitPlatform()
}
spring.cloud.stream.kafka.binder.brokers=localhost:32770

spring.cloud.stream.bindings.input.group=java-consumer-group
spring.cloud.stream.bindings.input.destination=dlqtest

spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlqName=dlqtest-dlq

enableDlqでdead-letter-queueをONにする。そのtopicはdlqNameで指定。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SampleApplication {
  public static void main(String[] args) {
    SpringApplication.run(SampleApplication.class, args);
  }
}

動作確認用にわざとエラーになるconsumerを用意する。これでdlqtestにpublishすればdlqtest-dlqにメッセージがpublishされる。

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class) // deprecated
public class SampleListener {
  @StreamListener(Sink.INPUT) // deprecated
  public void process(String message) {
    System.out.println(message);
    throw new RuntimeException("eee");
  }
}

なお@StreamListener@EnableBindingはdeprecatedになっている。javadocとかリファレンス見ると、functional styleで書いてね、とある。が、それについてはこのエントリは触れない(調べてない)。

spring-kafkaでconsumerのエラー時にdead-letter-topicにpublish

リファレンスとしては https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters のあたり。consumerのエラー発生時に別のtopicにメッセージをpublishする。

plugins {
  id 'org.springframework.boot' version '2.3.4.RELEASE'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation 'org.springframework.kafka:spring-kafka'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.kafka:spring-kafka-test'
  
  // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
implementation 'com.fasterxml.jackson.core:jackson-databind'
  
}
test {
  useJUnitPlatform()
}
spring.kafka.bootstrap-servers=localhost:19092
spring.kafka.consumer.group-id=myGroup
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public ErrorHandler eee(KafkaOperations<String, String> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);

        ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
        return errorHandler;
    }
}

細かい設定はリファレンスに譲るとして。DeadLetterPublishingRecovererを設定したErrorHandlerのbeanを作ればそれを使ってエラーハンドリングしてくれる。

動作確認としては、Integerを受け取るconsumerを作成して非数値のメッセージをpublish、とかが手軽と思われる。

@Component
public class StringConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(Integer content) {
        System.out.println("process" + content);
    }
}

デフォルトではdead-letter-topicの名前はオリジナルのtopic + .DLTとなる。環境にも依ると思うがこのtopicは自動的に作成される。

このtopicを変更するにはDeadLetterPublishingRecovererコンストラクタの第二引数で指定する。

BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
deadLetterTopicResolver = (cr, e) -> new TopicPartition("dead-letter-topic-name", cr.partition());
        
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, deadLetterTopicResolver);

実際に使用するには他にもいろいろな設定が必要となる。基本的な事はドキュメント、あとはDeadLetterPublishingRecovereとかのソースコード読む事になると思われる。

spring-kafkaでoffset指定のconsumer

spring-kafkaでoffsetを明示的に指定してconsumerを起動する。ドキュメントとしては https://docs.spring.io/spring-kafka/docs/current/reference/html/#topicpartition-initial-offset が参考になる。

plugins {
    id 'org.springframework.boot' version '2.4.1'
    id 'io.spring.dependency-management' version '1.0.10.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
  implementation 'com.fasterxml.jackson.core:jackson-databind' 
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

test {
    useJUnitPlatform()
}

起動用のmainクラス。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerMain {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerMain.class, args);
    }
}

プロパティファイル。独自プロパティpartition2.offsetはあとで使用する。

spring.kafka.bootstrap-servers=localhost:32770
spring.kafka.consumer.group-id=sample-group
partition2.offset=100

offsetを指定して起動

アノテーションと指定インタフェース実装による方法がある。

TopicPartitionアノテーション

サンプルコードは以下の通り。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MyTopicPartitionListener {
    @KafkaListener(
            topicPartitions={@TopicPartition(topic="mypart3", partitionOffsets={
                    @PartitionOffset(partition="0",initialOffset="227"),
                    @PartitionOffset(partition="1",initialOffset="200"),
                    @PartitionOffset(partition="2",initialOffset="${partition2.offset}")
    })})
    public void processMessage(
            String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) int offset) {
        System.out.println("consumer:" + message + " " + partition + " " + offset);
    }
}

この例では3つのパーティションに対してそれぞれ異なるoffsetを指定している。また、partition="2"はSpELでoffsetを取得している。これにより、アプリケーション起動時にVM引数などでoffsetを指定できる。

ConsumerSeekAwareインタフェースのonPartitionsAssignedを実装。

サンプルコードは以下の通り。

import java.util.Map;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MyOnPartitionsAssigned implements ConsumerSeekAware {

    @KafkaListener(topics = "mypart3")
    public void processMessage(
            String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) int offset) {
        System.out.println("consumer:" + message + " " + partition + " " + offset);
    }

    @Override
    public void onPartitionsAssigned(
            Map<org.apache.kafka.common.TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        System.out.println("## onPartitionsAssigned");
        assignments.keySet().forEach(tp -> {
            System.out.println("topic=" + tp.topic() + " partition=" + tp.partition());
            callback.seek(tp.topic(), tp.partition(), 120);
        });
    }
}

onPartitionsAssignedはこのconsumerにパーティションが割り当てられた際に呼び出される。より正確に言うと、パーティション割り当てが変更された際に呼び出される。なので、起動時は当然ながらconsumerが追加されたり削除されたりした場合にも呼び出される。なので割り当てパーティションが変化した際に動的にoffsetを指定したい場合はこちらを使用する。

上記コードはoffsetに固定の120を指定しているが、DBなりファイルなりからoffsetを取得すれば、動的なoffset指定が実現できる。

consumerを追加・削除した際のonPartitionsAssignedのログはこんな感じ。なおパーティション割り当てアルゴリズムの設定値によって同じ結果にならないと思われる。

// 起動時 - 3パーティションが割り当て
sample-group: partitions assigned: [mypart3-2, mypart3-0, mypart3-1]
// consumer追加 - 2番だけになった。
sample-group: partitions assigned: [mypart3-2]
// 追加したconsumer削除 - 3つに戻った
sample-group: partitions assigned: [mypart3-2, mypart3-0, mypart3-1]