データスキューの検出と対処
分散ジョブが「99%で止まる」あの現象を根本から潰すために。ホットキーによる偏りがなぜ1タスクを詰まらせるのかを解き、ソルティング・スキュー結合・再分配・事前集約という対処を原理から使い分けられます。
- 1.データスキューとは、シャッフル後のパーティション(=1タスクの担当データ)がキー分布の偏りで極端に不均等になり、少数のホットキーを持つタスクだけが巨大化する現象。全体はほぼ終わっているのに1〜数タスクが延々と走る『99%で止まる』ストラグラーの正体で、並列度を上げても遅い1タスクは分割されないので解決しない。
- 2.検出はタスク単位のメトリクスで行う。ステージ内タスクのシャッフル読取バイト数・処理時間の最大値と中央値の比(max/median)が数倍〜桁になっていれば偏りが濃厚。原因キーはキーごとの件数を近似集計(サンプリングやapprox集計)で特定し、NULL・既定値・人気エンティティが上位を占めていないか見る。
- 3.対処は偏りを分配し直すこと。ホットキーにソルト(乱数接尾辞)を付けて複数パーティションへ散らし二段で集約するソルティング、片側が小さければブロードキャスト結合でシャッフル自体を消す、ホットキーだけ別処理するスキュー結合、reduceByKey相当の事前集約で運ぶ量を根元で削る、AQEなどの実行時パーティション再分配、が定石。
スキューとは何か:偏りが並列性を殺す仕組み
分散データ処理の速さは「仕事を等分して全ノードで同時に走らせる」ことに立脚しています。ところが集約や結合は、同じキーの値を1か所へ集めるシャッフルを避けられません(この再分配の原理は /data-engineering/mapreduce-shuffle/ を参照)。シャッフルの宛先は通常 partition = hash(key) mod N(N はパーティション数)で決まり、1つのパーティションが1タスクの担当データになります。ここで暗黙に置かれているのが「キーがパーティション間に一様に散る」という仮定です。
データスキュー(data skew) は、この仮定が破れて特定キーに値が偏る現象を指します。ハッシュ関数がいくら一様でも、入力のキー分布そのものが偏っていれば分配は偏ります。人気商品のID、特定言語コード、そして頻出する NULL や既定値——現実のデータは容易に一様性を裏切ります。あるキーだけ全体の30%の行を持てば、そのキーが写る1パーティションが突出して巨大化し、担当タスクだけが桁違いのデータを1台で処理する羽目になります。
勘違いしやすいのは「データが多いから遅い」ではなく「特定タスクにだけ偏るから遅い」点です。総データ量が同じでも、均等に散れば各タスクは軽く、偏れば1タスクが重い。だから CPU を増やしても、パーティション数 N を増やしても、単一ホットキーは1つのパーティションにしか入らないため、その重いタスクは分割されず解決しません。スキューは並列化の前提(仕事の等分)を壊す、分配レイヤの病です。
ストラグラー:なぜ「99%で止まる」のか
分散ジョブはステージ単位でバリア同期します。あるステージの全タスクが終わるまで次段は始められない(ステージとシャッフル境界の関係は /data-engineering/spark-rdd-dag/ が詳しい)。すると、ステージの完了時刻は平均ではなく最も遅いタスクの完了時刻で決まります。
スキューがあると、大多数のタスクは軽くて即座に終わり、ホットキーを掴んだ少数のタスクだけが延々と走ります。これが ストラグラー(straggler:落伍者)タスクで、進捗バーが「99%」で長時間固まる典型パターンです。仮に1000タスク中999が10秒、1タスクが600秒なら、ステージは600秒かかります——並列度は実質1に落ちています。
理想(均等) スキュー(ホットキーあり)
task0 ████ task0 ████
task1 ████ task1 ████
task2 ████ task2 ████████████████████████ ← ホットキー
task3 ████ task3 ████
完了 = 4 完了 = 24(遅い1本が全体を律速)
さらに悪いのは、重いタスクがメモリに載りきらないとき。巨大パーティションのグループ化・結合でスピル(ディスク退避)が多発し、最悪 OOM でタスクが落ちて再試行され、同じ偏りでまた落ちる、という悪循環に入ります。
検出:タスクメトリクスの max/median を読む
対処の前に、遅さがスキュー由来だと切り分けることが要ります。鍵はステージ全体ではなくタスク単位の分布を見ることです。
| 観測する指標 | スキューのサイン | 見方 |
|---|---|---|
| タスクのシャッフル読取バイト | max が median の数倍〜桁 | 1タスクだけ突出=偏りの直接証拠 |
| タスクの実行時間 | 少数タスクだけ極端に長い | 分布の裾が重い(ロングテール) |
| スピル量(メモリ→ディスク) | 特定タスクで大量スピル/OOM | 巨大パーティションが載りきらない |
| ステージ内の完了タイムライン | 大半が早く終わり数本が居残る | 99%で停滞=ストラグラー |
Spark UI ならステージ詳細のタスク分布(75th パーセンタイルと Max の乖離)が一目で分かります。原因キーの特定には、キーごとの件数を近似集計するのが実務的です。全キーを厳密に数えると重いので、サンプリングや approx_count_distinct/頻度上位の概算で「どのキーが何行持つか」の上位を出します。
-- 原因キーの当たりを付ける(概念コード)
SELECT join_key, COUNT(*) AS cnt
FROM fact
GROUP BY join_key
ORDER BY cnt DESC
LIMIT 20; -- 上位に NULL や特定IDが極端な件数で並べば、それがホットキー
実データで最も多いスキュー原因が NULL キーです。外部キーが未設定の行がすべて同じ NULL に落ち、hash(NULL) は1つの値なので全 NULL 行が単一パーティションへ集中します。結合では多くの場合 NULL は結合対象外(等値結合はマッチしない)なのに、シャッフルだけは律儀に1か所へ集めてしまう。対処は結合前に NULL を除外する、あるいは NULL にランダム値を割り当てて散らす(どうせマッチしないので結果は変わらない)ことです。
対処1:ソルティングで偏りを分割する
最も汎用的な武器が ソルティング(salting) です。ホットキーを、乱数の接尾辞(ソルト)を付けて人工的に複数のキーへ割り、複数パーティションに散らす発想です。集約なら二段構えにします。
まず各行のキーに 0 から S-1(S はソルト数)の乱数を足し、key#3 のように分割キーを作る。第一段の集約はこのソルト付きキーで行うので、元は1つだったホットキーが最大 S 個のパーティションへ分散し、S 台で並列に部分集約されます。その後ソルトを剥がして元キーで第二段の集約をかけ、部分結果をまとめます。
段1: ソルト付きで部分集約(ホットキーを S=4 個へ散らす)
(hotkey#0, 部分和) (hotkey#1, 部分和) (hotkey#2, 部分和) (hotkey#3, 部分和)
段2: ソルトを剥がし元キーで最終集約
hotkey → 4つの部分和を合算 → 最終値
これが成立するのは、集約が結合則・交換則を満たす(合計・件数・最大など、部分集約してから全体集約しても結果が変わらない)場合です。この代数的条件は combiner/reduceByKey の適用可否と同じ条件で、詳しくは /data-engineering/mapreduce-shuffle/ に整理があります。平均のように単純加算できない集約は、合計と件数のペアを持ち回して最後に割る形へ分解します。
対処2:スキュー結合とブロードキャスト
結合(JOIN)のスキューは専用の対処が効きます。
ブロードキャスト結合(broadcast join)。片方のテーブルが十分小さければ、それを全ノードへ丸ごと配って各ノードのローカルで結合します。小表を各所へコピーするのでシャッフルそのものが消え、大表側のキー分布がどれだけ偏っていても、そもそも再分配しないのでスキューが起きません。片側が数十MB〜数百MB以下ならまず検討する第一手です。
スキュー結合(skew join)。両表とも大きく片側だけホットキーがある場合、ホットキーとそれ以外を分けて別々に結合します。非ホット部分は通常のシャッフル結合、ホットキー部分は結合相手側の該当行を複製してソルティングで散らす。あるいは、片側でホットキーを key#0..S-1 に分割し、もう片側の対応行を S 倍に**複製(explode)**して全ソルトに当てることで、1つのキーの結合を S タスクへ割ります。
Spark の AQE(Adaptive Query Execution) は、シャッフル後の実測パーティションサイズを見て計画を組み替えます。突出して大きいパーティションを検出すると、それを複数の小片へ自動分割し(skewedPartitionFactor 等の閾値超過が対象)、相手側の対応パーティションを複製して結合し直します。手作業のソルティングを実行時に肩代わりする仕組みで、spark.sql.adaptive.enabled と skewJoin.enabled を有効化しておくのが今日の標準です。ただし万能ではなく、集約側のスキューや極端な単一キーには手動ソルティングが依然有効です。
対処3:事前集約と再分配
事前集約(partial/pre-aggregation)。運ぶデータ量をシャッフルの前に根元で削るのが、偏りにもコストにも効く王道です。groupByKey().mapValues(sum) は全値をそのまま集めてから畳むためホットキーに値が殺到しますが、reduceByKey(_ + _) はmap側で部分集約してから送るので、同一キーが1ノードに1万件あっても送るのは集約後の1件で済みます。同じ結果でも、事前集約の有無でシャッフル量とスキューの深刻さが桁で変わります(この違いは /data-engineering/spark-rdd-dag/ 参照)。
パーティション再分配(repartition)。パーティション数が少なすぎると、各パーティションが太って偏りが顕在化しやすくなります。repartition(N) で数を増やすと均しは改善しますが、それ自体が全対全シャッフルを起こすコストである点と、単一ホットキーは何度再分配しても1パーティションにしか入らない点に注意が要ります。再分配は「多数の中規模キーが少数パーティションに固まっている」タイプの偏りには効きますが、単一巨大キーには効きません——そこはソルティングの領分です。そもそもの物理配置でスキューとシャッフルを抑えるパーティション/バケット設計は /data-engineering/partitioning-bucketing/ が、分散キー設計でシャッフルを減らす発想は /data-engineering/mpp-data-warehouse/ が詳しく、いずれもキーの偏りが遅さの主因になる点は共通です。
押さえる点。(1) スキューは計算量でなく分配の偏りの問題で、CPU増やしても並列度を上げても単一ホットキーは1パーティションに固まるため解けない。(2) ステージは最も遅いタスクで律速され、これが「99%で止まる」ストラグラー。(3) 検出はタスク単位でシャッフル読取・時間の max/median を見る。NULLキーが最悪の常連。(4) 対処:ソルティング(乱数接尾辞で散らし二段集約、結合則が前提)、ブロードキャスト結合(小表配布でシャッフル消滅)、スキュー結合(ホットキー分離+相手側複製)、事前集約(reduceByKey相当で運ぶ量を削る)、再分配/AQE(実行時のパーティション組み替え)。(5) groupByKey より reduceByKey、単一巨大キーは再分配でなくソルティング、という使い分けが要点。
まとめ
- データスキューは、シャッフルの分配
hash(key) mod Nがキー分布の偏りで不均等になり、ホットキーを掴んだ少数タスクだけが巨大化する現象。総量でなく偏りが並列性を殺す。 - ステージは最も遅いタスクで完了が決まるため、偏りは ストラグラー(99%で停滞・OOM再試行)として現れる。並列度を上げても単一ホットキーは分割されない。
- 検出はタスク単位のメトリクス(シャッフル読取・実行時間・スピルの max/median)で行い、原因キーは近似集計で特定する。
NULL・既定値・人気エンティティが常連。 - 対処は「偏りを分配し直す/そもそも運ばない」。ソルティング(二段集約、結合則が前提)、ブロードキャスト結合(シャッフル消滅)、スキュー結合(ホットキー分離+複製)、事前集約(reduceByKey相当)、再分配・AQE(実行時組み替え)を偏りの型で使い分ける。
- 単一巨大キーは再分配では解けずソルティングの領分、多数の中規模キーの固まりは再分配で均せる——この切り分けが実務の勘所。
データ工学 Article
データスキューの検出と対処を実務で読む
TL;DRは入口です。実際に選ぶ・使う段階では、何を解決するか、何と比較するか、導入後にどこで詰まるかまで見る必要があります。
解決すること
データスキュー
比較で見る軸
難易度: advanced / カテゴリ: データ工学 / タグ数: 6
導入後に効く点
検出はタスク単位のメトリクスで行う。ステージ内タスクのシャッフル読取バイト数・処理時間の最大値と中央値の比(max/median)が数倍〜桁になっていれば偏りが濃厚。原因キーはキーごとの件数を近似集計(サンプリングやapprox集計)で特定し、NULL・既定値・人気エンティティが上位を占めていないか見る。
先に潰すリスク
用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。
- 難易度
- advanced
- カテゴリ
- データ工学
- タグ数
- 6
判断チェックリスト
- 自社の用途が「データスキュー / ソルティング」に近いか確認する。
- 強みである「データスキューとは、シャッフル後のパーティション(=1タスクの担当データ)がキー分布の偏りで極端に不均等になり、少数のホットキーを持つタスクだけが巨大化する現象。全体はほぼ終わっているのに1〜数タスクが延々と走る『99%で止まる』ストラグラーの正体で、並列度を上げても遅い1タスクは分割されないので解決しない。」が本当に評価軸になるか確認する。
- 注意点の「用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。」を運用で吸収できるか確認する。
- 公開値や仕様値は、対象プラン・対象機種・対象リージョンまで確認する。
- 既存システム、ID、ネットワーク、監視、バックアップとの接続方法を先に洗い出す。
- 小さく試してから、本番移行、権限設計、障害時手順、コスト監視を決める。