リファレンスとしては https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#kafka-dlq-processing のあたり。
plugins { id 'org.springframework.boot' version '2.4.0' id 'io.spring.dependency-management' version '1.0.10.RELEASE' id 'java' } group = 'com.example' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.kafka:spring-kafka' // implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.0' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.kafka:spring-kafka-test' } test { useJUnitPlatform() }
spring.cloud.stream.kafka.binder.brokers=localhost:32770 spring.cloud.stream.bindings.input.group=java-consumer-group spring.cloud.stream.bindings.input.destination=dlqtest spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true spring.cloud.stream.kafka.bindings.input.consumer.dlqName=dlqtest-dlq
enableDlq
でdead-letter-queueをONにする。そのtopicはdlqName
で指定。
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SampleApplication { public static void main(String[] args) { SpringApplication.run(SampleApplication.class, args); } }
動作確認用にわざとエラーになるconsumerを用意する。これでdlqtest
にpublishすればdlqtest-dlq
にメッセージがpublishされる。
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component; @Component @EnableBinding(Sink.class) // deprecated public class SampleListener { @StreamListener(Sink.INPUT) // deprecated public void process(String message) { System.out.println(message); throw new RuntimeException("eee"); } }
なお@StreamListener
と@EnableBinding
はdeprecatedになっている。javadocとかリファレンス見ると、functional styleで書いてね、とある。が、それについてはこのエントリは触れない(調べてない)。