kagamihogeの日記

kagamihogeの日記です。

spring-kafkaでconsumerのエラー時にdead-letter-topicにpublish

リファレンスとしては https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters のあたり。consumerのエラー発生時に別のtopicにメッセージをpublishする。

plugins {
  id 'org.springframework.boot' version '2.3.4.RELEASE'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation 'org.springframework.kafka:spring-kafka'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.kafka:spring-kafka-test'
  
  // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
implementation 'com.fasterxml.jackson.core:jackson-databind'
  
}
test {
  useJUnitPlatform()
}
spring.kafka.bootstrap-servers=localhost:19092
spring.kafka.consumer.group-id=myGroup
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public ErrorHandler eee(KafkaOperations<String, String> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);

        ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
        return errorHandler;
    }
}

細かい設定はリファレンスに譲るとして。DeadLetterPublishingRecovererを設定したErrorHandlerのbeanを作ればそれを使ってエラーハンドリングしてくれる。

動作確認としては、Integerを受け取るconsumerを作成して非数値のメッセージをpublish、とかが手軽と思われる。

@Component
public class StringConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(Integer content) {
        System.out.println("process" + content);
    }
}

デフォルトではdead-letter-topicの名前はオリジナルのtopic + .DLTとなる。環境にも依ると思うがこのtopicは自動的に作成される。

このtopicを変更するにはDeadLetterPublishingRecovererコンストラクタの第二引数で指定する。

BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
deadLetterTopicResolver = (cr, e) -> new TopicPartition("dead-letter-topic-name", cr.partition());
        
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template, deadLetterTopicResolver);

実際に使用するには他にもいろいろな設定が必要となる。基本的な事はドキュメント、あとはDeadLetterPublishingRecovereとかのソースコード読む事になると思われる。