1. 在 ChatGPT App 架构中,“流”到底出现在哪里
在争论 SSE 更好还是 HTTP-stream 更好之前,先要弄清楚我们的技术栈里哪些地方真的存在“流”。
大致可以分为三个层级。
其一:ChatGPT 与模型层。 模型本身就会以 token 进行流式输出:你会看到回复文本像“打印”一样逐字出现。 这也是一种流,但它完全由 OpenAI 管理,与你的代码没有直接关系。
其二:MCP 层。 当 ChatGPT 连接到你的 MCP 服务器时,通常会保持SSE 连接: 服务器向其中推送 MCP 的 JSON‑RPC 消息(响应与通知),而 ChatGPT 则通过单独的 HTTP endpoint 发送请求, 例如 /messages。在 MCP 术语中,这就是基础传输层。
其三:Apps SDK 与你的后端层。 你的 React 小部件 GiftGenius 运行在 ChatGPT 的沙箱中,通过 HTTP 与你的后端/MCP‑gateway 通信: 要么用普通的 fetch,要么用带流的 fetch(ReadableStream), 或者用 SSE 订阅(EventSource)。
重要的是不要把这些层混为一谈。 MCP 事件是 ChatGPT 与服务器之间的“线路”;而小部件与 HTTP 后端之间的 SSE/HTTP-stream, 则是你需要自己负责的那一段。
我们可以画一张示意图。
flowchart TD
subgraph ChatGPT
UI[ChatGPT UI + 模型]
W[GiftGenius Widget]
end
subgraph YourInfra[开发者基础设施]
GW[MCP Gateway / Backend]
MCP[MCP Server]
end
UI -- "tool-call / 回复\n(内部 token 流)" --> W
UI <-- "通过 SSE 的 MCP\n(/sse + /messages)" --> MCP
W <-- "HTTP / fetch / SSE / stream" --> GW
GW <-- "JSON-RPC MCP" --> MCP
今天我们将聚焦于 Widget ↔ Backend 这条箭头,同时部分回顾:MCP 的传输本身也是基于 SSE 的。
也正是在这条 Widget ↔ Backend 的链路上,我们需要选择如何通信: 用简单的 HTTP 请求,还是使用流。下一节我们看看为什么这里“普通”HTTP 很快就不够用了。
2. 为什么普通的 HTTP 请求不够用
HTTP 的标准模型是“请求 → 一个响应”。客户端发起请求,服务器返回一次响应,然后连接关闭。
很多场景下这已经足够:获取某个 job 的当前状态、保存用户设置、 读取已经在数据库里的礼物清单等。
但一旦你做的是长耗时操作,一切就开始“吱呀作响”。
想象一下 GiftGenius 需要:
- 从多个来源收集信号(购买历史、愿望清单、社交网络),
- 通过若干次 LLM 请求进行处理,
- 从上百个候选中构建个性化排名。
这一切可能需要几十秒。如果你用普通的 HTTP 请求,保持 40 秒不返回、也不输出, 用户体验会像老浏览器:用户盯着旋转的加载图标,猜测应用是“挂了”还是还在“思考”。
除了 UX,还有纯技术层面的麻烦:
- ChatGPT、Vercel、代理上的超时;
- 无法发送进度、partial results 等;
- 无法正确处理断线与恢复。
于是就有了一个自然的选择:从一个大响应转向由很多小块组成的流, 服务器可以在就绪时逐步发送。
这些小块可以是:
- 事件(job.progress、job.completed)——对应 SSE;
- 单个大负载的片段(报告文本、按行输出的 NDJSON 礼物列表)——对应 HTTP-stream。
3. SSE(Server‑Sent Events):事件订阅
先从 SSE 讲起,因为它与 MCP 很“亲”:MCP 本身就是在 HTTP 之上用 SSE 连接把事件从服务器推送给客户端。
用大白话理解 SSE 模型
SSE 是一种基于普通 HTTP 的协议:
- 客户端对某个 endpoint 发起 GET 请求,服务器以 Content-Type: text/event-stream 响应;
- 服务器不关闭连接,而是不时写入如下格式的文本行:
event: job.progress
data: {"jobId":"123","percent":40}
event: job.completed
data: {"jobId":"123","resultCount":12}
- 浏览器端使用 EventSource,它会:
- 自动处理连接的重连;
- 解析 event: + data: + 双换行 的格式;
- 触发 onmessage / addEventListener("job.progress", ...) 等回调。
关键点:该通道是单向的,只有服务器向客户端发事件; 客户端不会通过这条连接发送数据。
对于 ChatGPT Apps,当小部件只是想“按 jobId 订阅”并对进度与完成进行响应时,这个模型非常合适。
Next.js 16 中的一个最小 SSE endpoint 示例
假设我们有一个用于 Job 进度事件的路由处理器:
app/api/gift-jobs/[jobId]/events/route.ts
import { NextRequest } from "next/server";
export async function GET(req: NextRequest, { params }: { params: { jobId: string } }) {
const jobId = params.jobId;
const stream = new ReadableStream({
start(controller) {
// 用于发送 SSE 事件的工具函数
const send = (event: string, data: unknown) => {
const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
controller.enqueue(new TextEncoder().encode(payload));
};
send("job.started", { jobId });
let percent = 0;
const interval = setInterval(() => {
percent += 20;
if (percent >= 100) {
send("job.completed", { jobId, totalGifts: 10 });
clearInterval(interval);
controller.close();
} else {
send("job.progress", { jobId, percent });
}
}, 1000);
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream", // 在这里设置 SSE
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
这是一个玩具级的模拟:每秒进度增长,最后发出 job.completed。 之后你会把这个计时器替换为真实的 worker/队列事件,但整体思路不变。
客户端:在 GiftGenius 小部件中订阅 SSE
在 React 小部件内部,只要有 jobId,我们就可以订阅该流。 提醒一下,小部件的 API 在 ChatGPT 沙箱中运行,但 EventSource 与普通浏览器中一样可用。
import { useEffect, useState } from "react";
export function GiftJobProgress({ jobId }: { jobId: string }) {
const [percent, setPercent] = useState(0);
useEffect(() => {
const url = `/api/gift-jobs/${jobId}/events`;
const es = new EventSource(url);
es.addEventListener("job.progress", (event) => {
const data = JSON.parse((event as MessageEvent).data);
setPercent(data.percent);
});
es.addEventListener("job.completed", () => {
setPercent(100);
es.close();
});
es.onerror = () => {
// 这里可以提示“网络异常,正在尝试重新连接”
};
return () => es.close();
}, [jobId]);
return <div>礼物筛选进度:{percent}%</div>;
}
现在你可以把它与 MCP 工具关联起来。工具 start_gift_job 返回 jobId,而在小部件的 ToolOutput 中直接渲染 GiftJobProgress 即可。
自动重连与 Last‑Event‑ID
EventSource 标准中内置了断线自动重连。 服务器可以在事件里使用标准的 SSE 字段 id:;客户端可通过请求头 Last-Event-ID 在重连后补上漏掉的事件。
对于简单的 GiftGenius,一开始可以不实现 id:,也不做单独的事件标识, 容忍重连时进度有些许“回退”。 但在生产环境,尤其高负载场景下,你会需要:
- 为每个 SSE 事件添加标准字段 id:,使客户端可在重连时携带 Last-Event-ID;
- 在事件 payload 中加入业务侧的 event_id,并在客户端/后端按该字段实现幂等处理。
这与幂等性直接相关:即使同一个 job.progress 到达两次, 处理器看到已见过的 event_id 也不会重复产生副作用。
最终,SSE 为我们提供了围绕 jobId 的事件订阅、自动重连与通过事件标识控制去重的能力。 接下来看看第二种流:当我们只有一个请求,但响应很大,希望分块返回。
4. HTTP‑streaming:对单个请求进行逐步响应
如果说 SSE 是“订阅独立的事件流”,那么 HTTP 流就是“一个请求,一个响应,但响应被拉长为时间上的一串分片”。
这正是你在使用 OpenAI API 并设置 stream : true 时看到的机制: 服务器发送 JSON 分片(常以 SSE 格式包装,但语义是“单请求 ↔ 部分响应的流”),客户端再将其组装为最终文本。
在你自己的 API 中,也可以用它来:
- 输出大型文本报告(例如,解释为何选了这些礼物),
- 输出很长的礼物清单(按部分流式传输,而不是让用户一直等待)。
Next.js 中最简单的 HTTP‑stream endpoint
假设我们要生成“结果说明”,其中 LLM 会写一段很长的文本。 我们想在生成的同时把它流给小部件。
app/api/gift-report/route.ts
import { NextRequest } from "next/server";
export async function POST(req: NextRequest) {
const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
controller.enqueue(encoder.encode("开始分析...\n"));
// 这里本可以是 LLM 的真实分块生成
for (const line of ["收集偏好...\n", "估算预算...\n", "最终建议...\n"]) {
await new Promise((r) => setTimeout(r, 1000));
controller.enqueue(encoder.encode(line));
}
controller.close();
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/plain; charset=utf-8",
"Transfer-Encoding": "chunked", // 这里标明这是 HTTP/stream
},
});
}
从技术上讲,Next 会自己处理 chunked 编码;你只需要返回 ReadableStream。
在小部件中通过 fetch 读取 HTTP 流
在客户端(小部件内)可以这样读取流:
async function fetchReport(setText: (s: string) => void) {
const res = await fetch("/api/gift-report", { method: "POST" });
const reader = res.body!.getReader();
const decoder = new TextDecoder();
let acc = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
acc += decoder.decode(value, { stream: true });
setText(acc); // 随着数据到达更新 UI
}
}
以及一个包装组件:
import { useState } from "react";
export function GiftReport() {
const [text, setText] = useState("");
return (
<div>
<button onClick={() => fetchReport(setText)}>生成报告</button>
<pre style={{ whiteSpace: "pre-wrap" }}>{text}</pre>
</div>
);
}
这是经典模式:一次 POST 请求 /api/gift-report, 返回的是文本流,你一边读取一边展示。
流式 JSON,而非纯文本
很多时候你会希望流的不是字符串,而是 JSON 对象。最常见的格式是NDJSON(Newline‑delimited JSON): 每个事件是一行 JSON 字符串,并以 \n 结尾。
服务端示例:
const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
for (let i = 0; i < 5; i++) {
const chunk = { type: "gift", index: i, name: `礼物 #${i}` };
controller.enqueue(encoder.encode(JSON.stringify(chunk) + "\n"));
await new Promise((r) => setTimeout(r, 500));
}
controller.close();
},
});
客户端用 TextDecoder 读取,按 \n 分割,再逐个解析成 JSON 对象。
5. SSE vs HTTP‑stream:区别与选择
到这一步直觉上应该已经很清晰了,但我们还是做个小表总结一下。
| 特性 | SSE (Server‑Sent Events) | HTTP‑stream (chunked) |
|---|---|---|
| 发起方 | 客户端发起 GET 并进行订阅 | 客户端发起请求(GET/POST),服务器以流式响应 |
| 方向 | 仅 服务器 → 客户端 | 针对单个请求的服务器响应 |
| 语义 | 订阅事件流(pub/sub) | 对单个请求的部分响应 |
| 内置协议 | 有(event:、data:、id: 等) | 无,需要自定格式(纯文本行、NDJSON、JSON) |
| 客户端 API | EventSource | fetch + ReadableStream / response.body |
| 重连支持 | 内置(EventSource、Last-Event-ID) | 需要自行实现 |
| 典型场景 | 进度、状态、按 jobId 的通知 | 流式文本、大型 JSON 响应、LLM 输出 |
如果把选择简化成“手指规则”(小心使用,别教条化):
- 你有一个 job,其周围有一堆事件 → 用SSE;
- 你的一个 tool 调用会返回很大的结果,并希望分段展示 → 用HTTP‑stream。
对于 GiftGenius:SSE 用于实时进度条与状态;HTTP 流用于长文本汇总或逐步加载很长的礼物清单。
6. 它与 MCP 与 GiftGenius 如何衔接
回到一开始的示意:模型 ↔ MCP ↔ 小部件 ↔ 后端。我们已经看过小部件 ↔ 后端这一级的流, 现在退回一步,分清楚哪里是 MCP,哪里是“纯 HTTP”。
MCP 定义了 ChatGPT(作为 MCP 客户端)如何与 MCP 服务器通信。其传输层是:
- ChatGPT 打开 SSE 连接 /sse,并通过它接收 MCP 消息(响应、通知、事件);
- ChatGPT 将 MCP 请求(call_tool、list_tools 等)发到 /messages,通常是 POST JSON‑RPC。
当你把 GiftGenius 接入 ChatGPT 时,这一层你已经跑通了。
现在,当我们在小部件中加入异步任务与 UX 流时,有两种架构方案。
方案一——“纯 MCP”: MCP 服务器自行生成 job.progress 与 job.completed 事件; ChatGPT 通过 MCP‑SSE 接收;随后模型再用更新过的上下文调用你的小部件, 小部件据此渲染进度,无需直接与后端通信。这是最“正统”的 MCP‑events 方案。
方案二——混合式: MCP 工具 start_gift_job 创建任务并返回 jobId; 小部件拿到 jobId 后,自己通过 HTTP 与后端通信, 订阅 SSE endpoint /api/gift-jobs/{jobId}/events,并在需要时请求 HTTP 流的报告。 在 MCP 侧则无需做额外的特殊处理。
课程中我们采用混合式路径:它更容易与 App Router/Next 集成,也更便于本地调试。 等你熟悉之后,再迁移到“纯 MCP 通知”也不迟。
7. 重连、超时与网络的其他现实
到目前为止一切听起来都很理想:打开 SSE 或流,数据顺畅传输,事件如期而至,UX 熠熠生辉。 现实中网络总会在意想不到的时候中断连接,基础设施也会设置各种超时。
可能出现的问题
使用 SSE 与 HTTP-stream,你或早或晚会遇到:
- 代理上的 idle 超时:“若连接在 N 秒内无数据传输——关闭”;
- 后端重启(部署、故障);
- 用户端网络不稳定(尤其移动端)。
这很正常;重要的是做好准备,而不是指望“不会发生”。
SSE 的策略
SSE 在这方面有不少优势:
- EventSource 会按一定延迟自动重连;
- 你有 id: 与 Last-Event-ID 来追赶漏掉的事件。
最低限度的实践:
- 服务器定期发送心跳,避免连接被认定为完全 idle。 可以发独立事件 event: ping,或仅发送注释 : keep-alive。
- 客户端在 onerror 中给用户清晰的状态提示, 如“网络出现问题,正在尝试重新连接…”,而不是让小部件整体崩掉。
- 在重连时,如果你使用了 id:,只下发该 ID 之后的新事件。 对于 GiftGenius,一开始可以不加 id:,仅基于最后收到的 job.progress/job.completed 重新构建状态。
HTTP‑stream 的策略
HTTP 流是单次请求,因此一旦中断,本质上需要重新开始:
- 如果你流的是文本报告,可以直接提示用户: “未能获取完整报告,请重试”,然后重新开始;
- 如果你流的是结构化数据(NDJSON),可以考虑断点续传机制: 例如在请求中传 offset 或 cursor,指明从哪里继续。
初期可以不复杂化:若响应流未完整结束,就展示已到达的部分,并给出“继续生成报告”的按钮来发起新请求。
关键是不要让用户陷入“无尽等待”的状态。
8. 应用于 GiftGenius:端到端场景
现在把关于 SSE、HTTP 流以及两种与 MCP 的架构方案都串起来, 用一个 GiftGenius 的真实场景,从用户请求到报告产出。
用户在 ChatGPT 中输入:“给桌游爱好者挑选礼物,预算不超过 100 美元。” 模型决定调用 GiftGenius。应用/代理对你的 MCP 服务器发起 tool‑call start_gift_job。 服务器:
- 把 job 写入数据库;
- 把它放入内部队列(队列与 worker 的细节在下一讲,这里先假设“有人”会执行);
- 同步返回 jobId 作为 tool‑call 的响应。
GiftGenius 小部件收到带有 jobId 的 ToolOutput 并渲染组件:
function GiftGeniusRoot({ jobId }: { jobId: string }) {
return (
<div>
<h2>正在寻找理想的礼物...</h2>
<GiftJobProgress jobId={jobId} />
<GiftReport />
</div>
);
}
组件 GiftJobProgress 订阅 SSE /api/gift-jobs/{jobId}/events 并绘制进度。 每个 job.progress 更新百分比,job.completed 将其置为 100%,并可能启用“显示详细报告”的按钮。
组件 GiftReport 在按钮点击时发送 POST /api/gift-report(传入 jobId),并在服务器流出 HTTP 分片时逐步展示 文本报告。
在 SSE 连接中断时,小部件会展示友好的提示,EventSource 会尝试自动重连。 在报告流出现问题时,用户会看到已到达的部分,并有“继续生成”或“重试”的按钮。
从 ChatGPT 与 MCP 的视角:
- MCP 看到了工具调用 start_gift_job,以及可能随后出现的 job 状态通知;
- 围绕流的 UX 主要是在小部件与后端之间的 HTTP 层实现的。
9. 使用 SSE 与 HTTP‑stream 时的常见错误
错误 №1:把 SSE 与 HTTP‑stream 当作“同一回事”。
是的,它们底层都跑在 HTTP 与分块响应之上,但语义差异很大。 SSE 是对独立事件的订阅,这些事件可能在任意时刻到来,客户端事先并不知道。 HTTP 流是一个具体响应在时间上的拆分。 如果你试图通过一个 HTTP 流实现多个 jobId 的订阅, 就不得不在字节流上自造协议,等于重造了一半的 SSE。
错误 №2:忽视 SSE 的自动重连,不考虑幂等性。
许多人写“最简单”的 SSE 服务器:只发送 data: ..., 既不加标准的 id:(供 Last-Event-ID 使用), 也不在事件体中加入业务侧的 event_id。 结果第一次断线重连后,事件副本开始增殖。 没有合理的 event_id 与“我已经见过该事件”的逻辑,客户端处理器可能会重复更新状态、 重复显示同一个 job.completed, 甚至更糟,重复扣款/发放积分。
错误 №3:worker 的每个“喷嚏”都发成一个 SSE 事件。
如果你每毫秒通过 SSE 发送一次进度,八成会把网络和客户端都搞崩,而不是带来丝滑的动画。 更合理的是聚合更新,例如每 200–500 ms 发一次, 或在阶段变化时发送。限流与背压的话题我们还会讨论,但此刻就应该考虑事件频率。
错误 №4:在 HTTP 流之上构建复杂协议,却没有明确格式。
典型反模式:以流式传输 JSON,但不加分隔符,试图“猜测”一个对象的结束与下一个对象的开始。 或者在同一个流里混合文本与 JSON。 最好选择简单清晰的格式:逐行文本,或 NDJSON(每行一个 JSON 对象), 或明确的分隔符。这样客户端的解析器才会保持清醒。
错误 №5:忘记超时与“永恒”的流。
有时开发者会做出几乎不发任何数据的 SSE endpoint,连续 5–10 分钟沉默, 然后惊讶于从用户到服务器一路上的连接被切断(负载均衡、API 网关、企业代理)。 定期的心跳事件或注释有助于保持连接存活并及时发现断线。 而 HTTP 流不应变成无穷无尽的响应——若需要永续订阅,请用 SSE。
错误 №6:试图用 HTTP 流实现复杂的 pub/sub,而不是用正常的事件。
有时会有诱惑:“我们做一个流,在里面同时发送进度、partial results 以及零散日志。” 结果客户端需要一个复杂的多路分发器,逐个分片分析并决定它属于哪个 jobId。 多数情况下,使用按 job.progress、job.completed 类型划分的 SSE 事件, 并按 job 使用单独通道,比在 HTTP 流之上发明一个巨型私有协议更简单可靠。
错误 №7:把 UX 死死绑在“流永不掉线”的假设上。
任何流最终都会中断。如果你的小部件因此只剩下永远转圈的进度条、且没有任何操作选项—— UX 会被视为“坏掉了”。 哪怕是简单的一句“看起来连接断开了。请尝试重新开始礼物筛选。”并配上“重试”的按钮, 也远胜于沉默。
GO TO FULL VERSION