Hadoop シリーズ - MapReduce#
MapReduce の主要な考え方:大きな計算プログラムを自動的に Map(マッピング)と Reduce(簡略化)に分割する、分治思想
プロセスの概要#
input --> map --> shuffle --> reduce ---> output
Map 側のプロセス#
- Map: カスタム Map 関数でデータを処理
- Partition: map の結果を対応する reduce 側に送信するため。全体の partition の数は reducer の数に等しい。具体的な実装は:key をハッシュ化した後、reduce タスクの数でモジュロを取り、指定されたジョブに送る(デフォルトはHashPartitioner、
job.setPartitionerClass(MyPartition.class)
でカスタマイズ可能) - Sort: まず
<key,value,partition>
の partition 番号でソートし、その後 key でソートする。これがソート操作であり、最後に溢れた小ファイルはパーティションのものであり、同じパーティション内では key が順序付けられていることが保証される。 - Combine: 事前に統計を行い、局所的にマージして、Map 側から Reduce 側へのデータ転送コストを削減する。開発者はプログラム内で combine を設定する必要がある(プログラム内で
job.setCombinerClass(myCombine.class)
でカスタマイズされた combine 操作)。2 つの段階で発生する可能性がある:(1) map 出力データが partition でソートされた後、ファイルに書き込む前に combine 操作が一度実行される、もちろん前提として作業でこの操作が設定されていること。(2) map 出力が大きく、溢れたファイルの数が 3 を超える場合(この値は属性min.num.spills.for.combine
で設定可能)、マージの過程で combine 操作が実行される。 - Merge: map が非常に大きい場合、毎回の溢れ書きで spill_file が生成され、複数の spill_file が存在する。この時、merge マージ操作を行う必要があり、最終的に 1 つの MapTask が 1 つのファイルを出力する。つまり、Map Task のすべてのデータが処理された後、タスクが生成したすべての中間データファイルを一度マージして、1 つの中間データファイルのみを生成することを保証する。
Reduce 側のプロセス#
- Copy: Reducer ノードが各 mapper ノードから自分のパーティションに属するデータをダウンロードする。同じ Partition のデータは同じノードに落ちる。
- Merge: Reducer が異なる Mapper ノードから取得したデータを 1 つのファイルにマージする。
- Reduce: データを統計する。
MR 作業を Yarn に提出するプロセス#
1. RunJarプロセスを生成し、クライアントがRMにジョブの実行を申請する。
2. RMがジョブに関連するリソースの提出パスstaging-dirとこのジョブが生成したジョブIDを返す。
3. クライアントはジョブに関連するリソースを対応する共有ファイルシステムのパス(/yarn-staging/jobID)に提出する。
4. クライアントがRMに提出結果を報告する。
5. RMがジョブをタスクキューに追加する。
6. NMがRMとのハートビート接続を通じて、RMのタスクキューから新しいタスクを取得する。
7. RMがNMに実行リソースコンテナcontainerを割り当てる。
8. RMがコンテナ内でMRAppMasterプロセスを起動する。
9. 作成されたMRAppmasterがどのNodeManagerでmap(つまりyarnchildプロセス)とreduceタスクを実行するかを割り当てる。
10. mapとreduceタスクを実行するNMが共有ファイルシステムからジョブに関連するリソースを取得し、jarファイル、設定ファイルなどを含む。その後、mapとreduceタスクを実行する。
11. ジョブが完了した後、MRAppMasterがRMに自分を解除し、リソースを解放する。
Shuffle メカニズム#
Map から Reduce への入力全体のプロセスは広義に Shuffle と呼ばれる。Shuffle は Map 側と Reduce 側を横断し、Map 側には Partition 分割と Spill 分割プロセスが含まれ、Reduce 側には copy コピーと Merge プロセスが含まれる。
循環メモリバッファ#
map メソッド () の最後のステップで、OutputCollector.collect(key,value)
またはcontext.write(key,value)
を通じて中間処理結果を出力し、関連するcollect(key,value)
メソッド内でPartitioner.getPartition(K2 key, V2 value, int numPartitions)
メソッドを呼び出して出力の key/value に対応するパーティション番号を取得する(パーティション番号は Reduce Task を実行するノードに対応すると考えられる)、その後<key,value,partition>
を一時的にメモリ内のMapOutputBuffer内部の循環データバッファ
に保存する。このバッファのデフォルトサイズは100MBで、パラメータio.sort.mb
でサイズを調整できる。
MapOutputBuffer 内部に保存されるデータは 2 つのインデックス構造を使用し、3 つの循環メモリバッファが関与する:
-
kvoffsetsバッファ
:オフセットインデックス配列とも呼ばれ、key/value 情報の位置インデックス kvindices 内のオフセットを保存する。kvoffsets
の使用率がmapreduce.map.sort.spill.percent(デフォルトは80%)
を超えると、SpillThread スレッドの「溢れ書き」操作がトリガーされ、Spill 段階の操作が開始される。 -
kvindicesバッファ
:位置インデックス配列とも呼ばれ、key/value がデータバッファ kvbuffer 内の開始位置を保存する。 -
kvbufferつまりデータバッファ
:実際の key/value の値を保存するために使用される。デフォルトでは、このバッファは最大でio.sort.mb
の 95% を使用でき、kvbufferの使用率がmapreduce.map.sort.spill.percent(デフォルトは80%)
を超えると、SpillThread スレッドの「溢れ書き」操作がトリガーされ、Spill 段階の操作が開始される。
Spill 溢れ書き#
MapOutputBuffer 内部のkvoffsetsバッファ
とkvbufferつまりデータバッファ
が閾値に達すると spill 操作がトリガーされる。メモリバッファ内のデータをローカルディスクに書き込み、ローカルディスクに書き込む際にはまず partition で、次に key でソートする。
- この spill 操作は別のスレッドによって操作され、バッファに map 結果を書き込むスレッドには影響しない。
- ディスクにデータを書き込む前に、書き込むデータを一度ソート操作する必要があり、まず
<key,value,partition>
の partition 番号でソートし、その後 key でソートする。これがソート操作であり、最後に溢れた小ファイルはパーティションのものであり、同じパーティション内では key が順序付けられていることが保証される。
Copy 段階#
デフォルトでは、全体の MapReduce 作業のすべての実行が完了した Map Task の数が Map Task の総数の 5% を超えると、JobTracker は Reduce Task の実行をスケジュールし始める。その後、Reduce Task はデフォルトでmapred.reduce.parallel.copies(デフォルトは5)
個の MapOutputCopier スレッドを完了した Map Task ノードに起動し、それぞれ自分のデータのコピーを行う。これらのコピーされたデータは最初にメモリバッファに保存され、メモリバッファの使用率が一定の閾値に達すると、ディスクに書き込まれる。
補足
このメモリバッファのサイズの制御は、map 側のメモリバッファのようにio.sort.mb
で設定されるのではなく、別のパラメータで設定される:mapred.job.shuffle.input.buffer.percent(デフォルト0.7)
。このパラメータは、shuffile が reduce メモリ内で使用するデータの最大メモリ量が:0.7 × reduce タスクの maxHeap であることを意味する。もしその reduce タスクの最大 heap 使用量が、通常mapred.child.java.opts
で設定される場合、例えば - Xmx1024m に設定すると、reduce はその **heapsize の 70%** を使用してメモリ内にデータをキャッシュする。もし reduce の heap がビジネス上の理由で大きく調整されると、相応のキャッシュサイズも大きくなる。これが reduce がキャッシュに使用するパラメータが固定値ではなくパーセンテージである理由である。
Merge プロセス#
コピーされたデータは最初にメモリバッファに置かれ、ここでのバッファサイズは map 側のものよりも柔軟で、JVM のheap size
設定に基づいている。Shuffle 段階では Reducer は実行されないため、メモリの大部分を Shuffle に使用するべきである。
3 つの形式がある:
- メモリからメモリ
- メモリからディスク
- ディスクからディスク
デフォルトでは、最初の形式は有効になっていない。メモリ内のデータ量が一定の閾値に達すると、メモリからディスクへのマージが開始される(マージを行う理由は、reduce 側が複数の map 側からデータをコピーする際に、ソートを行わずに単にメモリにロードしたため、閾値に達してディスクに書き込む際にマージが必要になる)。このマージ方式は、map 側のデータがなくなるまで実行され続け、その後に最終ファイルを生成するためのディスクからディスクへのマージ方式が開始される。
MR 調整の考え方#
データの偏り
:データが Reduce に集約される際、1 つの ReduceTask のデータが過大であると、全体のプログラムの効率が非常に低下する。- Map と Reduce タスクの数の設定が不適切。
- Map の実行時間が長すぎて、Reduce が長時間待機する。
小さなファイルが多すぎる
:ファイルの大きさに関わらず、メタデータ情報が割り当てられるため、過剰になるとリソースやストレージスペースが無駄になる。- MR には大量の分割不可能な超大ファイルがあり、shuffle 段階で継続的に溢れ書きが発生する。
- 複数の溢れた小ファイルがあり、多段階のマージが必要。