CodeGym /课程 /ChatGPT Apps /流式通道:SSE 与 HTTP/stream —— 何时以及如何使用它们

流式通道:SSE 与 HTTP/stream —— 何时以及如何使用它们

ChatGPT Apps
第 13 级 , 课程 1
可用

1. 在 ChatGPT App 架构中,“流”到底出现在哪里

在争论 SSE 更好还是 HTTP-stream 更好之前,先要弄清楚我们的技术栈里哪些地方真的存在“流”

大致可以分为三个层级。

其一:ChatGPT 与模型层。 模型本身就会以 token 进行流式输出:你会看到回复文本像“打印”一样逐字出现。 这也是一种流,但它完全由 OpenAI 管理,与你的代码没有直接关系。

其二:MCP 层。 当 ChatGPT 连接到你的 MCP 服务器时,通常会保持SSE 连接: 服务器向其中推送 MCP 的 JSON‑RPC 消息(响应与通知),而 ChatGPT 则通过单独的 HTTP endpoint 发送请求, 例如 /messages。在 MCP 术语中,这就是基础传输层。

其三:Apps SDK 与你的后端层。 你的 React 小部件 GiftGenius 运行在 ChatGPT 的沙箱中,通过 HTTP 与你的后端/MCP‑gateway 通信: 要么用普通的 fetch,要么用带流的 fetchReadableStream), 或者用 SSE 订阅(EventSource)。

重要的是不要把这些层混为一谈。 MCP 事件是 ChatGPT 与服务器之间的“线路”;而小部件与 HTTP 后端之间的 SSE/HTTP-stream, 则是你需要自己负责的那一段。

我们可以画一张示意图。

flowchart TD
  subgraph ChatGPT
    UI[ChatGPT UI + 模型]
    W[GiftGenius Widget]
  end

  subgraph YourInfra[开发者基础设施]
    GW[MCP Gateway / Backend]
    MCP[MCP Server]
  end

  UI -- "tool-call / 回复\n(内部 token 流)" --> W

  UI <-- "通过 SSE 的 MCP\n(/sse + /messages)" --> MCP

  W <-- "HTTP / fetch / SSE / stream" --> GW

  GW <-- "JSON-RPC MCP" --> MCP

今天我们将聚焦于 Widget ↔ Backend 这条箭头,同时部分回顾:MCP 的传输本身也是基于 SSE 的。

也正是在这条 Widget ↔ Backend 的链路上,我们需要选择如何通信: 用简单的 HTTP 请求,还是使用流。下一节我们看看为什么这里“普通”HTTP 很快就不够用了。

2. 为什么普通的 HTTP 请求不够用

HTTP 的标准模型是“请求 → 一个响应”。客户端发起请求,服务器返回一次响应,然后连接关闭。

很多场景下这已经足够:获取某个 job 的当前状态、保存用户设置、 读取已经在数据库里的礼物清单等。

但一旦你做的是长耗时操作,一切就开始“吱呀作响”。

想象一下 GiftGenius 需要:

  • 从多个来源收集信号(购买历史、愿望清单、社交网络),
  • 通过若干次 LLM 请求进行处理,
  • 从上百个候选中构建个性化排名。

这一切可能需要几十秒。如果你用普通的 HTTP 请求,保持 40 秒不返回、也不输出, 用户体验会像老浏览器:用户盯着旋转的加载图标,猜测应用是“挂了”还是还在“思考”。

除了 UX,还有纯技术层面的麻烦:

  • ChatGPT、Vercel、代理上的超时;
  • 无法发送进度、partial results 等;
  • 无法正确处理断线与恢复。

于是就有了一个自然的选择:从一个大响应转向由很多小块组成的流, 服务器可以在就绪时逐步发送。

这些小块可以是:

  • 事件(job.progressjob.completed)——对应 SSE
  • 单个大负载的片段(报告文本、按行输出的 NDJSON 礼物列表)——对应 HTTP-stream

3. SSE(Server‑Sent Events):事件订阅

先从 SSE 讲起,因为它与 MCP 很“亲”:MCP 本身就是在 HTTP 之上用 SSE 连接把事件从服务器推送给客户端。

用大白话理解 SSE 模型

