kagamihogeの日記

kagamihogeの日記です。

spring.kafka.consumer.auto-offset-resetの挙動確認

spring-kafkaのプロパティspring.kafka.consumer.auto-offset-resetの挙動の違いを確認する。

以下はorg.apache.kafka.clients.consumer.ConsumerConfig#AUTO_OFFSET_RESET_DOCの抜粋だけど https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.resetと同じ事が書いてある。

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

以下抄訳。

Kafkaのoffsetが未初期化またはcurrent offsetがサーバに無い(ex. 削除済みなど)場合の挙動。

  • earliest: 最初のoffsetに自動リセット
  • latest: 最新のoffsetに自動リセット
  • none: consumer groupにoffsetがまだ無い場合consumerに例外をスロー
  • anything else: consumerに例外スロー

挙動確認ソースコード

単にconsumeするだけのspring-bootをつくる。

plugins {
  id 'org.springframework.boot' version '2.3.4.RELEASE'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}
repositories {
  mavenCentral()
}
dependencies {
  implementation 'org.springframework.boot:spring-boot-starter'
  implementation 'org.springframework.kafka:spring-kafka'
  compileOnly 'org.projectlombok:lombok'
  developmentOnly 'org.springframework.boot:spring-boot-devtools'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'org.springframework.kafka:spring-kafka-test'
  
  // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
implementation 'com.fasterxml.jackson.core:jackson-databind'
  
}
test {
  useJUnitPlatform()
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class StringConsumer {
    @KafkaListener(topics = "mytopic")
    public void processMessage(String content, 
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println(content + " PARTITION=" + partition + " OFFSET=" + offset);
    }
}

以下のプロパティファイルの値を変更して動作を確認していく。

spring.kafka.bootstrap-servers=localhost:19092
spring.kafka.consumer.group-id=myGroup

#spring.kafka.consumer.auto-offset-reset=earliest
#spring.kafka.consumer.auto-offset-reset=latest
#spring.kafka.consumer.auto-offset-reset=none
#spring.kafka.consumer.auto-offset-reset=aaaa

spring.kafka.consumer.auto-offset-reset

latest

topicを新規作成してメッセージをいくつかpublishしておく。

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --partitions=3 --topic=mytopic
Created topic mytopic.
bash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic
>1
>2
(中略)
>10

次にconsumerを起動する。

2021-01-17 15:17:09.134  INFO 7564 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : myGroup: partitions assigned: [mytopic-2, mytopic-1, mytopic-0]

何もconsumeされない。

このように、デフォルトのlatestだと最新のメッセージからconsumeする。つまり、すでにtopicにあるメッセージはconsumeしない。

consumerを一旦終了し、再度publishして、もう一度consumerを起動する。

$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic
>11
(中略)
>15
o.s.k.l.KafkaMessageListenerContainer    : myGroup: partitions assigned: [mytopic-2, mytopic-1, mytopic-0]
15 PARTITION=2 OFFSET=2
(中略)
14 PARTITION=0 OFFSET=4

consumeされる。

これはconsumer-groupにcurrent-offsetが記録された状態なので、そのoffsetからconsumeされる。

earliest

topicを削除してもう一度作り直す。

bash-4.4# $KAFKA_HOME/bin/kaftopics.sh --bootstrap-server=localhost:9092 --delete --topic=mytopic
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --partitions=3 --topic=mytopic
Created topic mytopic.

先ほどと同様にいくつかのメッセージをpublishする。

bash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic
>1
(中略)
>10

consumerを起動する。

o.s.k.l.KafkaMessageListenerContainer    : myGroup: partitions assigned: [mytopic-2, mytopic-1, mytopic-0]
3 PARTITION=2 OFFSET=0
4 PARTITION=2 OFFSET=1
(中略)
8 PARTITION=0 OFFSET=2
9 PARTITION=0 OFFSET=3

メッセージがconsumeされる。

このように、earliestだと最初のメッセージからconsumeする。つまり、すでにtopicにあるメッセージをconsumeする。より正確に言うと、このエントリでは試してないが、未削除メッセージのうち最初のものから、になると思われる。

consumerを一旦終了し、再度publishして、もう一度consumerを起動する。

Cbash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic
>12
(中略)
>15
o.s.k.l.KafkaMessageListenerContainer    : myGroup: partitions assigned: [mytopic-2, mytopic-1, mytopic-0]
12 PARTITION=2 OFFSET=4
(中略)
15 PARTITION=0 OFFSET=6

consumeされる。

consumer-groupにcurrent-offsetが記録された状態での動作は同様となる。

none

topicを削除してもう一度作り直す。

$KAFKA_HOME/bin/kaftopics.sh --bootstrap-server=localhost:9092 --delete092 --topic=mytopic
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server=localhost:9092 --create --partitions=3 --topic=mytopic
Created topic mytopic.

先ほどと同様にいくつかのメッセージをpublishする。

$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server=localhost:9092 --topic=mytopic
>1
(中略)
>5

consumerを起動すると以下のような例外がスローされる。

org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [mytopic-2, mytopic-1, mytopic-0]
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:658) ~[kafka-clients-2.5.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2391) ~[kafka-clients-2.5.1.jar:na]

エラーメッセージの通り、noneはまだoffsetが記録されていない状態だとエラーになる。

それ以外

上記以外の値を設定すると以下の例外がスローされる。

Caused by: org.apache.kafka.common.config.ConfigException: Invalid value aaaa for configuration auto.offset.reset: String must be one of: latest, earliest, none
    at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:941) ~[kafka-clients-2.5.1.jar:na]

雑感

はじめてkafkaさわったとき、topic作ってpublishしてconsumeしたら何も来ない……何故? となった。なので、なぜデフォルト値はearliestでなくてlatestなのだろう、と疑問だった。

ただ、kafkaの主用途考えると、次々メッセージがpublishされる状態で新規consumer-groupを追加するとき、latestで起動したいはず。たとえば、リアルタイムのログ分析とかならそうなるはず。対してearliestだと全データが来てしまう。リアルタイムのログ分析で一週間前のデータから来られても意味が無いし、現時点まで大量にデータがあったら困った事になりかねない。

てことで、デフォルトlatestで状況に応じてearliestにしてね、って事と思われる。

なので、例えばバッチ処理的なデータバスとして使う場合、初回起動時は先にconsumer-groupを作っておくのが安全と思われる。もし、先にpublishしてからconsumerを起動するとそれ以前のメッセージがconsumeされない、とかになりかねない。