share.dax.cool · streaming-backpressure

Streaming & Backpressure

流式传输与反压机制 · 一份给非熟手的可视化指南

Streaming(流式传输)和 Backpressure(反压)是处理"数据持续流动"时绕不开的两个概念。一个讲怎么把大数据切块送, 一个讲接收方处理慢于发送方时怎么办。下面用水管类比 + 实时模拟 + 代码 diff + 按严重程度色码的陷阱清单四件套讲清楚。

01 · Concepts

两个概念, 一对组合

为什么它们总是一起出现?

Streaming = 数据分块送, 不一次性塞完。 一个 10 GB 的文件不会等加载完才开始处理, 而是边读边处理边写。LLM 不会等整段回复生成完才发, 而是逐 token 流式吐字。视频不会等全片下载完才播, 而是边下边播。

Backpressure = 接收方处理慢于发送方时, 反向施压让发送方"慢点"。 这是流式传输的"必备配套"——一旦数据持续流动, 处理速度永远不会完美匹配, 慢的一方必须有办法告诉快的一方 stop / slow / resume

没有反压的流式传输是炸弹——快的一方持续灌, 慢的一方处理不过来, 中间缓冲区无限膨胀, 直到内存爆炸 / 进程崩溃 / 数据被迫丢弃。

02 · Analogy

水管 + 漏斗类比

三种情况, 一眼看懂为什么需要反压

① 流速匹配 · 一切正常
水龙头 ─────▶ 水管 ─────▶ 漏斗 ─────▶ 桶
   2 L/s        缓冲区        2 L/s
                几乎空

进出速度相同, 漏斗维持低水位, 系统稳态。
② 发送快 · 接收慢 · 没有反压
水龙头 ──▶ 水管 ─────▶ 漏斗 ─────▶ 桶
  5 L/s      缓冲爆膨胀     1 L/s
            ⚠️⚠️⚠️ OOM

漏斗每秒进 5 L 出 1 L, 净流入 4 L/s。
有限的内存几秒钟就被填满 → 进程崩溃 / 数据丢失。
③ 发送快 · 接收慢 · 有反压
水龙头 ──▶ 水管 ─────▶ 漏斗 ─────▶ 桶
  ↑5L/s     高水位     1 L/s
  暂停──────"我满了, 停一下"

漏斗水位达到阈值 → 反向通知水龙头关闭。
水位降下来 → 通知水龙头重开。
系统稳态, 实际吞吐被慢的一方限速。
03 · Live Simulation

实时模拟

点按钮切换三种策略, 观察缓冲区水位变化

无反压 0 产 · 0 处 · 0 丢 · 0
5 生产 / 帧
缓冲: 0 / 100
1 处理 / 帧
— 等待启动 —
04 · Code Diff

错误 vs 正确

Node.js Streams 经典示例 · 上面是反例, 下面是正解

✘ 没有反压 · 内存爆炸
// 一边读一边推, 不等接收方
readable.on('data', (chunk) => {
  writable.write(chunk);  // 不看返回值
  // write() 返回 false 表示
  // 内部缓冲已满, 但代码无视
});

// readable 读多快, 就推多快
// writable 内部 buffer 无限增长
// 最终: heap out of memory
✓ 正确反压 · 内存稳定
// 检查 write 返回值, 满了就暂停
readable.on('data', (chunk) => {
  const ok = writable.write(chunk);
  if (!ok) readable.pause();
});

writable.on('drain', () => {
  readable.resume();  // 缓冲排空再继续
});

// 或者一行: readable.pipe(writable)
// pipe 内部自动做反压

关键差异在三个动作:write() 返回值不能吞——返回 false 就是接收方在喊"我满了"。 ② 收到 false 立刻 readable.pause() 停止读取上游。 ③ 监听 drain 事件——内部缓冲降到 lowWaterMark 以下时触发, 这时才 resume()

实践中 能用 pipe() 就用 pipe()——它把上面三步全部封装了, 一行代码自带反压。手动监听 data 事件几乎总是错误的开始。

05 · Pitfalls

按严重程度色码的陷阱清单

从致命到无害 · 按色排序自上而下