SSE 是一种基于普通 HTTP 的协议:

  1. 客户端对某个 endpoint 发起 GET 请求,服务器以 Content-Type: text/event-stream 响应;
  2. 服务器不关闭连接,而是不时写入如下格式的文本行:
event: job.progress
data: {"jobId":"123","percent":40}

event: job.completed
data: {"jobId":"123","resultCount":12}
  1. 浏览器端使用 EventSource,它会:
    • 自动处理连接的重连;
    • 解析 event: + data: + 双换行 的格式;
    • 触发 onmessage / addEventListener("job.progress", ...) 等回调。

关键点:该通道是单向的,只有服务器向客户端发事件; 客户端不会通过这条连接发送数据。

对于 ChatGPT Apps,当小部件只是想“按 jobId 订阅”并对进度与完成进行响应时,这个模型非常合适。

Next.js 16 中的一个最小 SSE endpoint 示例

假设我们有一个用于 Job 进度事件的路由处理器:

app/api/gift-jobs/[jobId]/events/route.ts

import { NextRequest } from "next/server";

export async function GET(req: NextRequest, { params }: { params: { jobId: string } }) {
  const jobId = params.jobId;

  const stream = new ReadableStream({
    start(controller) {
      // 用于发送 SSE 事件的工具函数
      const send = (event: string, data: unknown) => {
        const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
        controller.enqueue(new TextEncoder().encode(payload));
      };

      send("job.started", { jobId });

      let percent = 0;
      const interval = setInterval(() => {
        percent += 20;
        if (percent >= 100) {
          send("job.completed", { jobId, totalGifts: 10 });
          clearInterval(interval);
          controller.close();
        } else {
          send("job.progress", { jobId, percent });
        }
      }, 1000);
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream", // 在这里设置 SSE
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

这是一个玩具级的模拟:每秒进度增长,最后发出 job.completed。 之后你会把这个计时器替换为真实的 worker/队列事件,但整体思路不变。

客户端:在 GiftGenius 小部件中订阅 SSE

在 React 小部件内部,只要有 jobId,我们就可以订阅该流。 提醒一下,小部件的 API 在 ChatGPT 沙箱中运行,但 EventSource 与普通浏览器中一样可用。

import { useEffect, useState } from "react";

export function GiftJobProgress({ jobId }: { jobId: string }) {
  const [percent, setPercent] = useState(0);

  useEffect(() => {
    const url = `/api/gift-jobs/${jobId}/events`;
    const es = new EventSource(url);

    es.addEventListener("job.progress", (event) => {
      const data = JSON.parse((event as MessageEvent).data);
      setPercent(data.percent);
    });

    es.addEventListener("job.completed", () => {
      setPercent(100);
      es.close();
    });

    es.onerror = () => {
      // 这里可以提示“网络异常,正在尝试重新连接”
    };

    return () => es.close();
  }, [jobId]);

  return <div>礼物筛选进度:{percent}%</div>;
}

现在你可以把它与 MCP 工具关联起来。工具 start_gift_job 返回 jobId,而在小部件的 ToolOutput 中直接渲染 GiftJobProgress 即可。

自动重连与 Last‑Event‑ID

EventSource 标准中内置了断线自动重连。 服务器可以在事件里使用标准的 SSE 字段 id:;客户端可通过请求头 Last-Event-ID 在重连后补上漏掉的事件。

对于简单的 GiftGenius,一开始可以不实现 id:,也不做单独的事件标识, 容忍重连时进度有些许“回退”。 但在生产环境,尤其高负载场景下,你会需要:

  • 为每个 SSE 事件添加标准字段 id:,使客户端可在重连时携带 Last-Event-ID
  • 在事件 payload 中加入业务侧的 event_id,并在客户端/后端按该字段实现幂等处理。

这与幂等性直接相关:即使同一个 job.progress 到达两次, 处理器看到已见过的 event_id 也不会重复产生副作用。

最终,SSE 为我们提供了围绕 jobId 的事件订阅、自动重连与通过事件标识控制去重的能力。 接下来看看第二种流:当我们只有一个请求,但响应很大,希望分块返回。

4. HTTP‑streaming:对单个请求进行逐步响应

如果说 SSE 是“订阅独立的事件流”,那么 HTTP 流就是“一个请求,一个响应,但响应被拉长为时间上的一串分片”。

这正是你在使用 OpenAI API 并设置 stream : true 时看到的机制: 服务器发送 JSON 分片(常以 SSE 格式包装,但语义是“单请求 ↔ 部分响应的流”),客户端再将其组装为最终文本。

在你自己的 API 中,也可以用它来:

  • 输出大型文本报告(例如,解释为何选了这些礼物),
  • 输出很长的礼物清单(按部分流式传输,而不是让用户一直等待)。

Next.js 中最简单的 HTTP‑stream endpoint

假设我们要生成“结果说明”,其中 LLM 会写一段很长的文本。 我们想在生成的同时把它流给小部件。

app/api/gift-report/route.ts

import { NextRequest } from "next/server";

export async function POST(req: NextRequest) {
  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();

      controller.enqueue(encoder.encode("开始分析...\n"));

      // 这里本可以是 LLM 的真实分块生成
      for (const line of ["收集偏好...\n", "估算预算...\n", "最终建议...\n"]) {
        await new Promise((r) => setTimeout(r, 1000));
        controller.enqueue(encoder.encode(line));
      }

      controller.close();
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/plain; charset=utf-8",
      "Transfer-Encoding": "chunked", // 这里标明这是 HTTP/stream
    },
  });
}

从技术上讲,Next 会自己处理 chunked 编码;你只需要返回 ReadableStream

在小部件中通过 fetch 读取 HTTP 流

在客户端(小部件内)可以这样读取流:

async function fetchReport(setText: (s: string) => void) {
  const res = await fetch("/api/gift-report", { method: "POST" });
  const reader = res.body!.getReader();
  const decoder = new TextDecoder();

  let acc = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    acc += decoder.decode(value, { stream: true });
    setText(acc); // 随着数据到达更新 UI
  }
}

