Kafka 的消費者可以使用兩種方式來提交消費位移(offset):自動提交和手動提交。自動提交是由 Kafka 客戶端自動定期提交位移,而手動提交則需要應(yīng)用程序顯式地調(diào)用 API 來提交位移。手動提交位移的方式可以更精細地控制消費位移,以及避免因自動提交位移而產(chǎn)生的數(shù)據(jù)丟失或重復(fù)消費等問題。
下面是使用 Kafka Java API 手動提交位移的一些示例代碼:
1.啟用手動提交位移:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關(guān)閉自動提交位移
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
在創(chuàng)建 KafkaConsumer 對象時,將 enable.auto.commit 屬性設(shè)置為 false,以關(guān)閉自動提交位移的功能。
2.手動提交位移:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 處理消息
}
consumer.commitSync(); // 手動提交位移
}
在消費消息后,調(diào)用 commitSync() 方法來手動提交位移。如果需要批量提交位移,可以使用 commitSync(Map<topicpartition, offsetandmetadata=""> offsets) 方法來提交指定的分區(qū)和位移信息。
需要注意的是,手動提交位移需要在適當(dāng)?shù)臅r機進行提交,以確保數(shù)據(jù)不會丟失或重復(fù)消費。一般來說,可以在消費一批消息后,或者在處理完一段業(yè)務(wù)邏輯后,再進行位移提交。同時,還需要注意位移的提交順序,以保證數(shù)據(jù)的一致性。