分散クエリプランニングと最適化
同じSQLでも書き方や設計次第で分析クエリのコストが桁で変わる理由を内部から掴む。論理・物理プラン、コストベース最適化、プッシュダウンとプルーニング、シャッフル境界でのステージ分割の原理を分析基盤の視点で解きほぐします。
- 1.分散クエリエンジン(Spark SQL/Presto・Trino/BigQuery等)は、SQLをまず論理プラン(何を計算するか)に変換し、規則ベースの書き換えとコストベース最適化を経て、シャッフルを含む物理プランに落とす。1台のDBと決定的に違うのは、演算子の並びだけでなくデータをどのノードへ動かすか(分散・シャッフル)まで含めてコストを見積もる点にある。
- 2.分析基盤で最も効く最適化は、読む前に捨てる系のプッシュダウンだ。述語プッシュダウンと射影プルーニングでスキャンする行と列を削り、パーティションプルーニングでディレクトリ単位のファイル群を丸ごと読み飛ばす。これらはストレージ(Parquet/オブジェクトストレージ)まで押し込めて初めて、スキャン量課金とI/Oを桁で減らす。
- 3.物理プランはシャッフル(データ再分散)の境界でステージに分割され、ステージ間はシャッフルで区切られる同期点になる。コスト最適化の主眼は結合順序・結合方式(ブロードキャストかシャッフルか)の選択でシャッフル総量を最小化することで、統計が古いと誤った物理プランを選び、全体が遅くなる。
なぜ分散環境ではクエリプランが桁で効くのか
同じ結果を返すSQLでも、内部でどう実行するかは無数にあります。3つの表を結合するだけでも、どの2表から結合するか、各結合をどの方式で行うか、フィルタをどの段階でかけるかで、実行の手順は大きく変わります。単一DBならこの選択は主に「どのインデックスを使い、どの順で結合するか」に閉じますが、分散環境ではさらに『データをどのノードへ動かすか』という次元が加わります。ここでの誤りは、ネットワークを大量のデータが飛び交う遅延として跳ね返り、数十秒で終わるはずのクエリが数十分になります。
分散クエリエンジン——Spark SQL、Presto/Trino、BigQuery、あるいはMPPウェアハウス(/data-engineering/mpp-data-warehouse/)——のクエリプランナ(オプティマイザ) は、この膨大な選択肢の中から、なるべく安い実行手順を自動で選ぶ役割を担います。B+木の探索やMVCCといった単一ノードのストレージ内部(/database/)の話ではなく、主題はクラスタ全体でスキャン量とネットワーク転送量をどう削るかです。ユーザーは「何が欲しいか」を宣言的に書き、プランナが「どう計算するか」を組み立てる——この分業こそがSQLの生産性の源泉であり、分析基盤ではその巧拙がコストを直接左右します。
SQLは「結果の定義(what)」だけを書き、「実行手順(how)」は書きません。だからこそエンジンは、同じ結果を保ったまま実行手順を自由に組み替えられます。プランナの仕事は、意味を変えない範囲で等価な実行プランの中から最も安いものを選ぶこと。分析基盤ではデータ量が巨大なので、この自動選択の良し悪しが人手のチューニングを上回る効果を持ちます。
論理プランと物理プラン:何を計算するかと、どう計算するか
プランニングは大きく2段階に分かれます。まず論理プラン(logical plan) は、SQLを関係演算子(スキャン、フィルタ、射影、結合、集約)の木として表したもので、「何を計算するか」だけを表し、実行方法は決めません。JOIN は「結合する」という意味を持つだけで、ハッシュ結合かソートマージ結合かは未定です。
次に物理プラン(physical plan) は、各論理演算子に具体的な実行アルゴリズムとデータ移動を割り当てたものです。ここで初めて「この結合はハッシュ結合」「ここで結合キーによるシャッフルが要る」「この小表はブロードキャストする」が決まります。分散エンジンの物理プランには、単一DBには無いExchange(データ交換)演算子——シャッフルやブロードキャストを表すノード——が現れるのが最大の特徴です。
論理プラン(何を計算するか):
Aggregate(SUM(amount) GROUP BY region)
└ Join(orders.cust = customers.cust)
├ Filter(orders.date >= '2026-01-01')
│ └ Scan(orders)
└ Scan(customers)
物理プラン(どう計算するか。Exchange = ノード間のデータ移動):
HashAggregate(final) ← 部分集約をマージ
└ Exchange(hash by region) ← 集約キーで再分散(シャッフル)
└ HashAggregate(partial) ← 各ノードで部分集約
└ BroadcastHashJoin(cust)
├ Scan(orders, filter: date>=..., columns: cust,amount) ← 大表はローカルのまま(動かさない)
└ Broadcast ← 小さい customers を全ノードへ複製
└ Scan(customers, columns: cust,region)
論理から物理への変換は一意ではなく、同じ論理プランから無数の物理プランが導けます。上の例でも customers をブロードキャストせずシャッフルする物理プランもあり得ます。どれを選ぶかを決めるのが、次のコストベース最適化です。
最適化の2系統:規則ベースとコストベース
プランの書き換えには、性質の異なる2つの系統があります。
| 観点 | 規則ベース最適化(RBO) | コストベース最適化(CBO) |
|---|---|---|
| 判断基準 | 常に得になる変換規則を無条件に適用 | 統計から複数候補のコストを見積もり最安を選択 |
| 代表例 | 述語プッシュダウン、射影プルーニング、定数畳み込み | 結合順序、結合方式、ブロードキャスト可否 |
| 必要な情報 | 不要(プラン構造だけで判断) | 行数・カーディナリティ・データサイズの統計 |
| 外すリスク | ほぼ無い(等価変換で常に改善) | 統計が古い・不正確だと誤った物理プランを選ぶ |
規則ベース最適化(RBO:Rule-Based Optimization) は、「この形が現れたら必ずこう書き換える」という適用すれば常に得になる等価変換を機械的に施します。後述する述語プッシュダウンや射影プルーニングは典型で、コスト計算なしに無条件で適用できます。
コストベース最適化(CBO:Cost-Based Optimization) は、複数の等価な物理プラン候補それぞれについて推定コストを計算し、最小のものを選びます。コストは主にスキャン量・ネットワーク転送量・CPUの重み付き和で、分散環境ではシャッフルによるネットワーク転送が支配的な項になります。ここで決定的に重要なのがカーディナリティ推定——各演算子が何行を出力するかの見積もりです。特に結合結果の行数はフィルタの選択率や結合の重複度に依存し、これを誤ると下流のコスト計算が連鎖的に外れます。
CBOの品質はテーブル統計(行数・列ごとのカーディナリティ・値の分布ヒストグラム) の鮮度に完全に依存します。統計が古いと、実際は数億行ある表を数千行と誤推定し、本来シャッフルすべき大表をブロードキャストしようとして全ノードのメモリを溢れさせる、といった破滅的なプランを選びかねません。だからこそ大量ロード後の統計再計算(ANALYZE)が実務で重要で、多くのエンジンは実行時に実際の行数を見て計画を修正する適応的クエリ実行(AQE:Adaptive Query Execution) を備え、推定誤りを走りながら是正します。
プッシュダウンとプルーニング:読む前に捨てる
分析基盤で最も費用対効果が高い最適化は、そもそも読むデータを減らすことです。数億行を読んでから捨てるのではなく、読む前に捨てる。これを実現するのが以下の3つで、いずれも規則ベースで無条件に適用されます。
- 述語プッシュダウン(predicate pushdown):
WHEREなどのフィルタ条件を、プラン木のできるだけ下(スキャンに近い側)へ押し下げる。上位で捨てる行を下位で先に捨てれば、その後の結合・集約が扱う行数が減り、シャッフルに乗るデータも減ります。真価は条件をストレージ層まで押し込めるときに出ます。Parquet/ORC(/data-engineering/columnar-parquet-orc/)は行グループごとに最小値・最大値の統計を持ち、条件に該当し得ない行グループをファイルを開いた時点で読み飛ばせます。オブジェクトストレージへのスキャン量課金を、条件がストレージまで届くか否かが桁で変えます。 - 射影プルーニング(projection pruning):クエリが使わない列をスキャン対象から外す。列指向フォーマットは列ごとに独立して格納されるので、
SELECTと結合・フィルタに登場する列だけを読み、残りのI/Oをまるごと省けます。SELECT *が分析基盤で嫌われるのはこのためです。 - パーティションプルーニング(partition pruning):日付などでディレクトリ分割された表(/data-engineering/partitioning-bucketing/)で、フィルタ条件に合致しないパーティションのファイル群を丸ごと読み飛ばす。
WHERE dt = '2026-06-01'なら他の日付ディレクトリは一切開きません。これはファイル単位・ディレクトリ単位の最も粗く最も効く枝刈りで、うまく効けばスキャン対象が全体の数百分の一になります。
プッシュダウン前: プッシュダウン後:
Filter(date >= '2026-06-01') Scan(orders,
└ Join(...) ─────▶ partitions: dt>=2026-06-01, ← パーティションプルーニング
└ Scan(orders 全体) columns: cust,amount,dt, ← 射影プルーニング
rowgroup filter: date>=...) ← 述語プッシュダウン
粒度の階層を押さえるのが肝心です。パーティションプルーニングがディレクトリ(ファイル群)単位、述語プッシュダウンの行グループ枝刈りがファイル内のブロック単位、そして実際の述語評価が行単位。上の粗い層で捨てられるほど、下の層に届くデータが減ります。この「読まずに済ませる」設計こそ、分散クエリ最適化の中核であり、単一DBのインデックス探索とは目的の異なるスキャン量そのものを削るアプローチです。
これらの最適化は、データの物理配置とファイル形式が前提条件を満たして初めて効きます。パーティションプルーニングは「よく絞る列でパーティションが切られている」ことが、行グループ枝刈りは「その列でデータがある程度ソートまたはクラスタ化されている」ことが必要です。列がファイル内に散らばっていれば統計の min/max 幅が広くなり、どの行グループも読む羽目になります。クエリの書き方より、パーティション列とファイルレイアウトの設計が、プルーニングの効き目を根本から決めます。
ステージ分割:物理プランをシャッフル境界で切る
物理プランが決まると、エンジンはそれを分散実行できる単位に刻みます。ここで境界になるのがシャッフル(データ再分散) です。各ノードが自分のローカルデータだけで進められる演算子の連なりは1つのステージ(stage) にまとめられ、シャッフルが必要な箇所がステージの切れ目になります。Sparkが系譜(lineage)をDAGにしてシャッフル境界でステージへ切る仕組み(/data-engineering/spark-rdd-dag/)は、まさにこの物理プラン分割の具体化です。
(両表が大きくブロードキャストできず、シャッフル結合を選んだ場合)
Stage 1: Scan(orders) → Filter ← 各パーティションで独立・並列
Stage 1': Scan(customers) ← 同じく独立・並列
│
▼ Exchange / シャッフル(両表を結合キーで再分散) ← ステージ境界=同期点
│
Stage 2: ShuffleHashJoin → 部分集約 ← 再分散後のデータで並列
│
▼ Exchange / シャッフル(集約キーで再分散)
│
Stage 3: 最終集約 → 結果
なぜシャッフルが境界になるのか。ステージ内の演算子はパーティション単位で完全に独立しており、各パーティションを1つのタスクとして並列に走らせられます。しかしシャッフルは全ノードから全ノードへデータを配り直す(全対全通信) ため、下流ステージは上流ステージの全タスクの完了を待たねば始められません。つまりステージ境界はクラスタ全体の同期点であり、シャッフルの前後で並列実行が一度収束します。だからシャッフルは、ネットワーク帯域の消費と同期の待ちという二重の意味で、分散実行の最大のコストになります。
ここでコストベース最適化の狙いが明確になります。結合順序と結合方式の選択は、突き詰めればシャッフルの総量を最小化する問題に帰着します。小さい表をブロードキャストできればシャッフルが1つ消え、ステージ境界が1つ減ります。結合順序を工夫して中間結果が小さくなる順に結合すれば、後続のシャッフルに乗るデータ量が減ります。プランナは候補ごとにこの「どれだけデータが動くか」を統計から見積もり、最も動かさずに済むプランを選ぼうとします。
押さえるべき勘所。(1) 論理プラン=何を計算するか、物理プラン=どう計算するか(アルゴリズム+データ移動)。分散では物理プランに Exchange(シャッフル/ブロードキャスト) が現れる。(2) 最適化は2系統。規則ベース(RBO) は述語プッシュダウン・射影プルーニングなど常に得な等価変換、コストベース(CBO) は統計から結合順序・結合方式を選ぶ。(3) CBOはカーディナリティ推定と統計の鮮度が命で、外すと大表をブロードキャストして破綻する。AQE は実測で是正する。(4) 分析基盤の要は読む前に捨てる:パーティションプルーニング(ディレクトリ単位)→述語プッシュダウン(行グループ単位)→射影プルーニング(列を削る)の粒度階層。効くかは物理設計(パーティション列・ファイルレイアウト) で決まる。(5) 物理プランはシャッフル境界でステージ分割され、境界は全対全通信+同期点。CBOの目的は結局シャッフル総量の最小化。「オプティマイザ任せ」ではなく「プルーニングとコロケーションが効く設計」が前提。
まとめ
- 分散クエリプランナは、宣言的なSQLから等価な実行プランの中で最も安いものを選ぶ。単一DBと違い、演算子の並びだけでなくデータをどのノードへ動かすか(シャッフル・ブロードキャスト) までコストに含める。
- 論理プランは「何を計算するか」、物理プランは「どう計算するか(アルゴリズム+データ移動)」。分散の物理プランには Exchange 演算子(シャッフル/ブロードキャスト)が現れ、同じ論理プランから無数の物理プランが導ける。
- 最適化は規則ベース(常に得な等価変換) とコストベース(統計から結合順序・方式を選択) の2系統。CBOはカーディナリティ推定と統計の鮮度に依存し、外すと破滅的なプランを選ぶため、
ANALYZEと AQE(適応的クエリ実行) が実務で効く。 - 分析基盤で最も効くのは読む前に捨てる最適化。パーティションプルーニング(ディレクトリ単位)→述語プッシュダウン(行グループ単位)→射影プルーニング(列削減) の粒度階層で、スキャン量とI/Oを桁で削る。効くか否かはパーティション列とファイルレイアウトの物理設計が決める。
- 物理プランはシャッフル境界でステージに分割され、境界は全対全通信であり同期点になる。コストベース最適化の本質は、結合順序・結合方式の選択を通じたシャッフル総量の最小化にある。
データ工学 Article
分散クエリプランニングと最適化を実務で読む
TL;DRは入口です。実際に選ぶ・使う段階では、何を解決するか、何と比較するか、導入後にどこで詰まるかまで見る必要があります。
解決すること
クエリ最適化
比較で見る軸
難易度: advanced / カテゴリ: データ工学 / タグ数: 6
導入後に効く点
分析基盤で最も効く最適化は、読む前に捨てる系のプッシュダウンだ。述語プッシュダウンと射影プルーニングでスキャンする行と列を削り、パーティションプルーニングでディレクトリ単位のファイル群を丸ごと読み飛ばす。これらはストレージ(Parquet/オブジェクトストレージ)まで押し込めて初めて、スキャン量課金とI/Oを桁で減らす。
先に潰すリスク
用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。
- 難易度
- advanced
- カテゴリ
- データ工学
- タグ数
- 6
判断チェックリスト
- 自社の用途が「クエリ最適化 / 分散処理」に近いか確認する。
- 強みである「分散クエリエンジン(Spark SQL/Presto・Trino/BigQuery等)は、SQLをまず論理プラン(何を計算するか)に変換し、規則ベースの書き換えとコストベース最適化を経て、シャッフルを含む物理プランに落とす。1台のDBと決定的に違うのは、演算子の並びだけでなくデータをどのノードへ動かすか(分散・シャッフル)まで含めてコストを見積もる点にある。」が本当に評価軸になるか確認する。
- 注意点の「用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。」を運用で吸収できるか確認する。
- 公開値や仕様値は、対象プラン・対象機種・対象リージョンまで確認する。
- 既存システム、ID、ネットワーク、監視、バックアップとの接続方法を先に洗い出す。
- 小さく試してから、本番移行、権限設計、障害時手順、コスト監視を決める。