以及一个包装组件:

import { useState } from "react";

export function GiftReport() {
  const [text, setText] = useState("");

  return (
    <div>
      <button onClick={() => fetchReport(setText)}>生成报告</button>
      <pre style={{ whiteSpace: "pre-wrap" }}>{text}</pre>
    </div>
  );
}

这是经典模式:一次 POST 请求 /api/gift-report, 返回的是文本流,你一边读取一边展示。

流式 JSON,而非纯文本

很多时候你会希望流的不是字符串,而是 JSON 对象。最常见的格式是NDJSON(Newline‑delimited JSON): 每个事件是一行 JSON 字符串,并以 \n 结尾。

服务端示例:

const stream = new ReadableStream({
  async start(controller) {
    const encoder = new TextEncoder();
    for (let i = 0; i < 5; i++) {
      const chunk = { type: "gift", index: i, name: `礼物 #${i}` };
      controller.enqueue(encoder.encode(JSON.stringify(chunk) + "\n"));
      await new Promise((r) => setTimeout(r, 500));
    }
    controller.close();
  },
});

客户端用 TextDecoder 读取,按 \n 分割,再逐个解析成 JSON 对象。

5. SSE vs HTTP‑stream:区别与选择

到这一步直觉上应该已经很清晰了,但我们还是做个小表总结一下。

