分散集約と部分集約・近似集約(HLL/t-digest)
巨大データの集計が少ないネットワーク転送で速く終わる理由が分かります。ノードローカルの部分集約とコーディネータでの最終マージ、近似スケッチが効く代数的条件まで原理から押さえられます。
- 1.分散集約は各ノードで部分集約し、コーディネータが部分結果だけをマージする。転送量がグループ数に縮むため、生データを全部集める方式より桁で速い。
- 2.この二段集約が成立する鍵は集約関数の代数的性質。SUM/COUNT/MIN/MAX のような分配的・代数的な関数は中間状態をマージできるが、MEDIAN や COUNT(DISTINCT) のような全体的な関数は素朴にはマージできない。
- 3.全体的な関数も、HyperLogLog(基数)や t-digest(分位点)のようなマージ可能スケッチに置き換えれば、固定サイズの中間状態で近似的に分散集約できる。
分散集約という問題設定
SELECT region, SUM(amount), COUNT(DISTINCT user_id) FROM sales GROUP BY region のような集計を、データが複数ノードに分かれて置かれた分散データベースや MPP(massively parallel processing)エンジンで実行することを考えます。素朴に全ノードの生データを1か所に集めてから集約すれば正しい答えは出ますが、ネットワーク転送が全行ぶんに膨らみ、集約地点がボトルネックになります。
そこで使われるのが二段集約です。各ノードが手元のデータをまず**部分集約(partial aggregation)し、コーディネータ(または集約担当ノード)が部分結果だけを集めて最終マージ(final merge)**する。本稿は、この分割がなぜ正しく成立するのか、どんな集約関数で成立するのか、そして素朴には分割できない関数を近似スケッチで救う原理を解説します。
二段集約:部分集約と最終マージ
二段集約は集約処理を partial(部分集約)と final(最終マージ)の2フェーズに分けます。
-- フェーズ1: 各ノードでローカルに部分集約
各ノード i:
for each row in ローカルデータ:
g = LocalState[ row.group_key ]
accumulate(g, row) -- 中間状態を更新
部分結果 PartialState_i を出力 -- グループ単位の中間状態
-- フェーズ2: コーディネータで最終マージ
for each ノード i:
for each (key, state) in PartialState_i:
merge(FinalState[key], state) -- 同一キーの中間状態を結合
for each key in FinalState:
finalize(FinalState[key]) -- 中間状態を最終値へ変換
ここで効く性質が2つあります。第一に、フェーズ1の出力がグループ数ぶんに縮むこと。1億行が1万グループに集計されれば、転送するのは1万件の中間状態だけで、ネットワーク量が劇的に減ります。第二に、同一グループキーの中間状態を結合(merge)して最終値が変わらないこと。後者が成立する条件こそ集約関数の代数的性質です。
なお GROUP BY の分散には、キーをハッシュで再分配してから集約する方式(シャッフル集約)もあります。これは部分集約と相補的で、しばしば「部分集約 → シャッフル → 最終集約」と組み合わさります。再分配の原理は分散結合とシャッフルと同型です。
集約関数の代数的分類
二段集約が使えるかは、集約関数が中間状態をマージできるかにかかります。Gray らの分類(データキューブ論文)に従うと、集約関数は3種類に分けられます。
| 分類 | 定義 | 例 | 二段集約 |
|---|---|---|---|
| 分配的(distributive) | 全体の集約が部分集約の同じ関数の再適用で求まる | COUNT・SUM・MIN・MAX | そのまま可(中間状態=結果型) |
| 代数的(algebraic) | 固定サイズの中間状態を経由すれば部分集約から合成できる | AVG・分散・標準偏差 | 可(中間状態を設計する) |
| 全体的(holistic) | 固定サイズの中間状態で表せず、全要素の保持が原理的に必要 | MEDIAN・分位点・COUNT(DISTINCT)・MODE | 素朴には不可(近似で救う) |
分配的関数は最も素直です。SUM は「部分和の和」、COUNT は「部分カウントの和」、MIN/MAX は「部分最小/最大の最小/最大」。注意すべきは COUNT で、最終マージ側は受け取った部分カウントを足すのであって数え直さない点です(merge は SUM になる)。
代数的関数は、結果そのものは直接マージできないが、適切な中間状態を介せばマージできます。AVG の中間状態は (sum, count) のペアで、マージは要素ごとの加算、最終化(finalize)は sum / count。分散も (count, sum, sumsq) を持てば合成できます。中間状態のサイズがデータ量によらず定数である点が代数的関数の要件です。
-- AVG を代数的に分散集約する
partial: state = (sum=Σ amount, count=件数)
merge: (s1,c1) + (s2,c2) = (s1+s2, c1+c2) -- 要素ごとの加算
finalize: avg = sum / count -- 最後に1回だけ割る
全体的関数は、固定サイズの中間状態では表現できないものです。中央値や分位点は順序統計量で、部分集約後の「上位ノードの中央値」を集めても全体の中央値は復元できません。COUNT(DISTINCT) も同様で、各ノードのローカル distinct 数を足すと、ノード間で重複する値を二重に数えてしまいます。これらが分散集約の難所です。
COUNT(DISTINCT user_id) を「各ノードのローカル distinct を SUM」で求めるのは誤りです。同じ user_id が複数ノードに現れると重複加算になり、過大評価します。正しく素朴に解くにはノードをまたいで全 distinct 値を突き合わせる必要があり、これは全要素の集約(=全体的関数)と同じ困難に帰着します。だから近似スケッチが要るのです。
全体的関数を救う:マージ可能スケッチ
全体的関数を分散集約するには、固定サイズで**マージ可能(mergeable)**なデータ構造でその関数を近似します。マージ可能とは、2つのスケッチを結合した結果が、両者の元データを連結して1から作ったスケッチと(誤差の範囲で)一致する性質です。これがあれば、スケッチをそのまま代数的関数の中間状態のように扱えます。
基数:HyperLogLog(HLL)
COUNT(DISTINCT) の近似には HyperLogLog を使います。各値をハッシュし、ハッシュ値先頭の連続 0 の最大数をレジスタに記録する構造で、メモリは distinct 数によらずレジスタ数だけで固定(16384 レジスタで約 12 KB、標準誤差およそ 0.8%)です。アルゴリズムの詳細はカーディナリティ推定とヒストグラム・スケッチを参照してください。
分散集約で決定的に効くのがマージの単純さです。2つの HLL のマージは、同じレジスタ位置どうしで最大値を取るだけ。
-- HLL のマージ(レジスタ単位の max)
for j in 0 .. m-1:
merged.register[j] = max(A.register[j], B.register[j])
なぜ max でよいか。各レジスタは「そのバケットに落ちた値の中での最大 rank」を持つので、両ノードのデータを連結したときの最大 rank は、各ノードの最大 rank の大きいほうに一致します。重複する値が両ノードにあっても、同じ値は同じレジスタの同じ rank を生むため、max を取れば自然に重複排除されます。これが「ローカル distinct を足す」誤りを構造的に回避します。マージは可換・結合的なので、ノードを任意の順や木構造で集約してよく、並列集約と相性が良いのも利点です。
分位点:t-digest
中央値や P99 のような分位点の近似には t-digest(あるいは KLL・GK といった分位点スケッチ)を使います。t-digest はソート済みの値域を重み付きセントロイド(centroid、代表値と重みの組)の集合に圧縮した構造です。要点は、分布の裾(0 や 1 に近い分位点)ほどセントロイドを細かく、中央ほど粗く配置すること。これにより P99・P999 のような端の分位点を高精度に保ちつつ、全体は固定サイズに収まります。
-- t-digest の構造(概念)
centroids = [ (mean_1, weight_1), (mean_2, weight_2), ... ] -- mean 昇順
不変条件: 累積重みが端(0/1 付近)ほど、各セントロイドの重み上限が小さい
分位点 q の推定: 累積重みが q に達する位置のセントロイドを線形補間
t-digest もマージ可能です。2つの digest のセントロイドを全部集めて mean 順に並べ、重み上限を超えない範囲で隣接セントロイドを再統合すれば、1つの digest に畳めます。これにより各ノードがローカルに作った digest をコーディネータが結合し、全体の任意分位点を近似できます。「各ノードの中央値を集めても全体の中央値は出ない」という全体的関数の壁を、マージ可能スケッチが越えるわけです。
近似集約はスケッチと集約の対応で選びます。基数(distinct 数)は HLL、分位点・中央値・P99 は t-digest(や KLL/GK)、上位頻出要素(heavy hitters)は Count-Min Sketch や Space-Saving。いずれも固定サイズ・マージ可能が共通の武器で、目的に合わないスケッチを使うと無駄な誤差や容量を招きます。
厳密 / 近似のトレードオフ
近似集約は無料ではありません。何を引き換えに何を得るのかを整理します。
| 観点 | 厳密集約(全要素保持) | 近似集約(スケッチ) |
|---|---|---|
| 中間状態サイズ | distinct 数 / 要素数に比例 | 固定(HLL 数 KB、t-digest 数 KB) |
| 転送量・メモリ | 高カーディナリティで膨張 | グループ数 × 定数で予測可能 |
| 結果 | 正確 | 誤差あり(HLL 約 1%、t-digest 端で高精度) |
| マージ | 全要素の突き合わせが必要 | 可換・結合的で並列・再帰マージ可 |
| 向く場面 | 正確さ必須/低カーディナリティ | ダッシュボード・監視・探索的分析 |
判断の軸はカーディナリティと正確性の要求です。distinct 数が小さければ厳密 COUNT(DISTINCT) でもメモリに収まり近似は不要。逆に数千万 distinct を多数のグループで集計するダッシュボードでは、厳密値の転送・保持コストが支配的になり、固定サイズで済むスケッチが圧倒的に有利です。多くの DWH が APPROX_COUNT_DISTINCT や APPROX_PERCENTILE を備えるのはこのためです。OLAP の文脈はデータウェアハウス、確率的データ構造の基礎はブルームフィルタと確率的データ構造もあわせて参照してください。
分散集約=部分集約(ノードローカル)+最終マージ(コーディネータ)で、転送量がグループ数に縮むことを即答できるように。集約関数の分類(分配的=SUM/COUNT/MIN/MAX、代数的=AVG は (sum,count) を介す、全体的=MEDIAN/COUNT(DISTINCT))と、全体的関数は素朴にはマージできない理由を説明できることが重要です。救済策がマージ可能スケッチで、基数は HLL(レジスタ単位の max でマージ=自然な重複排除)、分位点は t-digest(裾を細かく持ちセントロイドを再統合)と対応づけられれば十分です。
まとめ
- 分散集約は各ノードでの部分集約とコーディネータでの最終マージの二段。出力がグループ数に縮むため、生データを全部集める方式よりネットワーク転送が桁で少なくなる。
- 成立条件は集約関数の代数的性質。分配的(SUM/COUNT/MIN/MAX)はそのまま、代数的(AVG は
(sum,count)を介す)は中間状態を設計すればマージ可能。全体的(MEDIAN・COUNT(DISTINCT))は固定サイズの中間状態で表せず素朴には不可。 - 特に
COUNT(DISTINCT)はローカル distinct を足すと重複加算で過大評価になる。これが全体的関数の難しさの典型。 - 全体的関数はマージ可能スケッチで近似的に分散集約できる。基数は HLL(レジスタ単位 max で重複を自然排除)、分位点は t-digest(裾を細かく、マージはセントロイド再統合)。固定サイズ・可換結合マージが共通の武器。
- 近似は誤差と引き換えに転送量・メモリを定数化する。高カーディナリティのダッシュボードや監視では
APPROX_COUNT_DISTINCT/APPROX_PERCENTILEが定石となる。
データベース Article
分散集約と部分集約・近似集約(HLL/t-digest)を実務で読む
TL;DRは入口です。実際に選ぶ・使う段階では、何を解決するか、何と比較するか、導入後にどこで詰まるかまで見る必要があります。
解決すること
分散集約
比較で見る軸
難易度: advanced / カテゴリ: データベース / タグ数: 6
導入後に効く点
この二段集約が成立する鍵は集約関数の代数的性質。SUM/COUNT/MIN/MAX のような分配的・代数的な関数は中間状態をマージできるが、MEDIAN や COUNT(DISTINCT) のような全体的な関数は素朴にはマージできない。
先に潰すリスク
用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。
- 難易度
- advanced
- カテゴリ
- データベース
- タグ数
- 6
判断チェックリスト
- 自社の用途が「分散集約 / 部分集約」に近いか確認する。
- 強みである「分散集約は各ノードで部分集約し、コーディネータが部分結果だけをマージする。転送量がグループ数に縮むため、生データを全部集める方式より桁で速い。」が本当に評価軸になるか確認する。
- 注意点の「用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。」を運用で吸収できるか確認する。
- 公開値や仕様値は、対象プラン・対象機種・対象リージョンまで確認する。
- 既存システム、ID、ネットワーク、監視、バックアップとの接続方法を先に洗い出す。
- 小さく試してから、本番移行、権限設計、障害時手順、コスト監視を決める。