Yige

Yige

Build

Kafkaメッセージオフセットメカニズム

Kafka メッセージオフセットメカニズム#

消費者のオフセットの提出#

消費者は、_consumer_offset という特別なトピックにメッセージを送信し、そのメッセージには各パーティションのオフセットが含まれています。
消費者が常に稼働している場合、オフセットはあまり意味がありません。しかし、消費者がクラッシュしたり、新しい消費者がグループに参加したりすると、再バランスがトリガーされます。再バランスが完了した後、各消費者は以前処理していたものではなく、新しいパーティションに割り当てられる可能性があります。以前の作業を続けるために、消費者は各パーティションの最後に提出されたオフセットを読み取り、そのオフセットが指定する場所から処理を続ける必要があります。この時、提出されたオフセットとクライアントが処理しているオフセットの間に不一致があると、重複消費やメッセージの喪失が発生する可能性があります。

重複消費#

提出されたオフセットがクライアントが処理した最後のメッセージのオフセットよりも小さい場合、2 つのオフセットの間にあるメッセージが重複して処理されます。
-w686

メッセージの喪失#

提出されたオフセットがクライアントが処理した最後のメッセージのオフセットよりも大きい場合、両者の間にあるメッセージが失われます。
-w691

オフセットの提出方法#

自動提出(at most once)#

enable.auto.commit が true に設定されている場合、消費者は自動的にオフセットを提出します。提出の時間間隔は auto.commit.interval.ms によって制御され、デフォルトは 5 秒です。
最も簡単な提出方法ですが、メッセージ処理の状況を明確に把握することができず、重複消費やメッセージの喪失が発生しやすくなります。

現在のオフセットを手動で提出(at least once)#

KafkaConsumer API を使用して、必要に応じて現在のオフセットを手動で提出できます。時間間隔に基づくのではなく。

  • commitSync()
    同期的に提出し、poll () が返す最新のオフセットを提出します。信頼性を保証できますが、提出時にプログラムがブロック状態になるため、スループットが制限されます。

  • commitAsync()
    非同期的に提出し、メッセージの信頼性を保証しません。カスタムコールバック処理をサポートし、提出エラーの記録やメトリクスの生成に使用されます。

特定のオフセットを提出#

オフセットの提出頻度はメッセージバッチの処理頻度と同じです。つまり、通常はバッチメッセージを処理した後にオフセットを 1 回提出しますが、例えば poll () メソッドが大量のデータを返す場合、再バランス(次のセクションで説明)による重複処理を避けるために、バッチの途中でオフセットを提出したい場合があります。この場合、commitSync ()/commitAsync () メソッドを呼び出す際に、提出したいパーティションとオフセットを含むマップを渡すことで実現できます。しかし、消費者が複数のパーティションを読み取る可能性があるため、すべてのパーティションのオフセットを追跡する必要があります。このため、このレベルでオフセットの提出を制御すると、コードが複雑になります。

特定のオフセットからレコードを処理開始#

// トピック topic のすべての利用可能なパーティション partition の情報を取得
partitionInfos = kafkaConsumer.partitionsFor(topic);
Collection<TopicPartition> partitions = null;

if (partitionInfos != null) {

    // 消費者は自分自身にパーティションを割り当てます。ここでは、すべてのパーティションを消費することに相当する例です。実際の状況では、特定のパーティションを割り当てるロジックを追加できます。
    partitionInfos.forEach(partitionInfo -> {
        partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    });

    kafkaConsumer.assign(partitions);
}

ConsumerCommitOffset.commitOffset(kafkaConsumer);

再バランス#

Kafka の消費モード#

Kafka からメッセージを消費する際、Kafka クライアントは 2 つのモードを提供します:パーティション消費、グループ消費。

消費者数とパーティション数の関係#

  • 1 つの消費者は 1 つからすべてのパーティションデータを消費できます。
  • グループ消費では、同じグループ内のすべての消費者が完全なデータを消費します。この場合、1 つのパーティションデータは 1 人の消費者によってのみ消費され、1 人の消費者は複数のパーティションデータを消費できます。
  • 同じ消費グループ内で、消費者数がパーティション数を超えると、消費者には余剰が生じます = パーティション数 - 消費者数。この場合、新しいグループを追加することができます。

グループ消費の再バランス戦略#

1 つのグループ内の消費者が共同でトピックのパーティションデータを消費することがわかっています。新しい消費者がグループに参加すると、他の消費者が読み取っていたメッセージを読み取ります。消費者が停止したりクラッシュしたりすると、その消費者はグループを離れ、元々その消費者が読み取っていたパーティションはグループ内の他の消費者によって読み取られます。また、トピックに変更が加えられ、新しいパーティションが追加されることもあり、パーティションの再割り当てが発生します。

これらの状況では、パーティションの所有権が 1 人の消費者から別の消費者に移転されます。これがパーティション再バランスであり、消費者グループに高可用性とスケーラビリティをもたらしますが、再バランス中は消費者がメッセージを読み取ることができず、グループ全体が短時間利用できなくなるという欠点があります。

パラメータpartition.assignment.strategyを設定することで、割り当て戦略を選択できます。2 つの割り当て戦略があります:

RangeAssignor#

トピックのいくつかの連続したパーティションを消費者に割り当てます。デフォルトはこの方法です。
消費者 C1 と消費者 C2 が同時にトピック T1 とトピック T2 を購読し、各トピックに 3 つのパーティションがあると仮定します。消費者 C1 は、これら 2 つのトピックのパーティション 0 とパーティション 1 に割り当てられる可能性がありますが、消費者 C2 は、これら 2 つのトピックのパーティション 2 に割り当てられます。

RoundRobinAssignor#

トピックのすべてのパーティションを消費者に順番に割り当てます。
RoundRobin 戦略を使用して消費者 C1 と消費者 C2 にパーティションを割り当てる場合、消費者 C1 はトピック T1 のパーティション 0 とパーティション 2、およびトピック T2 のパーティション 1 に割り当てられ、消費者 C2 はトピック T1 のパーティション 1 とトピック T2 のパーティション 0 およびパーティション 2 に割り当てられます。一般的に、すべての消費者が同じトピックを購読している場合(この状況は非常に一般的です)、RoundRobin 戦略はすべての消費者に同じ数のパーティションを割り当てます(または最大で 1 つのパーティションが不足します)。

参考#

パーティションハートビートメカニズム#

消費者はハートビートメカニズムに依存して GroupCoordinator に生存を報告し、ハートビートを送信してグループとの従属関係やパーティションの所有権関係を維持します。
具体的には、Kafka ソースコード解析:グループ調整管理メカニズムを参照してください。

再バランスリスナー#

オフセットを提出する際、消費者は終了し、パーティションの再バランスを行う前にいくつかのクリーンアップ作業を行います。リスナーのメソッドを定義することで、バッファ内のレコードの処理やデータベース接続操作による情報の保存(例えば、Mysqlを利用してオフセットを手動で維持する)などの特別なロジック処理を行うことができます。

実装は非常に簡単で、消費トピックを購読する際に再バランスリスナーのインスタンスを渡すだけです(インターフェースを自分で実装できます)。

consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
    // 再バランスが始まる前と消費者がメッセージの読み取りを停止した後に呼び出されます。
      // ... 自分で処理ロジックを実装
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
    // 再割り当て後、消費者がメッセージの読み取りを開始する前に呼び出されます。
    }
});
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。