kagamihogeの日記

kagamihogeの日記です。

spring-cloud-stream-binder-kafkaのdead-letter-queue

リファレンスとしては https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#kafka-dlq-processing のあたり。

plugins {
  id 'org.springframework.boot' version '2.4.0'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation 'org.springframework.kafka:spring-kafka'
//  implementation 'com.fasterxml.jackson.core:jackson-databind'
  implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.0'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'org.springframework.kafka:spring-kafka-test'
}
test {
  useJUnitPlatform()
}
spring.cloud.stream.kafka.binder.brokers=localhost:32770

spring.cloud.stream.bindings.input.group=java-consumer-group
spring.cloud.stream.bindings.input.destination=dlqtest

spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlqName=dlqtest-dlq

enableDlqでdead-letter-queueをONにする。そのtopicはdlqNameで指定。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SampleApplication {
  public static void main(String[] args) {
    SpringApplication.run(SampleApplication.class, args);
  }
}

動作確認用にわざとエラーになるconsumerを用意する。これでdlqtestにpublishすればdlqtest-dlqにメッセージがpublishされる。

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class) // deprecated
public class SampleListener {
  @StreamListener(Sink.INPUT) // deprecated
  public void process(String message) {
    System.out.println(message);
    throw new RuntimeException("eee");
  }
}

なお@StreamListener@EnableBindingはdeprecatedになっている。javadocとかリファレンス見ると、functional styleで書いてね、とある。が、それについてはこのエントリは触れない(調べてない)。