Kafka 消息偏移量機制#
消費者提交偏移量#
消費者往一個叫做_consumer_offset 的特殊主題發送消息,消息裡包含每個分區的偏移量。
如果消費者一直處於運行狀態,那麼偏移量就沒有什麼用處。不過,如果消費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡
,完成再均衡
之後,每個消費者可能分配到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最後一次提交的偏移量,然後從偏移量指定的地方繼續處理,這個時候就可能會因為提交的偏移量與客戶端處理的偏移量之間不一致產生的重複消費和消息丟失情況。
重複消費#
如果提交的偏移量小於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息就會被重複處理。
消息丟失#
如果提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩者之間的消息就會丟失。
提交偏移量的方式#
自動提交 (at most once)#
enable.auto.commit 被設為 true, 消費者會自動提交偏移量,提交時間間隔由 auto.commit.interval.ms 控制,默認為 5s。
最簡單的提交方式,但是不能清楚的知道消息處理的情況,容易產生消息重複消費和消息丟失的情況。
主動提交當前偏移量 (at least once)#
利用 KafkaConsumer API 可以在必要的時候主動提交當前偏移量,而不是基於時間間隔。
-
commitSync()
同步提交,將會提交由 poll () 返回的最新偏移量,可以保證可靠性,但因為提交時程序會處於阻塞狀態,限制吞吐量。 -
commitAsync()
異步提交,不保證消息可靠,支持自定義回調處理,用於記錄提交錯誤或生成度量指標。
提交特定的偏移量#
提交偏移量的頻率與處理消息批次的頻率是一樣的,就是說通常是處理完一批次消息提交一次偏移量,但有時候比如 poll () 方法返回一大批數據,為了避免因為再均衡 (詳見下節說明) 引起的重複處理整批消息,我們想要在批次中間提交偏移量,這時可以通過調用 commitSync ()/commitAsync () 方法時傳入一個存有希望提交的分區和偏移量的 map 實現,但是因為消費者可能不只讀取一個分區,你需要跟蹤所有分區的偏移量,所以在這個層面上控制偏移量的提交會讓代碼變複雜。
從特定偏移量處開始處理記錄#
// 獲得主題 topic 所有可用分區 partition 的信息
partitionInfos = kafkaConsumer.partitionsFor(topic);
Collection<TopicPartition> partitions = null;
if (partitionInfos != null) {
// 消費者為自己分配分區, 這裡示例是遍歷相當於消費全部分區, 實際情況可以添加邏輯分配特定分區
partitionInfos.forEach( partitionInfo -> {
partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));``
});
kafkaConsumer.assign(partitions);
}
ConsumerCommitOffset.commitOffset(kafkaConsumer);
再均衡#
Kafka 消費模式#
從 kafka 消費消息,kafka 客戶端提供兩種模式:分區消費,分組消費。
消費者數目跟分區數目的關係#
- 一個消費者可以消費一個到全部分區數據。
- 分組消費,同一個 group 內所有消費者消費一份完整的數據,此時一個分區數據只能被一個消費者消費,而一個消費者可以消費多個分區數據。
- 同一個消費組內,消費者數目大於分區數目後,消費者會有空餘 = 分區數 - 消費者數,這個時候可以選擇新增 group。
分組消費的再平衡策略#
我們知道一個 group 下的消費者共同消費主題的分區數據,一個新的消費者加入群組時,它讀取的是原本由其他消費者讀取的消息。當一個消費者被關閉或發生崩潰時,它就離開群組,原本由它讀取的分區將由群組裡的其他消費者來讀取。還有就是可能主題發送了變化,添加了新分區,也會發生分區重分配。
以上幾種情況,分區的所有權從一個消費者轉移到了另一個消費者,就是分區再均衡
,它為消費者群組帶來了高可用性和伸縮性,缺陷在於在再均衡期間,消費者無法讀取消息,造成整個群組一小段時間的不可用。
通過設置參數 partition.assignment.strategy
, 選擇分配策略,有兩種分配策略:
RangeAssignor#
把主題的若干個連續的分區分配給消費者,默認是這種。
假設消費者 C1 和消費者 C2 同時訂閱了主題 T1 和主題 T2,並且每個主題有 3 個分區。那麼消費者 C1 有可能分配到這兩個主題的分區 0 和分區 1,而消費者 C2 分配到這兩個主題的分區 2。
RoundRobinAssignor#
把主題的所有分區逐個分配給消費者。
如果使用 RoundRobin 策略來給消費者 C1 和消費者 C2 分配分區,那麼消費者 C1 將分到主題 T1 的分區 0 和分區 2 以及主題 T2 的分區 1,消費者 C2 將分配到主題 T1 的分區 1 以及主題 T2 的分區 0 和分區 2。一般來說,如果所有消費者都訂閱相同的主題(這種情況很常見), RoundRobin 策略會給所有消費者分配相同數量的分區(或最多就差一個分區)。
參考#
分區心跳機制#
消費者依賴於心跳機制向 GroupCoordinator 報活,發送心跳來維持它們和群組的從屬關係以及它們對分區的所有權關係。
具體參考:Kafka 源碼解析:Group 協調管理機制
再均衡監聽器#
在提交偏移量,消費者在退出和進行分區再均衡之前,會做一些清理工作,我們可以通過定義監聽器的方法做一些特殊的邏輯處理,比如處理緩衝區記錄,進行數據庫連接操作保存信息 (比如利用Mysql
存儲 offset 進行手動維護) 等。
實現很簡單,在訂閱消費主題時傳入一個再均衡監聽器實例就行 (可以自己實現接口)。
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
// 在再均衡開始之前和消費者停止讀取消息之後被調用
// ... 自己實現處理邏輯
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
// 在重新分配分區之後和消費者開始讀取消息之前被調用
}
});