特性 SSE (Server‑Sent Events) HTTP‑stream (chunked)
发起方 客户端发起 GET 并进行订阅 客户端发起请求(GET/POST),服务器以流式响应
方向 仅 服务器 → 客户端 针对单个请求的服务器响应
语义 订阅事件流(pub/sub 对单个请求的部分响应
内置协议 有(event:data:id: 等) 无,需要自定格式(纯文本行、NDJSON、JSON)
客户端 API EventSource fetch + ReadableStream / response.body
重连支持 内置(EventSourceLast-Event-ID 需要自行实现
典型场景 进度、状态、按 jobId 的通知 流式文本、大型 JSON 响应、LLM 输出

如果把选择简化成“手指规则”(小心使用,别教条化):

  • 你有一个 job,其周围有一堆事件 → 用SSE
  • 你的一个 tool 调用会返回很大的结果,并希望分段展示 → 用HTTP‑stream

对于 GiftGenius:SSE 用于实时进度条与状态;HTTP 流用于长文本汇总或逐步加载很长的礼物清单。

6. 它与 MCP 与 GiftGenius 如何衔接

回到一开始的示意:模型 ↔ MCP ↔ 小部件 ↔ 后端。我们已经看过小部件 ↔ 后端这一级的流, 现在退回一步,分清楚哪里是 MCP,哪里是“纯 HTTP”。

MCP 定义了 ChatGPT(作为 MCP 客户端)如何与 MCP 服务器通信。其传输层是:

  • ChatGPT 打开 SSE 连接 /sse,并通过它接收 MCP 消息(响应、通知、事件);
  • ChatGPT 将 MCP 请求(call_toollist_tools 等)发到 /messages,通常是 POST JSON‑RPC。

当你把 GiftGenius 接入 ChatGPT 时,这一层你已经跑通了。

现在,当我们在小部件中加入异步任务与 UX 流时,有两种架构方案。

方案一——“纯 MCP”: MCP 服务器自行生成 job.progressjob.completed 事件; ChatGPT 通过 MCP‑SSE 接收;随后模型再用更新过的上下文调用你的小部件, 小部件据此渲染进度,无需直接与后端通信。这是最“正统”的 MCP‑events 方案。

方案二——混合式: MCP 工具 start_gift_job 创建任务并返回 jobId; 小部件拿到 jobId 后,自己通过 HTTP 与后端通信, 订阅 SSE endpoint /api/gift-jobs/{jobId}/events,并在需要时请求 HTTP 流的报告。 在 MCP 侧则无需做额外的特殊处理。

课程中我们采用混合式路径:它更容易与 App Router/Next 集成,也更便于本地调试。 等你熟悉之后,再迁移到“纯 MCP 通知”也不迟。

7. 重连、超时与网络的其他现实

到目前为止一切听起来都很理想:打开 SSE 或流,数据顺畅传输,事件如期而至,UX 熠熠生辉。 现实中网络总会在意想不到的时候中断连接,基础设施也会设置各种超时。

可能出现的问题

使用 SSEHTTP-stream,你或早或晚会遇到:

  • 代理上的 idle 超时:“若连接在 N 秒内无数据传输——关闭”;
  • 后端重启(部署、故障);
  • 用户端网络不稳定(尤其移动端)。

这很正常;重要的是做好准备,而不是指望“不会发生”。

SSE 的策略

SSE 在这方面有不少优势:

  • EventSource 会按一定延迟自动重连;
  • 你有 id:Last-Event-ID 来追赶漏掉的事件。

最低限度的实践:

  1. 服务器定期发送心跳,避免连接被认定为完全 idle。 可以发独立事件 event: ping,或仅发送注释 : keep-alive
  2. 客户端在 onerror 中给用户清晰的状态提示, 如“网络出现问题,正在尝试重新连接…”,而不是让小部件整体崩掉。
  3. 在重连时,如果你使用了 id:,只下发该 ID 之后的新事件。 对于 GiftGenius,一开始可以不加 id:,仅基于最后收到的 job.progress/job.completed 重新构建状态。

HTTP‑stream 的策略

HTTP 流是单次请求,因此一旦中断,本质上需要重新开始:

  • 如果你流的是文本报告,可以直接提示用户: “未能获取完整报告,请重试”,然后重新开始;
  • 如果你流的是结构化数据(NDJSON),可以考虑断点续传机制: 例如在请求中传 offsetcursor,指明从哪里继续。

初期可以不复杂化:若响应流未完整结束,就展示已到达的部分,并给出“继续生成报告”的按钮来发起新请求。

关键是不要让用户陷入“无尽等待”的状态。

8. 应用于 GiftGenius:端到端场景

现在把关于 SSE、HTTP 流以及两种与 MCP 的架构方案都串起来, 用一个 GiftGenius 的真实场景,从用户请求到报告产出。

用户在 ChatGPT 中输入:“给桌游爱好者挑选礼物,预算不超过 100 美元。” 模型决定调用 GiftGenius。应用/代理对你的 MCP 服务器发起 tool‑call start_gift_job。 服务器:

  • 把 job 写入数据库;
  • 把它放入内部队列(队列与 worker 的细节在下一讲,这里先假设“有人”会执行);
  • 同步返回 jobId 作为 tool‑call 的响应。

GiftGenius 小部件收到带有 jobIdToolOutput 并渲染组件:

function GiftGeniusRoot({ jobId }: { jobId: string }) {
  return (
    <div>
      <h2>正在寻找理想的礼物...</h2>
      <GiftJobProgress jobId={jobId} />
      <GiftReport />
    </div>
  );
}

组件 GiftJobProgress 订阅 SSE /api/gift-jobs/{jobId}/events 并绘制进度。 每个 job.progress 更新百分比,job.completed 将其置为 100%,并可能启用“显示详细报告”的按钮。

组件 GiftReport 在按钮点击时发送 POST /api/gift-report(传入 jobId),并在服务器流出 HTTP 分片时逐步展示 文本报告。

在 SSE 连接中断时,小部件会展示友好的提示,EventSource 会尝试自动重连。 在报告流出现问题时,用户会看到已到达的部分,并有“继续生成”或“重试”的按钮。

从 ChatGPT 与 MCP 的视角:

  • MCP 看到了工具调用 start_gift_job,以及可能随后出现的 job 状态通知;
  • 围绕流的 UX 主要是在小部件与后端之间的 HTTP 层实现的。

9. 使用 SSE 与 HTTP‑stream 时的常见错误

错误 №1:把 SSE 与 HTTP‑stream 当作“同一回事”。
是的,它们底层都跑在 HTTP 与分块响应之上,但语义差异很大。 SSE 是对独立事件的订阅,这些事件可能在任意时刻到来,客户端事先并不知道。 HTTP 流是一个具体响应在时间上的拆分。 如果你试图通过一个 HTTP 流实现多个 jobId 的订阅, 就不得不在字节流上自造协议,等于重造了一半的 SSE。

错误 №2:忽视 SSE 的自动重连,不考虑幂等性。
许多人写“最简单”的 SSE 服务器:只发送 data: ..., 既不加标准的 id:(供 Last-Event-ID 使用), 也不在事件体中加入业务侧的 event_id。 结果第一次断线重连后,事件副本开始增殖。 没有合理的 event_id 与“我已经见过该事件”的逻辑,客户端处理器可能会重复更新状态、 重复显示同一个 job.completed, 甚至更糟,重复扣款/发放积分。

错误 №3:worker 的每个“喷嚏”都发成一个 SSE 事件。
如果你每毫秒通过 SSE 发送一次进度,八成会把网络和客户端都搞崩,而不是带来丝滑的动画。 更合理的是聚合更新,例如每 200500 ms 发一次, 或在阶段变化时发送。限流与背压的话题我们还会讨论,但此刻就应该考虑事件频率。

错误 №4:在 HTTP 流之上构建复杂协议,却没有明确格式。
典型反模式:以流式传输 JSON,但不加分隔符,试图“猜测”一个对象的结束与下一个对象的开始。 或者在同一个流里混合文本与 JSON。 最好选择简单清晰的格式:逐行文本,或 NDJSON(每行一个 JSON 对象), 或明确的分隔符。这样客户端的解析器才会保持清醒。

错误 №5:忘记超时与“永恒”的流。
有时开发者会做出几乎不发任何数据的 SSE endpoint,连续 510 分钟沉默, 然后惊讶于从用户到服务器一路上的连接被切断(负载均衡、API 网关、企业代理)。 定期的心跳事件或注释有助于保持连接存活并及时发现断线。 而 HTTP 流不应变成无穷无尽的响应——若需要永续订阅,请用 SSE。

错误 №6:试图用 HTTP 流实现复杂的 pub/sub,而不是用正常的事件。
有时会有诱惑:“我们做一个流,在里面同时发送进度、partial results 以及零散日志。” 结果客户端需要一个复杂的多路分发器,逐个分片分析并决定它属于哪个 jobId。 多数情况下,使用按 job.progressjob.completed 类型划分的 SSE 事件, 并按 job 使用单独通道,比在 HTTP 流之上发明一个巨型私有协议更简单可靠。

错误 №7:把 UX 死死绑在“流永不掉线”的假设上。
任何流最终都会中断。如果你的小部件因此只剩下永远转圈的进度条、且没有任何操作选项—— UX 会被视为“坏掉了”。 哪怕是简单的一句“看起来连接断开了。请尝试重新开始礼物筛选。”并配上“重试”的按钮, 也远胜于沉默。

评论
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION