spring-kafkaでoffsetを明示的に指定してconsumerを起動する。ドキュメントとしては https://docs.spring.io/spring-kafka/docs/current/reference/html/#topicpartition-initial-offset が参考になる。
plugins { id 'org.springframework.boot' version '2.4.1' id 'io.spring.dependency-management' version '1.0.10.RELEASE' id 'java' } group = 'com.example' version = '0.0.1-SNAPSHOT' sourceCompatibility = '11' configurations { compileOnly { extendsFrom annotationProcessor } } repositories { mavenCentral() } dependencies { implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.kafka:spring-kafka' implementation 'com.fasterxml.jackson.core:jackson-databind' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.kafka:spring-kafka-test' } test { useJUnitPlatform() }
起動用のmainクラス。
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ConsumerMain { public static void main(String[] args) { SpringApplication.run(ConsumerMain.class, args); } }
プロパティファイル。独自プロパティpartition2.offset
はあとで使用する。
spring.kafka.bootstrap-servers=localhost:32770 spring.kafka.consumer.group-id=sample-group partition2.offset=100
offsetを指定して起動
アノテーションと指定インタフェース実装による方法がある。
TopicPartitionアノテーション
サンプルコードは以下の通り。
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.PartitionOffset; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component public class MyTopicPartitionListener { @KafkaListener( topicPartitions={@TopicPartition(topic="mypart3", partitionOffsets={ @PartitionOffset(partition="0",initialOffset="227"), @PartitionOffset(partition="1",initialOffset="200"), @PartitionOffset(partition="2",initialOffset="${partition2.offset}") })}) public void processMessage( String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) int offset) { System.out.println("consumer:" + message + " " + partition + " " + offset); } }
この例では3つのパーティションに対してそれぞれ異なるoffsetを指定している。また、partition="2"
はSpELでoffsetを取得している。これにより、アプリケーション起動時にVM引数などでoffsetを指定できる。
ConsumerSeekAwareインタフェースのonPartitionsAssignedを実装。
サンプルコードは以下の通り。
import java.util.Map; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.ConsumerSeekAware; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component public class MyOnPartitionsAssigned implements ConsumerSeekAware { @KafkaListener(topics = "mypart3") public void processMessage( String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) int offset) { System.out.println("consumer:" + message + " " + partition + " " + offset); } @Override public void onPartitionsAssigned( Map<org.apache.kafka.common.TopicPartition, Long> assignments, ConsumerSeekCallback callback) { System.out.println("## onPartitionsAssigned"); assignments.keySet().forEach(tp -> { System.out.println("topic=" + tp.topic() + " partition=" + tp.partition()); callback.seek(tp.topic(), tp.partition(), 120); }); } }
onPartitionsAssigned
はこのconsumerにパーティションが割り当てられた際に呼び出される。より正確に言うと、パーティション割り当てが変更された際に呼び出される。なので、起動時は当然ながらconsumerが追加されたり削除されたりした場合にも呼び出される。なので割り当てパーティションが変化した際に動的にoffsetを指定したい場合はこちらを使用する。
上記コードはoffsetに固定の120
を指定しているが、DBなりファイルなりからoffsetを取得すれば、動的なoffset指定が実現できる。
consumerを追加・削除した際のonPartitionsAssigned
のログはこんな感じ。なおパーティション割り当てアルゴリズムの設定値によって同じ結果にならないと思われる。
// 起動時 - 3パーティションが割り当て sample-group: partitions assigned: [mypart3-2, mypart3-0, mypart3-1] // consumer追加 - 2番だけになった。 sample-group: partitions assigned: [mypart3-2] // 追加したconsumer削除 - 3つに戻った sample-group: partitions assigned: [mypart3-2, mypart3-0, mypart3-1]