リファレンスとしては https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters のあたり。consumerのエラー発生時に別のtopicにメッセージをpublishする。
plugins { id 'org.springframework.boot' version '2.3.4.RELEASE' id 'io.spring.dependency-management' version '1.0.10.RELEASE' id 'java' } group = 'com.example' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.kafka:spring-kafka' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' annotationProcessor 'org.projectlombok:lombok' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'org.springframework.kafka:spring-kafka-test' // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind implementation 'com.fasterxml.jackson.core:jackson-databind' } test { useJUnitPlatform() }
spring.kafka.bootstrap-servers=localhost:19092 spring.kafka.consumer.group-id=myGroup
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.SeekToCurrentErrorHandler; import org.springframework.util.backoff.FixedBackOff; @SpringBootApplication public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @Bean public ErrorHandler eee(KafkaOperations<String, String> template) { DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L)); return errorHandler; } }
細かい設定はリファレンスに譲るとして。DeadLetterPublishingRecoverer
を設定したErrorHandler
のbeanを作ればそれを使ってエラーハンドリングしてくれる。
動作確認としては、Integer
を受け取るconsumerを作成して非数値のメッセージをpublish、とかが手軽と思われる。
@Component public class StringConsumer { @KafkaListener(topics = "mytopic") public void processMessage(Integer content) { System.out.println("process" + content); } }
デフォルトではdead-letter-topicの名前はオリジナルのtopic + .DLT
となる。環境にも依ると思うがこのtopicは自動的に作成される。
このtopicを変更するにはDeadLetterPublishingRecoverer
コンストラクタの第二引数で指定する。
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> deadLetterTopicResolver = (cr, e) -> new TopicPartition("dead-letter-topic-name", cr.partition()); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, deadLetterTopicResolver);
実際に使用するには他にもいろいろな設定が必要となる。基本的な事はドキュメント、あとはDeadLetterPublishingRecovere
とかのソースコード読む事になると思われる。