一、groupid的定義
在使用Kafka的時候,我們經(jīng)常會看到group.id這個配置項,它是一個字符串類型的配置項。具體來說,每個消費者都有一個group id,一般情況下我們可以將同樣處理某個數(shù)據(jù)源的消費者放置在一組中,使用group id進(jìn)行標(biāo)識。
舉個例子,如果你有一個在多個地方運行的日志處理程序,每個程序都會處理某個topic的消息,那么你可以用相同的group id來標(biāo)識這個處理組,以確保傳遞給組中的每個處理程序的消息是唯一的。
二、groupid的作用
Kafka通過group id分配消費者之間的消息,確保一個組內(nèi)的消費者不會接收到相同的消息。當(dāng)同一個group id下的多個消費者訂閱了同一個topic時,每個消息將只能被一個消費者消費。
在多個消費者共同消費一個topic的場景下,可以通過groupid來做load balance,即通過groupid的設(shè)置,部署多個消費者實例來對消息進(jìn)行消費。
三、groupid的注意事項
1、group id需要唯一
在同一個Kafka集群中,group id需要唯一,如果兩個group使用了相同的groupid,它們就會消費相同的消息,造成消息的重復(fù)消費。
2、重新啟動后,groupid也需要唯一
如果在同一個group中,消費者重啟或新加入消費者組,那么每次加入新消費者之前,需要確保添加的消費者的group id在之前沒有被使用過。
3、group id的更改會導(dǎo)致消費者重新從頭開始消費
Kafka集群會為group id下的每個消費者保存消費的偏移量,如果group id被更改,消費者將會從頭開始消費。
四、實例代碼
// 配置項
properties.put("group.id", "test-group");
// 創(chuàng)建消費者
KafkaConsumer consumer = new KafkaConsumer<>(properties);
// 訂閱topic
consumer.subscribe(Arrays.asList("test-topic"));
// 消費消息
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
五、小結(jié)
Kafka是一個分布式的消息隊列,通過group id來保證消費者組內(nèi)的消息處理具有唯一性,可以做到消息的負(fù)載均衡和處理組內(nèi)消息的互斥性。在使用時需要注意group id的唯一性以及更改group id的影響等問題。