MapReduceとシャッフル
なぜ分散集計はシャッフルで詰まるのか、その正体をつかむために。map/reduceの処理モデルとシャッフル・ソートの役割、データ局所性による転送削減、そして全対全通信がボトルネックになる原理を解きほぐします。
- 1.MapReduceはmap(各レコードを独立にキー付き中間ペアへ変換)とreduce(同一キーの値をまとめて集約)の2段モデル。両者の間で『同じキーを同じreducerへ集める』再分配がシャッフルで、ここだけが全ノードにまたがる通信を伴う。
- 2.シャッフルは単なる転送ではなくソートを含む。mapper側で出力をパーティション分割しキー順にソートしておき、reducer側は各mapperの該当パーティションを取得してマージソートする。これによりreducerは同一キーの値を連続ストリームとして受け取れる。
- 3.map/reduceの計算自体は並列で線形にスケールするが、シャッフルはM個のmapperとR個のreducerの間でM×Rの全対全転送になり、ネットワーク帯域とディスクI/Oを食う。多くの分散ジョブの実行時間はここで決まり、データ局所性・combiner・パーティション設計が効きどころになる。
2段だけの計算モデル:なぜmapとreduceに割るのか
ペタバイト級のデータを1台では処理しきれないとき、数百〜数千台のクラスタで分散処理したい。難所はデータ分割そのものより、ノードをまたいだ協調です。MapReduceの発想は、ほとんどの大規模集計をたった2つの関数に落とし込めば、その協調を枠組み側が肩代わりできる、というものでした。
- map:入力の各レコードを独立に受け取り、ゼロ個以上の中間キー・バリューペア
(k, v)を出す。レコード間に依存が無いので、入力を分割すればいくらでも並列化できる。 - reduce:同一キーに属する値の集合を受け取り、それを集約して出力を出す。合計・件数・最大・重複排除など、キーごとにまとめる処理を書く。
古典例の語数カウントなら、mapは各行を単語に割って (単語, 1) を吐き、reduceは同じ単語に集まった 1 の列を足し合わせて (単語, 総数) を出します。ここで決定的に重要なのは、map出力の (k, v) が、キー k によって行き先のreducerへ振り分けられる点です。mapは「どこに送るか」を知らず、ただキーを付けるだけ。reduceは「どこから来たか」を知らず、ただキーごとの値を受け取るだけ。この疎結合の隙間を埋める再分配処理こそ**シャッフル(shuffle)**であり、MapReduceの心臓部です。
map/reduceという制約は不自由に見えますが、その制約こそがフレームワークに自動並列化・自動再実行・自動データ配置の自由を与えます。ユーザーは「各レコードに何をするか」と「各キーで何を集約するか」だけを書けばよく、どのノードで動かすか・障害でタスクが落ちたらどう再実行するか・中間データをどう運ぶかはランタイムが決めます。関数が純粋(同じ入力に同じ出力)で副作用が無いほど、フレームワークは安全にタスクを別ノードで再試行でき、耐障害性が成立します。
データ局所性:計算をデータのある場所へ運ぶ
分散処理で最初に効く原理が**データ局所性(data locality)**です。大規模データは分散ファイルシステム(HDFSなど)上でブロックに分割され、各ブロックは複数ノードに複製されて散らばっています。ここで素朴に「データをmapタスクのある場所へ転送する」と、入力全体がネットワークを流れてしまい、帯域が律速になります。
MapReduceの定石は逆で、計算をデータのある場所へ運ぶ。スケジューラは各入力ブロックがどのノードに載っているかを知っており、可能ならそのブロックを保持するノード上でmapタスクを起動します。こうすればmapの入力読み出しはローカルディスクからになり、ネットワークをまたぎません。
狙い: map入力はローカル読み、ネットワークを使わない
ブロックB → ノードN1,N2,N3 が複製保持
スケジューラは N1/N2/N3 のいずれかで B を処理するタスクを起動
→ 入力転送ゼロ(ノードローカル実行)
空きが無ければ同一ラック内のノード(ラックローカル)へフォールバック
局所性が効くのはmap入力の段階である点に注意が要ります。mapが吐いた中間データは「どのキーがどのreducerへ行くか」で宛先が決まるため、中間データの移動は原理的に避けられません。つまり局所性で節約できるのは入力読み出しであって、次に述べるシャッフルの全対全転送は別問題です。ここが「map局所性は最適化できてもシャッフルは残る」という構造の分かれ目です。
シャッフルの中身:分割・ソート・マージ
シャッフルを「mapからreduceへデータを送る処理」とだけ捉えると本質を外します。実体はパーティション分割・ソート・マージを含む、mapper側とreducer側の協調です。順を追います。
mapper側(map出力の書き出し)。各mapタスクは出力 (k, v) を、まずどのreducerに属するかで振り分けます。宛先は通常 partition = hash(k) mod R(Rはreducer数)で決まります。振り分けた各パーティションは、キーでソートされ、メモリ上のバッファがあふれるとディスクへスピルされます。1つのmapタスクが複数回スピルすれば、最後にそれらをマージして1つのソート済み出力ファイルにまとめます。
mapper 側の出力(R=3 の例)
map出力を hash(k) mod 3 でパーティション分け
partition0: [ソート済み (k,v) …] ← reducer0 向け
partition1: [ソート済み (k,v) …] ← reducer1 向け
partition2: [ソート済み (k,v) …] ← reducer2 向け
各 mapper がこの構造を1つ持つ(= M 個のソート済みファイル)
reducer側(取得とマージ)。各reduceタスクは、全mapperの自分向けパーティションをネットワーク経由で取りに行きます(HTTPフェッチなど)。集めるのはM個の断片で、いずれもキー順にソート済みです。これらをマージソートして1つのキー順ストリームにすると、reduceは同一キーの値がすべて隣り合った状態を得ます。あとはキーの切れ目ごとにユーザーのreduce関数を呼べばよい。
なぜソートが要るのか。reduceは「同一キーの全値」を一括で受け取る必要がありますが、値は多数のmapperに散らばっています。全値をメモリに載せてキーで集めるのはデータ量的に無理なので、代わりに各断片をキー順に並べておき、マージソートで統合する。ソート済み列のk方向マージはストリーミングで(全体をメモリに載せずに)でき、同一キーが連続するので区切りを検出するだけで済みます。つまりソートは、巨大なグループ化を有界メモリで実現するための手段です。
combiner:シャッフル量をmap側で削る
シャッフルで運ぶデータ量は、そのままネットワークとディスクの負荷になります。これをmapper側で事前集約して減らすのが**combiner(コンバイナ)**です。reduceと同じ集約を、map出力に対してローカルで先に適用します。
語数カウントなら、1つのmapタスクが同じ単語を1000回出すとき、(the,1) を1000件送る代わりに、combinerがmap側で足し込んで (the,1000) を1件だけ送れます。転送量が桁で落ちます。
combiner なし: map → (the,1)(the,1)…(the,1) 1000件をシャッフル
combiner あり: map → combiner でローカル集約 → (the,1000) 1件だけシャッフル
combinerは「部分集約してから全体集約しても最終結果が変わらない」場合にのみ正しく使えます。合計・件数・最大・最小は成り立ちます(sum(sum(a),sum(b)) = sum(a,b))。危ないのは平均で、部分平均の平均は全体平均と一致しません。平均を分散集約するなら、combinerは合計と件数のペアを持ち回し、最後のreduceで割る形にします。フレームワークはcombinerを呼ぶ保証も回数の保証もしない(0回・1回・複数回どれもありうる)ため、combinerは何回・どんな分割で適用されても結果不変でなければなりません。この代数的性質(結合則・交換則)を満たす集約かどうかが、そのまま分散集約の可否を決めます。
なぜシャッフルがボトルネックになるのか
map/reduceの計算部分は、入力・出力ともにノードローカルで完結し、タスクを足せばほぼ線形にスケールします。ところがシャッフルだけは性質が違います。M個のmapperそれぞれが、R個のreducerすべてに向けたパーティションを作り、各reducerがM個のmapperから断片を集める——M×Rの全対全(all-to-all)通信です。
| フェーズ | 通信パターン | 主なコスト | スケール特性 |
|---|---|---|---|
| map | ノードローカル(局所性が効く) | ローカルディスクI/O・CPU | 入力分割数に対しほぼ線形 |
| shuffle | M×R の全対全転送 | ネットワーク帯域・スピルのディスクI/O | データ量と横幅で急増、全体の律速になりやすい |
| reduce | 自パーティションのマージ・集約 | マージソート・CPU | reducer数に対しほぼ線形 |
全対全が重い理由は3つ重なります。第一にネットワーク帯域。中間データの総量がそのままクラスタ内部ネットワークを流れ、二分帯域(bisection bandwidth)が上限になります。第二にディスクI/O。中間データはメモリに収まらずスピルしてディスクへ書かれ、reducer側でも読み直され、書き+読みで増幅します。第三に同期の壁。多くの実装で全mapperが終わるまでreduceの本体(集約)は始められない——reducerは「全mapperの断片が揃った」ことを前提にマージするからです。結果、少数の遅いmapper(ストラグラー)が全体を待たせます。
パーティションは hash(k) mod R で決まるため、特定のキーに値が極端に集中する(ホットキー)と、そのキーを担当する1つのreducerに中間データが偏って集まります。他のreducerが空いていても、そのreducerだけが巨大な入力を1台で処理しきれず、ジョブ全体がそれを待ちます。均等分割はキーの分布が一様という暗黙の仮定に立っており、現実のデータ(人気商品、特定言語、NULLキー)は容易にこれを破ります。対策は、ホットキーへソルト(key#0〜key#n のように分割)を付けて複数reducerへ散らし二段集約する、事前にキー分布をサンプリングして範囲パーティションを設計する、といった偏りを前提にしたパーティション設計です。偏りは計算量ではなくシャッフルの分配で効くため、mapを速くしても解決しません。
モデルの射程と後継
MapReduceの制約は明快さと引き換えに硬さも生みます。1つのジョブは「map→shuffle→reduce」の1回で、複雑な処理は複数ジョブの連鎖になり、ジョブ間の中間結果をそのつど分散ファイルシステムへ書き戻します。反復計算(機械学習の学習ループ、グラフ探索)ではこの中間書き戻しが繰り返され、ディスクI/Oが支配的になります。後継のデータ処理エンジン(Sparkなど)は、中間データをできるだけメモリに保持し、複数の変換を1つの実行計画(DAG)にまとめてシャッフル回数自体を削ることで、この弱点を突いて高速化しました。
押さえる点は次の通り。(1) mapはレコード独立で並列、reduceはキー単位の集約。両者を繋ぐシャッフルだけが全ノードにまたがる。(2) シャッフルはパーティション分割+ソート+マージを含み、reducerが同一キーの値を連続で受け取れるのはソート済み断片をマージするから。(3) データ局所性が効くのはmap入力読み出しだけで、中間データの移動は避けられない。(4) combinerは結合則・交換則が成り立つ集約に限り、呼ばれる回数に依存しない前提で使う(平均は合計+件数に分解)。(5) ボトルネックはM×Rの全対全転送と**データ偏り(ホットキー)**で、mapの高速化では解けない。「MapReduceは線形にスケールする」は計算部分の話で、シャッフルは別物という切り分けが要点。
まとめ
- MapReduceは**map(レコード独立のキー付け)とreduce(キー単位の集約)**の2段モデル。制約が自動並列化・自動再実行・自動配置を可能にする。
- 両者を繋ぐシャッフルが心臓部で、
hash(k) mod Rで宛先reducerを決め、パーティション分割・ソート・マージを通じて「同一キーの全値」をreducerへ連続ストリームとして届ける。ソートは有界メモリでグループ化を成立させる手段。 - データ局所性は計算をデータへ運びmap入力の転送を消すが、効くのは入力読み出しまで。中間データの再分配は原理的に残る。
- combinerはmap側の事前集約でシャッフル量を削るが、結合則・交換則が成り立ち、呼ばれる回数に依存しない集約に限られる。
- 律速はM×Rの全対全通信(帯域・スピルI/O・全mapper完了までの同期)とホットキーによる偏り。map/reduce計算は線形でも、実行時間はシャッフルで決まりやすい。後継エンジンは中間データのメモリ保持とDAG化でシャッフルを削って高速化した。
データ工学 Article
MapReduceとシャッフルを実務で読む
TL;DRは入口です。実際に選ぶ・使う段階では、何を解決するか、何と比較するか、導入後にどこで詰まるかまで見る必要があります。
解決すること
MapReduce
比較で見る軸
難易度: advanced / カテゴリ: データ工学 / タグ数: 6
導入後に効く点
シャッフルは単なる転送ではなくソートを含む。mapper側で出力をパーティション分割しキー順にソートしておき、reducer側は各mapperの該当パーティションを取得してマージソートする。これによりreducerは同一キーの値を連続ストリームとして受け取れる。
先に潰すリスク
用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。
- 難易度
- advanced
- カテゴリ
- データ工学
- タグ数
- 6
判断チェックリスト
- 自社の用途が「MapReduce / シャッフル」に近いか確認する。
- 強みである「MapReduceはmap(各レコードを独立にキー付き中間ペアへ変換)とreduce(同一キーの値をまとめて集約)の2段モデル。両者の間で『同じキーを同じreducerへ集める』再分配がシャッフルで、ここだけが全ノードにまたがる通信を伴う。」が本当に評価軸になるか確認する。
- 注意点の「用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。」を運用で吸収できるか確認する。
- 公開値や仕様値は、対象プラン・対象機種・対象リージョンまで確認する。
- 既存システム、ID、ネットワーク、監視、バックアップとの接続方法を先に洗い出す。
- 小さく試してから、本番移行、権限設計、障害時手順、コスト監視を決める。