CRITICAL
内存爆炸 / OOM
缓冲区无限增长直到进程被 OS 杀掉。复制大文件 / 转码视频 / ETL 大数据集时最常见, 单次故障就能崩生产。
解法: 用 pipe / pipeline / pipeTo, 不手写 data 事件
CRITICAL
数据丢失
缓冲满后选择"丢弃"作为反压策略——快的发送方继续推, 队列丢弃尾部数据。性能感觉没事, 实际数据完整性已经坏掉, 下游统计 / 账单 / 业务全错。
解法: 慎用 drop 策略; 必须 drop 时在 metrics 里 expose drop count, 报警阈值要严
WARNING
延迟堆积 (Latency Buildup)
没爆但缓冲一直处于高水位, 数据要排几秒甚至几分钟队才被处理。系统看似 OK 但实时性已经死了——监控 dashboard 显示数据有 5 分钟前的, 用户操作后 10 秒才生效。
解法: 监控 buffer 大小 + 处理延迟; 设容量上限触发反压, 不让无限堆
WARNING
背压隐藏在多层之间
A → B → C 三层管道, A 自己实现了反压, 但中间 B 把数据从 A 拉过来后用同步循环灌给 C, 等于 B 把 A 的反压"吃掉了"。系统会在 C 慢的时候卡死整条链。
解法: 端到端用同一套流原语 (Web Streams / Node Streams / RxJS), 不混用同步和异步处理
CAUTION
不必要的内存拷贝
每次 chunk 经过一层就拷贝一次——Buffer → string → Buffer 来回转, 1 GB 的流可能在内存里被复制 4-5 份。性能损耗大但不会崩。
解法: 全程用 Buffer / Uint8Array, 避免编码转换; 必要的转换放在最后一步
CAUTION
反压粒度太细 / 太粗
每个字节都做反压协商 → 协商开销超过数据量; 每 100 MB 才协商一次 → 反压响应慢, 缓冲已经爆了。
解法: 默认值通常合理(Node 16 KB / TCP 64 KB), 不要手贱改; 改之前先 benchmark
OK
用 pipe / pipeline / pipeTo 串联
把流式原语之间的连接交给标准 API, 反压自动处理。Node 用 stream.pipeline(), Web Streams 用 readable.pipeTo(writable), 都是端到端反压 + 错误传播 + 资源清理一起做好。
对应: readable.pipe(writable) · pipeline(a, b, c) · readable.pipeTo(writable)
OK
监听 drain 事件 + 检查 write 返回值
手写流处理时的标准范式, 三件套: write() 返回 false → pause() 上游 → drain 触发 → resume()
对应: 见上方代码 diff 右侧
06 · Tech Stack Matrix

不同技术栈里的反压

每一项都是工业上验证过的反压实现

技术栈
反压机制
是否自动
TCP
滑动窗口 (sliding window)——接收方在 ACK 里通告剩余 receive buffer 大小, 发送方据此调整发送量
自动
Node.js Streams
highWaterMark + write() 返回值 + drain 事件; pipe() / pipeline() 自动处理
pipe 自动
Web Streams API
ReadableStream.pipeTo(WritableStream) 自带反压; 自定义 controller 时用 controller.desiredSize 判断
pipeTo 自动
gRPC streaming
基于 HTTP/2 的 flow control window, 客户端和服务端各自维护; 应用层用 onReadyHandler 协调
半自动
RxJS
不是真反压(推送模式无法回压上游), 用 throttleTime / sample / bufferTime 做"采样/丢弃"近似
手动近似
SSE (Server-Sent Events)
协议本身无反压机制; 由底层 TCP 提供。客户端慢时浏览器维护内部缓冲, 服务端 socket.write 会被 TCP 阻塞
靠 TCP
WebSocket
同 SSE; 但应用层常用 bufferedAmount 检查客户端待发送队列, 超阈值则跳过推送
靠 TCP + 应用
Kafka / 消息队列
消费者拉模式 (pull-based)——消费者按自己速度拉, 生产者写入受 broker 容量限制
拉模式天然
LLM 流式输出 (SSE)
服务端边生成边推 SSE 字节; 客户端处理慢时 TCP 反压会让服务端 token 生成停顿(因为 socket.write 阻塞)
靠 TCP
07 · Takeaways

三句话记住

实操遇到流式场景时回想这几条

① 凡是"持续流动的数据", 必须问反压怎么实现。 没问就是把炸弹放进系统。

② 能用平台原生的 pipe / pipeTo / pipeline 就用, 别手写 data 事件。 自己接的反压链 99% 会出 bug, 标准 API 已经把所有边角踩过了。

③ 反压的目标是"匹配速率", 不是"无脑加大缓冲"。 缓冲只能吸收瞬时抖动 (秒级), 处理不了系统性慢——发送方比接收方持续慢 10%, 任何缓冲早晚都会满。