Kafka Message Offset Mechanism#
Consumer Offset Submission#
Consumers commit offsets to a special topic called __consumer_offsets, which records the committed offset for each partition. If the consumer keeps running, these offsets are of little use. However, if the consumer crashes, or a new consumer joins the group, a rebalance is triggered. After the rebalance completes, each consumer may be assigned new partitions rather than the ones it previously processed. To continue the previous work, each consumer reads the last committed offset for each of its partitions and resumes processing from there. At this point, duplicate consumption or message loss can occur if the committed offsets are inconsistent with the offsets the client has actually processed.
Duplicate Consumption#
If the committed offset is less than the offset of the last message processed by the client, then messages between the two offsets will be processed again.
Message Loss#
If the committed offset is greater than the offset of the last message processed by the client, then messages between the two will be lost.
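The two failure windows above come down to simple offset arithmetic. A minimal sketch (the offset values are made up, and this is only the bookkeeping, not the Kafka client; note that a committed offset in Kafka means "the next offset to read"):

```java
// Illustrates the gap between the committed offset (the next offset the
// group will read after a rebalance) and the last offset actually processed.
public class OffsetGap {
    // Describes what happens to the messages in the gap once a rebalance
    // hands the partition to another consumer.
    static String afterRebalance(long committed, long lastProcessed) {
        if (committed <= lastProcessed) {
            // Offsets committed..lastProcessed are read again -> duplicates.
            return "duplicates: " + (lastProcessed - committed + 1);
        }
        // Offsets lastProcessed+1..committed-1 are never read -> lost.
        return "lost: " + (committed - lastProcessed - 1);
    }

    public static void main(String[] args) {
        // Committed offset lags behind processing: 4 messages re-processed.
        System.out.println(afterRebalance(5, 8));  // duplicates: 4
        // Committed offset is ahead of processing: 2 messages never processed.
        System.out.println(afterRebalance(11, 8)); // lost: 2
    }
}
```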
Methods of Committing Offsets#
Automatic Commit (at most once)#
When enable.auto.commit is set to true, the consumer will automatically commit offsets, with the commit interval controlled by auto.commit.interval.ms, defaulting to 5 seconds. This is the simplest method of committing, but it does not clearly indicate the status of message processing, making it prone to duplicate consumption and message loss.
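A minimal consumer configuration with auto-commit enabled might look like the following; the broker address and group id are placeholders, and the config keys (enable.auto.commit, auto.commit.interval.ms) are the standard Kafka consumer properties:

```java
import java.util.Properties;

public class AutoCommitConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "demo-group");              // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Enable periodic auto-commit; offsets are committed every 5 seconds,
        // which is also the default interval.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps());
    }
}
```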
Manual Commit of Current Offset (at least once)#
Using the KafkaConsumer API, you can manually commit the current offset when necessary, rather than based on a time interval.
- commitSync(): synchronous commit, which commits the latest offset returned by poll(). It is reliable, but the program blocks during the commit, which limits throughput.
- commitAsync(): asynchronous commit, which does not retry failed commits and therefore does not guarantee the offset is committed; it supports a custom callback for logging commit errors or generating metrics.
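A common way to combine the two is to use commitAsync() inside the poll loop for throughput, and a final blocking commitSync() on shutdown so the last offsets are not lost. The sketch below shows only this call pattern; FakeConsumer is a hypothetical stand-in for KafkaConsumer so the example compiles and runs without a broker:

```java
import java.util.ArrayList;
import java.util.List;

// Pattern sketch: fast commitAsync() per batch, blocking commitSync() on exit.
public class CommitPattern {
    static class FakeConsumer {
        final List<String> commits = new ArrayList<>();
        void commitAsync() { commits.add("async"); } // non-blocking in the real client
        void commitSync()  { commits.add("sync");  } // blocks until acknowledged
    }

    static FakeConsumer run(int batches) {
        FakeConsumer consumer = new FakeConsumer();
        try {
            for (int i = 0; i < batches; i++) {
                // ... poll() and process a batch of records here ...
                consumer.commitAsync(); // low latency; failures are not retried
            }
        } finally {
            // On shutdown, a blocking commit guarantees the final offsets land.
            consumer.commitSync();
        }
        return consumer;
    }

    public static void main(String[] args) {
        System.out.println(run(3).commits); // [async, async, async, sync]
    }
}
```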
Committing Specific Offsets#
The frequency of committing offsets is the same as the frequency of processing message batches, meaning that typically, offsets are committed after processing a batch of messages. However, sometimes, for example, when the poll() method returns a large batch of data, to avoid duplicate processing of the entire batch due to rebalance (see the next section for details), we may want to commit offsets in the middle of the batch. This can be achieved by passing a map containing the desired partitions and offsets to commitSync()/commitAsync(). However, since the consumer may read from multiple partitions, you need to track the offsets for all partitions, making it complex to control offset commits at this level.
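The bookkeeping described above can be sketched without the Kafka client: track the latest offset per partition in a map and flush that map every N records, mirroring the map of TopicPartition to OffsetAndMetadata passed to commitSync()/commitAsync(). Partition names and the batch size here are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Tracks the next offset to commit for each partition and "commits" the
// whole map every N processed records (mid-batch commits).
public class MidBatchCommit {
    private final Map<String, Long> pending = new HashMap<>();
    final Map<String, Long> committed = new HashMap<>();
    private final int commitEvery;
    private int processed = 0;

    MidBatchCommit(int commitEvery) { this.commitEvery = commitEvery; }

    void process(String topicPartition, long offset) {
        // Store offset + 1: the committed offset is the NEXT offset to read.
        pending.put(topicPartition, offset + 1);
        if (++processed % commitEvery == 0) {
            // Stand-in for consumer.commitAsync(pendingOffsets, callback).
            committed.putAll(pending);
        }
    }

    public static void main(String[] args) {
        MidBatchCommit c = new MidBatchCommit(3);
        c.process("T1-0", 10);
        c.process("T1-1", 7);
        c.process("T1-0", 11); // third record triggers a commit
        System.out.println(c.committed);
    }
}
```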
Starting to Process Records from a Specific Offset#
// Obtain information about all available partitions for the topic
List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic);
List<TopicPartition> partitions = new ArrayList<>();
if (partitionInfos != null) {
    // The consumer assigns partitions to itself; this example consumes all partitions.
    // In practice, logic can be added to assign only specific partitions.
    partitionInfos.forEach(partitionInfo ->
            partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
    kafkaConsumer.assign(partitions);
}
// ConsumerCommitOffset is the author's helper class for committing offsets
ConsumerCommitOffset.commitOffset(kafkaConsumer);
Rebalance#
Kafka Consumption Modes#
When consuming messages from Kafka, the Kafka client provides two modes: standalone partition consumption (via assign()) and group consumption (via subscribe()).
Relationship Between Number of Consumers and Number of Partitions#
- One consumer can consume data from one partition, several partitions, or all partitions.
- In group consumption, the consumers within the same group together consume the complete set of data; a given partition is consumed by only one consumer in the group, while one consumer can consume data from multiple partitions.
- When the number of consumers in a consumer group exceeds the number of partitions, the surplus consumers (number of consumers - number of partitions) sit idle; to increase parallelism further, a new consumer group can be added instead.
Rebalance Strategy for Group Consumption#
We know that consumers within a group jointly consume partition data from the topic. When a new consumer joins the group, it reads messages that were originally read by other consumers. When a consumer is shut down or crashes, it leaves the group, and the partitions it was reading will be read by other consumers in the group. Additionally, if the topic has changed and new partitions have been added, partition reassignment will also occur.
In the above scenarios, the ownership of partitions transfers from one consumer to another, which is called partition rebalance. It provides high availability and scalability for consumer groups, but the downside is that during a rebalance, consumers cannot read messages, causing a brief period of unavailability for the entire group.
By setting the parameter partition.assignment.strategy, you can choose between two assignment strategies:
RangeAssignor#
Assigns several contiguous partitions of the topic to consumers; this is the default.
Assuming consumers C1 and C2 are subscribed to topics T1 and T2, and each topic has 3 partitions. Consumer C1 may be assigned partitions 0 and 1 of both topics, while consumer C2 is assigned partition 2 of both topics.
RoundRobinAssignor#
Assigns all partitions from all subscribed topics to the consumers one by one, in round-robin order.
If using the RoundRobin strategy to assign partitions to consumers C1 and C2, consumer C1 will receive partitions 0 and 2 of topic T1 and partition 1 of topic T2, while consumer C2 will be assigned partition 1 of topic T1 and partitions 0 and 2 of topic T2. Generally, if all consumers subscribe to the same topics (which is common), the RoundRobin strategy will allocate the same number of partitions to all consumers (or at most, one partition difference).
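The two assignments above can be reproduced with a small simulation. The consumer and topic names follow the example in the text; this mirrors the idea of RangeAssignor and RoundRobinAssignor in outline, not Kafka's actual implementation:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulates Range and RoundRobin assignment for consumers C1, C2 over
// topics T1 and T2, each with 3 partitions.
public class AssignDemo {
    static Map<String, List<String>> range(List<String> consumers, List<String> topics, int parts) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        consumers.forEach(c -> out.put(c, new ArrayList<>()));
        for (String t : topics) {
            // Each topic is split into contiguous ranges; earlier consumers
            // get one extra partition when the division is uneven.
            int per = parts / consumers.size(), extra = parts % consumers.size(), p = 0;
            for (int i = 0; i < consumers.size(); i++) {
                int n = per + (i < extra ? 1 : 0);
                for (int j = 0; j < n; j++) out.get(consumers.get(i)).add(t + "-" + p++);
            }
        }
        return out;
    }

    static Map<String, List<String>> roundRobin(List<String> consumers, List<String> topics, int parts) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        consumers.forEach(c -> out.put(c, new ArrayList<>()));
        int i = 0; // partitions from all topics are dealt out in one sequence
        for (String t : topics)
            for (int p = 0; p < parts; p++)
                out.get(consumers.get(i++ % consumers.size())).add(t + "-" + p);
        return out;
    }

    public static void main(String[] args) {
        List<String> cs = List.of("C1", "C2"), ts = List.of("T1", "T2");
        System.out.println(range(cs, ts, 3));      // {C1=[T1-0, T1-1, T2-0, T2-1], C2=[T1-2, T2-2]}
        System.out.println(roundRobin(cs, ts, 3)); // {C1=[T1-0, T1-2, T2-1], C2=[T1-1, T2-0, T2-2]}
    }
}
```

Note how Range decides per topic (C2 gets partition 2 of each topic), while RoundRobin deals all six partitions out evenly.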
Partition Heartbeat Mechanism#
Consumers rely on the heartbeat mechanism to report their status to the GroupCoordinator, sending heartbeats to maintain their membership in the group and their ownership of partitions. For more details, refer to: Kafka Source Code Analysis: Group Coordination Management Mechanism
Rebalance Listener#
Before offsets are committed, and before a consumer exits or partitions are rebalanced, some cleanup work is often needed. We can define a listener to handle such special logic, for example processing buffered records, or saving state to a database (such as storing offsets in MySQL for manual offset maintenance).
The implementation is straightforward: pass a rebalance listener instance when subscribing to the topic (implement the ConsumerRebalanceListener interface yourself).
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
// Called before the rebalance starts and after the consumer stops reading messages
// ... implement your own handling logic
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
// Called after partitions are reassigned and before the consumer starts reading messages
}
});