Streams API(バックプレッシャと内部キュー)
巨大データを少ないメモリで安全に流せる理由が腑に落ちる。内部キューと高水位線、バックプレッシャの伝播、リーダー/ライターのロックを原理から正確に解きます。
- 1.各ストリームは内部キューを持ち、高水位線(highWaterMark)と sizeAlgorithm から desiredSize を算出する。desiredSize が 0 以下になると「もう要らない」という圧力が生産側へ伝わり、これがバックプレッシャの実体。
- 2.pipeTo / pipeThrough は消費側の desiredSize を生産側の pull 呼び出しへ橋渡しし、TransformStream は writable 側と readable 側の 2 キューを内部で連結してバックプレッシャを上流へ素通しする。
- 3.ReadableStream は getReader()、WritableStream は getWriter() で排他ロックされ、ロック中は元ストリームを直接操作できない。releaseLock() で解放するまで pipe や別リーダーは使えない。
Streams API は「データを少しずつ流す」だけの仕組みに見えますが、その核心はむしろ流れすぎを止めることにあります。生産側が消費側より速いと、未処理データが無限に溜まってメモリを食い潰します。これを防ぐのが内部キューと高水位線(highWaterMark)に基づくバックプレッシャで、Streams 標準はこの圧力を生産側へ自動で伝える機構を厳密に定義しています。ここでは ReadableStream / WritableStream / TransformStream の内部キュー構造、desiredSize の算出、圧力の伝播経路、そしてリーダー/ライターのロック機構を原理から見ていきます。ボディが ReadableStream である Fetch API の内部 を押さえておくと、各概念が実務とつながります。
内部キューと高水位線:圧力はどう測られるか
すべてのストリームは内部キュー(チャンクの待ち行列)を 1 本以上持ちます。キューに溜まったチャンクの総「サイズ」と、許容上限である**高水位線(highWaterMark, 以下 HWM)**との差分が、ストリームの空き具合を表す desiredSize です。定義はシンプルで、desiredSize = highWaterMark - queueTotalSize です。ここで queueTotalSize は、各チャンクに size(chunk) を適用して足し合わせた値です。
チャンクのサイズをどう測るかは **queuing strategy(キュー戦略)**が決めます。標準が用意するのは 2 種類です。
| 戦略 | size(chunk) の値 | HWM の単位 | 用途 |
|---|---|---|---|
| CountQueuingStrategy | 常に 1 | チャンク個数 | オブジェクト列・行データなど |
| ByteLengthQueuingStrategy | chunk.byteLength | バイト数 | バイナリ(Uint8Array 等) |
| 自前 strategy | 任意の size 関数 | 任意 | 重み付きの独自基準 |
たとえば new ReadableStream(source, new CountQueuingStrategy({ highWaterMark: 3 })) なら、キューに 3 チャンク溜まった時点で desiredSize が 0 になり、「これ以上は要らない」状態を示します。バイト基準なら TypedArray・ArrayBuffer のメモリレイアウト で扱う byteLength がそのまま重みになります。
desiredSize は HWM を超えてキューに積まれると負の値になります。たとえば HWM が 3 のキューに 5 チャンク入れば desiredSize は -2 です。負=「むしろ減らしてほしい」という強い圧力で、生産側はこの符号と大きさを見て送出ペースを落とします。desiredSize が null のときは、ストリームがエラー状態であることを意味します(クローズ済みの場合は null ではなく 0 を返します)。
ReadableStream:pull とバックプレッシャ
ReadableStream は start/pull/cancel を持つ underlying source(基盤ソース)から作ります。バックプレッシャの中核は pull がいつ呼ばれるかです。ストリームは内部キューに余裕がある(desiredSize が正)間だけ pull(controller) を呼んでソースに補充を促し、controller.enqueue(chunk) で積まれたチャンクでキューが満ちると pull の呼び出しを止めます。消費側が read() でチャンクを引き取ってキューが空けば、再び desiredSize が正に転じて pull が再開されます。
const stream = new ReadableStream({
pull(controller) {
// desiredSize が正の間だけブラウザが pull を呼ぶ。
// enqueue でキューが満ちたら、次の read まで pull は呼ばれない。
const chunk = produceOne();
if (chunk === null) controller.close();
else controller.enqueue(chunk);
}
}, new CountQueuingStrategy({ highWaterMark: 3 }));
ここが重要な点で、pull を呼ぶか止めるかという制御そのものがバックプレッシャです。生産側コードは「ペースを落とせ」という命令を直接受け取るのではなく、「呼ばれないこと」によって自然に待たされます。controller.desiredSize を自分で読めば、1 回の pull でどれだけ補充すべきかを判断する材料にもできます。
highWaterMark: 0 を指定すると desiredSize は最初から 0 で、ストリームは先読みを一切せず、read() が来た瞬間だけ pull を呼びます。プリフェッチによる無駄な取得を避けたい外部リソース(課金 API・大容量ソース)で有効です。逆に HWM を大きくすると先読みバッファが増え、スループットは上がる一方でメモリ使用が増えます。
WritableStream:ライター側の圧力
WritableStream は start/write/close/abort を持つ underlying sink(基盤シンク)へ書き込みます。こちらでも内部キューと HWM が効きますが、圧力の見方が逆向きです。writer.write(chunk) は Promise を返し、writer.desiredSize でシンクの空き具合が読めます。シンクの処理(write)が遅れてキューが HWM を超えると desiredSize が 0 以下になり、これが「書き込みを控えてくれ」という合図です。
正しい作法は、writer.ready の解決を待ってから次を書くことです。ready は desiredSize が正に戻るまで保留される Promise で、これを await することで生産側のペースがシンクの処理速度に同期します。
const writer = writable.getWriter();
for (const chunk of source) {
// ready が解決= desiredSize が正=書いてよい状態になるまで待つ。
// これを省くとキューが無限に膨らみメモリを食う。
await writer.ready;
writer.write(chunk); // この write 自体を await する必要はない
}
await writer.close();
各 write() を逐一 await すると 1 チャンクごとに完了を待つ直列処理になり、パイプラインの良さが消えます。バックプレッシャ制御は writer.ready で行い、write() 自体は投げっぱなしにするのが定石です。write() の Promise はそのチャンクのシンク処理完了を示し、エラー検知やフラッシュ完了の確認に使います。
バックプレッシャの伝播:pipeTo と pipeThrough
個々のストリームの圧力を鎖としてつなぐのが pipeTo/pipeThrough です。readable.pipeTo(writable) は、書き込み先 writable の desiredSize が 0 以下の間、読み取り元 readable からの read() を控えます。read を控えれば readable のキューが空かず、その desiredSize も下がり、結果として readable の pull が止まる――こうして圧力が消費側から生産側へ一段ずつ遡って伝わります。
// 圧力は writable → readable の向きに自動で遡る。
// 遅い writable があると、上流の fetch のボディ取得まで自然に減速する。
await response.body
.pipeThrough(new TextDecoderStream())
.pipeTo(slowWritable);
pipeThrough(transform) は内部的に readable.pipeTo(transform.writable) と transform.readable の返却を組み合わせたもので、複数の TransformStream を数珠つなぎにできます。手書きの read()/write() ループでバックプレッシャを正しく実装するのは難しいため、可能なら pipe 系に任せるのが安全です。
| 操作 | 戻り値 | バックプレッシャ |
|---|---|---|
| readable.pipeTo(writable) | 完了 Promise | writable の圧力が readable まで遡る |
| readable.pipeThrough(ts) | ts.readable | 下流の圧力が ts を通して readable まで遡る |
| 手動 read/write ループ | — | 自分で ready を待たないと壊れる |
TransformStream:2 つのキューを連結する
TransformStream は writable 側と readable 側の 2 つを 1 オブジェクトにまとめたものです。ts.writable へ書かれたチャンクが transform(chunk, controller) を通り、controller.enqueue() で ts.readable 側のキューへ流れます。内部にはキューが 2 本(writable 側と readable 側)あり、それぞれに HWM があります。
伝播はこうです。下流が遅くて ts.readable のキューが詰まると、transformer は新たな enqueue を控え、transform の呼び出しが進まなくなります。すると ts.writable 側のキューも空かず、その desiredSize が下がって上流に圧力が伝わる。TransformStream は 2 つのキューを連結することで、下流の圧力を上流へ素通しする継手として働きます。
// 1 行ごとに区切る変換器。下流が詰まれば transform が止まり、圧力が上流へ伝わる。
const lineSplitter = new TransformStream({
transform(chunk, controller) {
for (const line of chunk.split('\n')) controller.enqueue(line);
}
});
transform が Promise を返すと、ストリームはその解決を待ってから次のチャンクを処理します。これにより「1 チャンクの変換に時間がかかる処理」自体が自然なバックプレッシャ源になります。TextDecoderStream/TextEncoderStream/CompressionStream/DecompressionStream はブラウザ組み込みの TransformStream で、デコードや圧縮をパイプの途中に差し込めます。
リーダー/ライターのロック機構
ストリームの一貫性を守る最後の柱がロックです。readable.getReader() を呼ぶと ReadableStream はロックされ、以後その reader 経由でしか read() できません。ロック中は元の ReadableStream に対する pipeTo/pipeThrough/tee/別 reader の取得がすべて拒否されます。理由は明快で、複数の読み手が同じキューを同時に引くと、どのチャンクが誰に渡るか不定になり、ストリームの「順序保証・一度きり消費」が壊れるからです。WritableStream の getWriter() も同様にライターでロックします。
ロックは reader.releaseLock()/writer.releaseLock() で解放します。解放するまで、たとえ read を 1 つも発行していなくても元ストリームは触れません。
| 取得 | ロック対象 | ロック中にできないこと | 解放 |
|---|---|---|---|
| getReader() | ReadableStream | pipeTo / pipeThrough / tee / 別 getReader | releaseLock() |
| getWriter() | WritableStream | 別 getWriter / 直接 close | releaseLock() |
| pipeTo / pipeThrough | 両端を内部ロック | 完了まで手動操作 | 完了/中断で自動解放 |
const reader = readable.getReader(); // ここで readable はロックされる
const { value, done } = await reader.read();
reader.releaseLock(); // 解放するまで pipe も別リーダーも不可
await readable.pipeTo(writable); // 解放後なら OK
response.body.getReader() でリーダーを取った後に同じ body を pipeTo しようとすると TypeError(locked to a reader)になります。逆も同様で、pipeTo 実行中の body から getReader はできません。1 本のストリームに対し「手動 read ループ」か「pipe」のどちらか一方を選び、両方使いたいなら次の tee() で複製します。
tee:分岐と未読バッファの代償
readable.tee() は 1 本の ReadableStream を独立に読める 2 本へ複製します(元ストリームはロックされる)。中身を 2 通りに処理したい、あるいは読みつつ別経路にも流したいときに使います。ただし設計上の代償があり、2 本の消費速度がずれると、速い側のために遅い側がまだ読んでいないチャンクを内部でバッファし続け、メモリが膨らみます。tee はバックプレッシャを「遅い方に合わせて止める」のではなく「速い方を進めるためにバッファする」挙動を既定とする点に注意してください。
ストリームをワーカ間で移送する場合は、ReadableStream/WritableStream/TransformStream 自体が **transferable(転送可能)**であり、structuredClone と転送可能オブジェクト の仕組みで postMessage の transfer リストに載せて所有権ごと渡せます。コピーではなく移送なので、元コンテキストでは使えなくなります。
まとめ
Streams API のバックプレッシャは、内部キューの queueTotalSize と高水位線(HWM)の差である desiredSize を軸に動きます。desiredSize が 0 以下になると「もう要らない」という圧力が生まれ、ReadableStream では pull の呼び出しが止まり、WritableStream では writer.ready が保留されて生産側を待たせます。サイズの測り方は CountQueuingStrategy(個数)か ByteLengthQueuingStrategy(バイト数)で選びます。pipeTo/pipeThrough は消費側の圧力を生産側へ一段ずつ遡らせ、TransformStream は writable 側と readable 側の 2 キューを連結して圧力を上流へ素通しします。getReader()/getWriter() はストリームを排他ロックし、解放まで pipe や別リーダを禁じることで一度きり消費の一貫性を守ります。複製は tee() で行えますが、消費速度の差が未読バッファとしてメモリを食う点に注意します。非同期の土台は イベントループ と、データソースとしての実例は Fetch API の内部 と合わせて押さえると、ストリームの流量制御が一本の線でつながります。
Web/フロントエンド Article
Streams API(バックプレッシャと内部キュー)を実務で読む
TL;DRは入口です。実際に選ぶ・使う段階では、何を解決するか、何と比較するか、導入後にどこで詰まるかまで見る必要があります。
解決すること
Streams API
比較で見る軸
難易度: advanced / カテゴリ: Web/フロントエンド / タグ数: 5
導入後に効く点
pipeTo / pipeThrough は消費側の desiredSize を生産側の pull 呼び出しへ橋渡しし、TransformStream は writable 側と readable 側の 2 キューを内部で連結してバックプレッシャを上流へ素通しする。
先に潰すリスク
用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。
- 難易度
- advanced
- カテゴリ
- Web/フロントエンド
- タグ数
- 5
判断チェックリスト
- 自社の用途が「Streams API / Web」に近いか確認する。
- 強みである「各ストリームは内部キューを持ち、高水位線(highWaterMark)と sizeAlgorithm から desiredSize を算出する。desiredSize が 0 以下になると「もう要らない」という圧力が生産側へ伝わり、これがバックプレッシャの実体。」が本当に評価軸になるか確認する。
- 注意点の「用語だけ覚えても、設計・実装・運用でどこに効くかを確認しないと判断を誤る。」を運用で吸収できるか確認する。
- 公開値や仕様値は、対象プラン・対象機種・対象リージョンまで確認する。
- 既存システム、ID、ネットワーク、監視、バックアップとの接続方法を先に洗い出す。
- 小さく試してから、本番移行、権限設計、障害時手順、コスト監視を決める。