SparkのRDDとDAG実行
SparkがなぜMapReduceより速いのかを原理からつかむために。遅延評価で変換を系譜(lineage)として溜め、DAGスケジューラがシャッフル境界でステージへ切り、パーティションをタスクとして並列実行する仕組みを解説します。
- 1.RDDは分割された不変の分散コレクションで、mapやfilterなどの変換は即座に計算されず系譜(lineage)として記録されるだけ。collectやcountなどのアクションが呼ばれて初めて計算が起動する(遅延評価)。
- 2.アクション実行時、DAGスケジューラは系譜をたどり、ワイド依存(シャッフルを伴う)を境界にしてDAGをステージへ分割する。各ステージ内はナロー依存だけなのでパイプライン実行でき、1パーティションが1タスクになって並列に走る。
- 3.MapReduceが各段で中間結果をディスクへ書くのに対し、Sparkはナロー依存の連鎖をメモリ上でパイプライン処理し、再利用するRDDだけを明示的にキャッシュする。反復計算や対話的分析で桁違いに速いのはこのためで、遅い箇所はシャッフル境界に集中する。
RDDとは何か:分散された不変のコレクション
Sparkの計算モデルの中核は RDD(Resilient Distributed Dataset:耐障害性のある分散データセット) です。名前が示す通り、これは3つの性質を持つデータ構造です。分散(クラスタの複数ノードにパーティション分割されて置かれる)、不変(一度作ったら書き換えない。変換すると新しいRDDができる)、そして耐障害性(一部が失われても再計算で復元できる)。
利用者から見るとRDDは「1つの巨大なコレクション」ですが、内部では複数の パーティション(partition) に分かれています。パーティションこそが並列処理と障害復旧の単位で、Sparkは1パーティションにつき1タスクを実行します。HDFS上のファイルを読めば既定でブロック(既定128MB)ごとに1パーティションができ、パーティション数がそのステージの並列度の上限を決めます。
ここで決定的に重要なのが、RDDがデータそのものを保持するとは限らない点です。RDDが本当に持っているのは「どの親RDDから、どの変換で、どう自分を計算するか」という系譜(lineage:リネージ) の情報です。実データは、必要になったときにこの系譜をたどって計算されます。
遅延評価:変換は記録され、アクションで初めて走る
RDDの操作は2種類に厳密に分かれます。この区別が遅延評価の土台です。
| 種別 | 何をするか | 評価タイミング | 代表例 |
|---|---|---|---|
| 変換(transformation) | 既存RDDから新しいRDDを定義する | 遅延(すぐには計算しない) | map / filter / flatMap / reduceByKey / join |
| アクション(action) | 計算を起動して値かファイルを返す | 即時(ここで計算が走る) | collect / count / reduce / take / saveAsTextFile |
変換はすべて遅延(lazy) です。map や filter を呼んでも、その場では1バイトも処理されません。Sparkは「この親RDDにこの変換を適用する」という事実を系譜に書き足すだけで、新しいRDDを表すオブジェクトを返します。実際の計算が始まるのは、collect(結果をドライバへ集める)や count(件数を数える)といったアクションが呼ばれた瞬間です。
rdd = sc.textFile("logs.txt") # 変換: まだ何も読まない。ファイルへのポインタ
errs = rdd.filter(_ contains "ERROR") # 変換: filter を系譜に足すだけ
upts = errs.map(parseTimestamp) # 変換: map を系譜に足すだけ
upts.count() # アクション: ここで初めて全体が計算される
なぜ遅延にするのか。アクションが呼ばれるまで全体像が確定しないから、そこでまとめて最適化できるからです。上の例なら、filter と map は連続するナロー依存(後述)なので、Sparkは1行を読むごとに「フィルタ→パース→カウント」を1パスでパイプライン実行でき、中間RDDを丸ごとメモリに実体化せずに済みます。filter の時点で即座に全件を実体化していたら、この融合はできません。
RDDが実データではなく系譜を持つことは、そのままフォールトトレランスの仕組みになります。あるパーティションが載っていたノードが落ちても、Sparkはそのパーティションの系譜(親RDDと適用する変換の連鎖)をたどり、失われたパーティションだけを再計算できます。データ全体のレプリケーションを必須にせず、計算のやり直しで復元するのがSparkの立場です。だから変換は決定的(同じ入力から同じ出力)である必要があります。
ナロー依存とワイド依存:DAGを切る境界
系譜は、RDDどうしの依存関係のグラフとして表せます。各RDDは1つ以上の親を持ち、この親子関係の向きは循環しないので、全体は DAG(有向非巡回グラフ) になります。この依存には2種類あり、その違いが実行計画のすべてを決めます。
- ナロー依存(narrow dependency):親RDDの各パーティションが、子RDDの高々1つのパーティションにしか使われない。
map、filter、unionなどが該当。データはパーティションをまたいで動かず、各パーティションは独立に計算できる。 - ワイド依存(wide dependency):子RDDの1つのパーティションが、親RDDの複数(多くは全部)のパーティションに依存する。
groupByKey、reduceByKey、join、repartitionなどが該当。同じキーを1か所へ集めるため、ノード間でデータを再配置する必要がある。
ナロー依存(map/filter) ワイド依存(reduceByKey)
親P0 ── 子P0 親P0 ─┐
親P1 ── 子P1 親P1 ─┼─→ 子P0(key=a を集約)
親P2 ── 子P2 親P2 ─┘
※各パーティションが独立 ※全親から該当キーを掻き集める
ワイド依存を実現する全ノード間のデータ再配置が シャッフル(shuffle) です。シャッフルは、各パーティションを読んでキーごとに振り分け直し、ネットワーク越しに送り、受け手側で該当キーを集める all-to-all の操作で、Spark全体で最も高コストな処理です。ディスク書き出しとネットワーク転送を伴うため、「どこでシャッフルが起きるか」を見切ることが性能理解の核心になります。
groupByKey().mapValues(sum) と reduceByKey(_ + _) はどちらも同じ合計を出しますが、内部は別物です。reduceByKey はシャッフル前に各パーティションで部分集約(map側結合) してから送るため、ネットワークに流れるデータ量が激減します。一方 groupByKey は同一キーの値を全部そのままシャッフルで集めてから畳むので、キーに値が偏ると1ノードにデータが集中し、メモリ不足やスキューの温床になります。「同じ結果でも系譜の組み方でシャッフル境界の重さが変わる」——これが遅延評価下でのチューニングの勘所です。
ステージとタスク:DAGスケジューラの分割
アクションが呼ばれると、DAGスケジューラがRDDの系譜(DAG)を実行計画へ変換します。手順は明快です。アクションの対象RDDから親へさかのぼり、ワイド依存(シャッフル)に出会うたびにそこで切る。こうしてDAGは複数の ステージ(stage) に分割されます。
- ステージの内部はナロー依存だけで構成される。だからステージ内の変換の連鎖は、パーティションごとに1本のパイプラインとして融合実行できる(中間RDDを実体化しない)。
- ステージの境界は必ずシャッフル。前段ステージが全パーティションのシャッフル出力を書き終えないと、後段ステージは始められない。ここがステージ間の同期点になる。
ステージにはシャッフル出力を生む中間ステージ(ShuffleMapStage)と、結果をアクションへ返す最終ステージ(ResultStage)の区別があります。各ステージは、その担当パーティションの数だけの タスク(task) に展開されます。
系譜: textFile → map → filter →[シャッフル]→ reduceByKey → collect
└─────── Stage 1 ───────┘ └──── Stage 2 ────┘
(ShuffleMapStage: 出力を書く) (ResultStage: 結果を返す)
Stage 1 が 3 パーティション → タスク3個(各パーティションを独立処理)
Stage 2 が 2 パーティション → タスク2個(Stage 1 のシャッフル出力を読む)
タスクこそが実際にexecutor(ワーカー上のJVMプロセス)へ送られて動く最小単位です。DAGスケジューラがステージを切り、下位の タスクスケジューラ が各タスクをクラスタの空きスロットへ割り当てます。1タスク=1パーティションの処理なので、あるステージの並列度はそのパーティション数で頭打ちになり、コア数より多くのタスクは順番待ちになります。
Spark UIやチューニングでDAGを読むときは、ステージの数がシャッフルの回数だと考えると速く見通せます。ステージ間には「前段の全タスク完了を待つ」バリアがあるため、遅い1タスク(データスキューや遅いノード)が1つあるだけでステージ全体が引きずられます(ストラグラー問題)。逆にステージ内はパイプライン化されて軽いので、最適化の狙い目は常にステージ境界=シャッフルに集中します。パーティション数の調整・部分集約・ブロードキャスト結合はいずれもシャッフル境界を軽くする手です。
メモリ計算の利点:なぜMapReduceより速いのか
Sparkの速さの本質は、しばしば「インメモリだから」と一言で語られますが、正確には中間結果の扱い方にあります。古典的なMapReduceは、Map→Reduceの各段の間で必ず中間結果をディスク(HDFS)へ書き、次のジョブがそれを読み直します。反復アルゴリズム(機械学習の学習ループ、グラフ処理など)では、この「毎段ディスクへ書いて読む」往復が支配的なコストになります。
Sparkは2つの点でこれを避けます。第一に、ナロー依存の連鎖をメモリ上でパイプライン融合し、ステージ内では中間RDDをディスクにもメモリにも実体化しません。ディスクI/Oが発生するのは原理的にシャッフル境界だけです。第二に、繰り返し使うRDDを明示的にキャッシュできます。
rdd = load(...).map(...).filter(...) # 変換(まだ計算されない)
rdd.cache() # 「実体化したらメモリに残せ」と指示(これも遅延)
rdd.count() # 1回目: 実際に計算し、結果をメモリに保持
for i in 1..100: # 反復:
model = train(rdd, model) # 毎回ディスクを読まず、メモリ上のrddを再利用
cache()(= persist(MEMORY_ONLY))は「このRDDが次に実体化されたとき、各ノードは自分の担当パーティションをメモリに保持し、以降のアクションで再利用せよ」という指示です。注意点として、cache() 自体も遅延で、最初のアクションが走って初めてメモリに載ります。反復のたびに系譜を最初から再計算する無駄が消えるため、「10倍以上速くなることも珍しくない」と公式が言うのはこの再利用が効くケースです。
押さえるべき勘所。(1) 変換は遅延・アクションで起動。map を100個つないでも、count を呼ぶまで何も走らない。(2) ステージ境界=ワイド依存=シャッフル。ステージ数はシャッフル回数で、ステージ内はナロー依存のパイプライン。(3) 1パーティション=1タスクで、並列度はパーティション数が上限。(4) 耐障害性は系譜による再計算で担保し、シャッフル出力は再計算回避のためSparkが自動でディスクに保持する。(5) 速さの理由は「インメモリ」より正確には中間結果をディスクに書かずパイプライン融合し、再利用RDDだけをキャッシュする点。「cacheすれば必ず速い」ではなく、複数回使うRDDにだけ効く(1回しか使わないRDDのcacheはメモリを食うだけ無駄)。
なお、以上はRDDという低レベルAPIの実行モデルです。実務で多いDataFrame/Dataset APIは、この上にCatalystオプティマイザとTungsten実行エンジンを重ね、宣言的なクエリを解析して同種のステージ/タスクへ落とし込みます。列指向のメモリ表現や述語プッシュダウンなどの最適化が加わりますが、「遅延評価→DAG→シャッフル境界でステージ分割→パーティション単位のタスク」という骨格は共通です。したがってRDDとDAG実行を理解することが、Sparkのあらゆるレイヤを読む土台になります。分散処理一般の考え方は /devops/、機械学習ワークロードとの接続は /ai/ も参照してください。
まとめ
- RDD は分割された不変の分散コレクションで、実データではなく系譜(lineage) を持つ。系譜が再計算による耐障害性の源になる。
- 操作は変換(遅延) とアクション(即時起動) に分かれる。変換は系譜へ記録されるだけで、アクションが呼ばれて初めて全体が計算される(遅延評価)。
- 依存にはナロー依存(親パーティションが子の高々1つに使われる/独立計算可)とワイド依存(子が複数の親に依存/シャッフルが必要)があり、後者が最もコストの高い all-to-all のデータ再配置を生む。
- DAGスケジューラは系譜をワイド依存=シャッフルで切ってステージに分割する。ステージ内はナロー依存のパイプライン、ステージ境界はシャッフルという同期点。各ステージは1パーティション=1タスクに展開される。
- 速さの本質は「インメモリ」よりも、中間結果をディスクに書かずナロー依存を融合実行し、再利用するRDDだけを明示キャッシュする点。ディスクI/Oは原理的にシャッフル境界に集中する。
- DataFrame/DatasetはこのモデルにCatalyst/Tungstenの最適化を重ねるが、遅延→DAG→ステージ→タスクの骨格は共通。
データ工学 Article
SparkのRDDとDAG実行を実務で読む
TL;DRは入口です。実際に選ぶ・使う段階では、何を解決するか、何と比較するか、導入後にどこで詰まるかまで見る必要があります。
解決すること
Spark
比較で見る軸
難易度: advanced / カテゴリ: データ工学 / タグ数: 6
導入後に効く点
アクション実行時、DAGスケジューラは系譜をたどり、ワイド依存(シャッフルを伴う)を境界にしてDAGをステージへ分割する。各ステージ内はナロー依存だけなのでパイプライン実行でき、1パーティションが1タスクになって並列に走る。
先に潰すリスク
用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。
- 難易度
- advanced
- カテゴリ
- データ工学
- タグ数
- 6
判断チェックリスト
- 自社の用途が「Spark / RDD」に近いか確認する。
- 強みである「RDDは分割された不変の分散コレクションで、mapやfilterなどの変換は即座に計算されず系譜(lineage)として記録されるだけ。collectやcountなどのアクションが呼ばれて初めて計算が起動する(遅延評価)。」が本当に評価軸になるか確認する。
- 注意点の「用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。」を運用で吸収できるか確認する。
- 公開値や仕様値は、対象プラン・対象機種・対象リージョンまで確認する。
- 既存システム、ID、ネットワーク、監視、バックアップとの接続方法を先に洗い出す。
- 小さく試してから、本番移行、権限設計、障害時手順、コスト監視を決める。