spring-kafkaのプロパティspring.kafka.consumer.auto-offset-reset
の挙動の違いを確認する。
以下はorg.apache.kafka.clients.consumer.ConsumerConfig#AUTO_OFFSET_RESET_DOC
の抜粋だけど https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.resetと同じ事が書いてある。
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw exception to the consumer if no previous offset is found for the consumer's group
- anything else: throw exception to the consumer.
以下抄訳。
Kafkaのoffsetが未初期化またはcurrent offsetがサーバに無い(ex. 削除済みなど)場合の挙動。
- earliest: 最初のoffsetに自動リセット
- latest: 最新のoffsetに自動リセット
- none: consumer groupにoffsetがまだ無い場合consumerに例外をスロー
- anything else: consumerに例外スロー
挙動確認ソースコード
単にconsumeするだけのspring-bootをつくる。
plugins { id 'org.springframework.boot' version '2.3.4.RELEASE' id 'io.spring.dependency-management' version '1.0.10.RELEASE' id 'java' } group = 'com.example' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.kafka:spring-kafka' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' annotationProcessor 'org.projectlombok:lombok' testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } testImplementation 'org.springframework.kafka:spring-kafka-test' // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind implementation 'com.fasterxml.jackson.core:jackson-databind' } test { useJUnitPlatform() }
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component public class StringConsumer { @KafkaListener(topics = "mytopic") public void processMessage(String content, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) long offset) { System.out.println(content + " PARTITION=" + partition + " OFFSET=" + offset); } }
以下のプロパティファイルの値を変更して動作を確認していく。
spring.kafka.bootstrap-servers=localhost:19092 spring.kafka.consumer.group-id=myGroup #spring.kafka.consumer.auto-offset-reset=earliest #spring.kafka.consumer.auto-offset-reset=latest #spring.kafka.consumer.auto-offset-reset=none #spring.kafka.consumer.auto-offset-reset=aaaa
spring.kafka.consumer.auto-offset-reset
latest
topicを新規作成してメッセージをいくつかpublishしておく。
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --partitions=3 --topic=mytopic Created topic mytopic. bash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic >1 >2 (中略) >10
次にconsumerを起動する。
2021-01-17 15:17:09.134 INFO 7564 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: [mytopic-2, mytopic-1, mytopic-0]
何もconsumeされない。
このように、デフォルトのlatest
だと最新のメッセージからconsumeする。つまり、すでにtopicにあるメッセージはconsumeしない。
consumerを一旦終了し、再度publishして、もう一度consumerを起動する。
$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic >11 (中略) >15
o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: [mytopic-2, mytopic-1, mytopic-0] 15 PARTITION=2 OFFSET=2 (中略) 14 PARTITION=0 OFFSET=4
consumeされる。
これはconsumer-groupにcurrent-offsetが記録された状態なので、そのoffsetからconsumeされる。
earliest
topicを削除してもう一度作り直す。
bash-4.4# $KAFKA_HOME/bin/kaftopics.sh --bootstrap-server=localhost:9092 --delete --topic=mytopic bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --partitions=3 --topic=mytopic Created topic mytopic.
先ほどと同様にいくつかのメッセージをpublishする。
bash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic >1 (中略) >10
consumerを起動する。
o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: [mytopic-2, mytopic-1, mytopic-0] 3 PARTITION=2 OFFSET=0 4 PARTITION=2 OFFSET=1 (中略) 8 PARTITION=0 OFFSET=2 9 PARTITION=0 OFFSET=3
メッセージがconsumeされる。
このように、earliest
だと最初のメッセージからconsumeする。つまり、すでにtopicにあるメッセージをconsumeする。より正確に言うと、このエントリでは試してないが、未削除メッセージのうち最初のものから、になると思われる。
consumerを一旦終了し、再度publishして、もう一度consumerを起動する。
Cbash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic >12 (中略) >15
o.s.k.l.KafkaMessageListenerContainer : myGroup: partitions assigned: [mytopic-2, mytopic-1, mytopic-0] 12 PARTITION=2 OFFSET=4 (中略) 15 PARTITION=0 OFFSET=6
consumeされる。
consumer-groupにcurrent-offsetが記録された状態での動作は同様となる。
none
topicを削除してもう一度作り直す。
$KAFKA_HOME/bin/kaftopics.sh --bootstrap-server=localhost:9092 --delete092 --topic=mytopic bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --partitions=3 --topic=mytopic Created topic mytopic.
先ほどと同様にいくつかのメッセージをpublishする。
$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic >1 (中略) >5
consumerを起動すると以下のような例外がスローされる。
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [mytopic-2, mytopic-1, mytopic-0] at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:658) ~[kafka-clients-2.5.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2391) ~[kafka-clients-2.5.1.jar:na]
エラーメッセージの通り、none
はまだoffsetが記録されていない状態だとエラーになる。
それ以外
上記以外の値を設定すると以下の例外がスローされる。
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value aaaa for configuration auto.offset.reset: String must be one of: latest, earliest, none at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:941) ~[kafka-clients-2.5.1.jar:na]
雑感
はじめてkafkaさわったとき、topic作ってpublishしてconsumeしたら何も来ない……何故? となった。なので、なぜデフォルト値はearliest
でなくてlatest
なのだろう、と疑問だった。
ただ、kafkaの主用途考えると、次々メッセージがpublishされる状態で新規consumer-groupを追加するとき、latest
で起動したいはず。たとえば、リアルタイムのログ分析とかならそうなるはず。対してearliest
だと全データが来てしまう。リアルタイムのログ分析で一週間前のデータから来られても意味が無いし、現時点まで大量にデータがあったら困った事になりかねない。
てことで、デフォルトlatest
で状況に応じてearliest
にしてね、って事と思われる。
なので、例えばバッチ処理的なデータバスとして使う場合、初回起動時は先にconsumer-groupを作っておくのが安全と思われる。もし、先にpublishしてからconsumerを起動するとそれ以前のメッセージがconsumeされない、とかになりかねない。