CodeGym /コース /ChatGPT Apps /ストリーミングチャネル: SSE と HTTP/stream — いつ・どのように使うか

ストリーミングチャネル: SSE と HTTP/stream — いつ・どのように使うか

ChatGPT Apps
レベル 13 , レッスン 1
使用可能

1. ChatGPT App のアーキテクチャで「ストリーム」はどこに現れるのか

SSEHTTP-stream のどちらが良いか議論する前に、私たちのスタックのどこにストリームが存在しているのかを理解するのが有益です。

大きく 3 つのレベルに分けられます。

第一に、ChatGPT とモデルのレベル。 モデル自体がすでにトークン単位で応答をストリームします。つまり、回答テキストが「タイプアウト」されるように見えます。 これもストリームですが、これは OpenAI によって完全に制御され、あなたのコードには直接関係しません。

第二に、MCP のレベル。 ChatGPT があなたの MCP サーバーに接続するとき、通常は SSE 接続を維持します。 サーバーは MCP の JSON-RPC メッセージ(応答と通知)をプッシュし、ChatGPT は別の HTTP エンドポイントにリクエストを送ります。 例: /messages。MCP の用語では、これが基本のトランスポートです。

第三に、Apps SDK とあなたのバックエンドのレベル。 あなたの React ウィジェット GiftGenius は ChatGPT のサンドボックス内で動作し、バックエンド/MCP ゲートウェイとは HTTP で通信します。 通常の fetch、ストリーム付きの fetchReadableStream)、 あるいは SSE 購読(EventSource)です。

これらのレイヤーを混同しないことが重要です。 MCP のイベントは ChatGPT とサーバーの間の「配線」です。一方、ウィジェットとあなたの HTTP バックエンドの間の SSE/HTTP-stream は、あなたの担当範囲です。

図にすると次のようになります。

flowchart TD
  subgraph ChatGPT
    UI[ChatGPT UI + モデル]
    W[GiftGenius Widget]
  end

  subgraph YourInfra[開発者のインフラ]
    GW[MCP Gateway / Backend]
    MCP[MCP Server]
  end

  UI -- "tool-call / 応答\n(内部のトークンストリーム)" --> W

  UI <-- "MCP over SSE\n(/sse + /messages)" --> MCP

  W <-- "HTTP / fetch / SSE / stream" --> GW

  GW <-- "JSON-RPC MCP" --> MCP

今日は Widget ↔ Backend の矢印に焦点を当て、MCP のトランスポートが SSE に基づいている点も一部振り返ります。

まさにこの区間 — Widget ↔ Backend — で、どの通信方法を選ぶか(通常の HTTP なのか、ストリームなのか)を決める必要があります。次のセクションで、ここで「ふつうの」HTTP がすぐに不足する理由を見ていきます。

2. なぜ通常の HTTP リクエストだけでは足りないのか

HTTP の標準モデルは「リクエスト → 単一のレスポンス」です。クライアントが何かを尋ね、サーバーが 1 度だけ応答し、接続は閉じられます。

多くのタスクではそれで十分です。たとえば job の現在のステータス取得、ユーザー設定の保存、 データベースに既にあるギフトの一覧取得などです。

しかし、時間のかかる処理を行うと、すぐにきしみ始めます。

GiftGenius を想像してください。次のような処理をします:

  • 複数のソース(購入履歴、ウィッシュリスト、SNS)からシグナルを収集する。
  • それをいくつかの LLM リクエストにかける。
  • 数百の候補からパーソナライズされたランキングを作る。

これ全体に数十秒かかることがあります。もし通常の HTTP リクエストを 40 秒維持したまま無言で待つなら、 UX は古いブラウザのようになります。ユーザーはスピナーを眺めながら、アプリが死んだのかまだ「考えている」のか当てずっぽうになります。

UX 以外にも純粋な技術的問題があります:

  • ChatGPT、Vercel、プロキシでのタイムアウト;
  • 進捗や部分結果(partial results)などを送れない;
  • 切断を適切に処理して復旧できない。

ここから自然な解はこうです。1 つの大きな応答から、サーバーが準備でき次第送れる小さなチャンクのストリームへ切り替えること。

チャンクには次の 2 種類があります:

  • イベント(job.progressjob.completed)— これは SSE の話;
  • 大きなペイロードの断片(レポート本文、ギフトの NDJSON 行など)— これは HTTP-stream の話。

3. SSE (Server‑Sent Events): イベント購読

SSE から始めましょう。MCP 自体がサーバーからクライアントへイベントをプッシュするために HTTP 上の SSE 接続を使っているからです。

SSE のざっくりモデル

SSE は通常の HTTP 上のプロトコルです:

  1. クライアントはエンドポイントに GET リクエストを開き、サーバーは Content-Type: text/event-stream を返します。
  2. サーバーは接続を閉じず、次のような行を書き込み続けます:
event: job.progress
data: {"jobId":"123","percent":40}

event: job.completed
data: {"jobId":"123","resultCount":12}
  1. ブラウザ側は EventSource を使います。これは:
    • 接続が切れたら自動で再接続を試みる;
    • event: + data: + 空行 2 つの形式をパースする;
    • onmessage / addEventListener("job.progress", ...) を呼ぶ。

重要な点: チャネルは片方向です。サーバーだけがクライアントへイベントを送ります。 クライアントはこの接続経由でデータを送信しません。

ChatGPT Apps において、ウィジェットが jobId でイベントに「購読」し、進捗や完了を受け取りたいときに、このモデルは非常に適しています。

Next.js 16 のミニ SSE エンドポイント例

Job の進捗イベント用のルートハンドラがあるとしましょう:

app/api/gift-jobs/[jobId]/events/route.ts

import { NextRequest } from "next/server";

export async function GET(req: NextRequest, { params }: { params: { jobId: string } }) {
  const jobId = params.jobId;

  const stream = new ReadableStream({
    start(controller) {
      // SSE イベントを送信するユーティリティ
      const send = (event: string, data: unknown) => {
        const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
        controller.enqueue(new TextEncoder().encode(payload));
      };

      send("job.started", { jobId });

      let percent = 0;
      const interval = setInterval(() => {
        percent += 20;
        if (percent >= 100) {
          send("job.completed", { jobId, totalGifts: 10 });
          clearInterval(interval);
          controller.close();
        } else {
          send("job.progress", { jobId, percent });
        }
      }, 1000);
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream", // ここで SSE を指定
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

これはおもちゃの模擬実装です。1 秒ごとにパーセントが増え、最後に job.completed が届きます。 後でこのタイマーを実際のワーカー/キューのイベントに置き換えますが、スキーム自体はそのままです。

クライアント: GiftGenius ウィジェットでの SSE 購読

React ウィジェット内で、jobId があればこのストリームに購読できます。 ウィジェットの API は ChatGPT のサンドボックスで動きますが、EventSource は通常のブラウザと同様に使用できます。

import { useEffect, useState } from "react";

export function GiftJobProgress({ jobId }: { jobId: string }) {
  const [percent, setPercent] = useState(0);

  useEffect(() => {
    const url = `/api/gift-jobs/${jobId}/events`;
    const es = new EventSource(url);

    es.addEventListener("job.progress", (event) => {
      const data = JSON.parse((event as MessageEvent).data);
      setPercent(data.percent);
    });

    es.addEventListener("job.completed", () => {
      setPercent(100);
      es.close();
    });

    es.onerror = () => {
      // ここで「接続に問題が発生。再接続を試行しています」と表示してもよい
    };

    return () => es.close();
  }, [jobId]);

  return <div>ギフト選定の進捗: {percent}%</div>;
}

これを MCP のツールと組み合わせられます。ツール start_gift_jobjobId を返し、ウィジェットの ToolOutputGiftJobProgress を描画するだけです。

自動再接続と Last‑Event‑ID

EventSource は標準で、接続が切れたら自動的に再接続を試みます。 サーバーは SSE の標準フィールド id: をイベントに付け、クライアントはヘッダー Last-Event-ID を使うことで、再接続後に抜けたイベントを追いつけます。

シンプルな GiftGenius では、当面は id: も個別のイベント ID も実装せず、 再接続時の小さな「進捗の抜け」を許容してもよいでしょう。 しかし本番、特に高負荷では次の対応が必要になります:

  • 各 SSE イベントに標準フィールド id: を付与し、クライアントが再接続時に Last-Event-ID を送れるようにする;
  • イベントのペイロードにアプリケーション固有の event_id を入れ、クライアント/バックエンドでの冪等な処理の拠り所にする。

これは冪等性と直結します。同じ job.progress が 2 回届いても、 ハンドラが既知の event_id を見れば副作用を再実行しません。

結果として、SSE は jobId 周辺のイベント購読に便利で、自動再接続と イベント ID による重複制御を提供します。 では 2 つ目のタイプ、1 つのリクエストだが応答が大きく、部分的に返したいケースを見ましょう。

4. HTTP‑streaming: 単一リクエストへの段階的応答

SSE が「独立したイベントの購読」なのに対し、HTTP ストリーミングは「1 つのリクエストに対し、時間をかけてチャンクで返す応答」です。

これは、stream : true を付けて OpenAI API を使うときに目にする仕組みそのものです。 サーバーは JSON チャンク(しばしば SSE 形式ですが、論理は「1 リクエスト ↔ 部分的な応答のストリーム」)を送り、クライアントはそれらを最終テキストに組み立てます。

自分たちの API でも同じことができます:

  • 長いテキストレポート(例: 選ばれたギフトのロジック説明)
  • 長いギフト一覧(全件待たせずに部分的にストリームする)

最小の HTTP‑stream エンドポイント(Next.js)

選定結果の「説明」テキストを生成する必要があるとします。LLM が長いテキストを書くので、 生成に合わせてウィジェットへストリームしたい、というものです。

app/api/gift-report/route.ts

import { NextRequest } from "next/server";

export async function POST(req: NextRequest) {
  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();

      controller.enqueue(encoder.encode("分析を開始します...\n"));

      // ここには実際の LLM 生成(チャンク)が入る想定
      for (const line of ["嗜好を収集しています...\n", "予算を計算しています...\n", "最終的な推奨...\n"]) {
        await new Promise((r) => setTimeout(r, 1000));
        controller.enqueue(encoder.encode(line));
      }

      controller.close();
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/plain; charset=utf-8",
      "Transfer-Encoding": "chunked", // ここで HTTP/stream であることを指定
    },
  });
}

技術的には、Next が chunked エンコーディングを管理します。あなたは ReadableStream を返すだけで十分です。

ウィジェットで fetch による HTTP ストリームの読み取り

クライアント側(ウィジェット内)では次のように読めます:

async function fetchReport(setText: (s: string) => void) {
  const res = await fetch("/api/gift-report", { method: "POST" });
  const reader = res.body!.getReader();
  const decoder = new TextDecoder();

  let acc = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    acc += decoder.decode(value, { stream: true });
    setText(acc); // 到着次第 UI を更新
  }
}

そしてラッパーコンポーネント:

import { useState } from "react";

export function GiftReport() {
  const [text, setText] = useState("");

  return (
    <div>
      <button onClick={() => fetchReport(setText)}>レポートを生成</button>
      <pre style={{ whiteSpace: "pre-wrap" }}>{text}</pre>
    </div>
  );
}

これは古典的なパターンです。1 つの POST リクエスト /api/gift-report に対し、 サーバーからテキストがストリームで返り、順次表示していきます。

テキストではなく JSON をストリーミングする

しばしば、文字列ではなく JSON オブジェクトをストリームしたくなります。最も一般的な形式は NDJSON(Newline‑delimited JSON)です。 各イベントは 1 行の JSON で、最後は \n です。

サーバー側例:

const stream = new ReadableStream({
  async start(controller) {
    const encoder = new TextEncoder();
    for (let i = 0; i < 5; i++) {
      const chunk = { type: "gift", index: i, name: `ギフト #${i}` };
      controller.enqueue(encoder.encode(JSON.stringify(chunk) + "\n"));
      await new Promise((r) => setTimeout(r, 500));
    }
    controller.close();
  },
});

クライアントは TextDecoder で読み、\n で分割して、個々の JSON オブジェクトをパースします。

5. SSE と HTTP‑stream の違いと選び方

ここまでで直感的なイメージはつかめたと思いますが、小さな表で要点をまとめておきます。

特徴 SSE (Server‑Sent Events) HTTP‑stream (chunked)
発起者 クライアントが GET を行い購読する クライアントがリクエスト(GET/POST)を行い、サーバーが応答をストリームする
方向 サーバー → クライアントのみ 特定のリクエストに対するサーバーの応答
セマンティクス イベントストリームの購読(pub/sub 1 つのリクエストに対する部分的な応答
組み込みプロトコル あり(event:data:id: 等) なし。形式は自分で決める(テキスト行、NDJSON、JSON)
クライアント API EventSource fetch + ReadableStream / response.body
再接続サポート 組み込み(EventSourceLast-Event-ID 手動で実装が必要
代表的なユースケース 進捗、ステータス、jobId ベースの通知 テキスト、大型 JSON 応答、LLM 出力のストリーミング

「現場の勘ルール」にまで単純化すると(乱用は禁物):

  • job があり、その周りにイベントがたくさんある → SSE;
  • 1 回のツール呼び出しが大きな結果を返し、それを部分的に見せたい → HTTP‑stream

GiftGenius では、SSE は生のプログレスバーと選定ステータスに、HTTP ストリームは長いテキスト要約や長いギフト一覧の段階的ロードに使う、という棲み分けになります。

6. MCP と GiftGenius にどう接続するか

冒頭のスキーマを思い出しましょう。モデル ↔ MCP ↔ ウィジェット ↔ バックエンド。すでにウィジェット ↔ バックエンドのストリームを見ました。 ここで一歩戻り、この流れのどこが MCP で、どこが「ただの HTTP」かをきちんと切り分けます。

MCP は、ChatGPT(MCP クライアント)があなたの MCP サーバーとどう通信するかを定めます。トランスポートでは:

  • ChatGPT は /sse への SSE 接続を開き、MCP メッセージ(応答、通知、イベント)をそこで受け取る;
  • ChatGPT は MCP リクエスト(call_toollist_tools 等)を /messages に送り、通常は POST の JSON‑RPC として送る。

GiftGenius を ChatGPT に接続したとき、このレベルはすでに通過しています。

ここに非同期ジョブとウィジェットの UX ストリームを加えると、アーキテクチャは 2 つの選択肢が出てきます。

選択肢 1 — 「ピュア MCP」: MCP サーバー自身が job.progressjob.completed を生成し、 ChatGPT は MCP‑SSE でそれらを受け取ります。その後、モデルが更新されたコンテキストでウィジェットを呼び、 ウィジェットはバックエンドと直接通信せずに進捗を描画します。これは最も「正統的」な MCP イベントの流れです。

選択肢 2 — ハイブリッド: MCP ツール start_gift_job はジョブを作成して jobId を返します。 ウィジェットは jobId を受け取り、その後は自分でバックエンドと HTTP で通信し、 SSE エンドポイント /api/gift-jobs/{jobId}/events を購読し、必要に応じて HTTP ストリームのレポートを取得します。 MCP 側では特別なことは起きません。

本コースではハイブリッドを採用します。App Router/Next に収まりがよく、ローカルデバッグも簡単だからです。 その後、慣れてきたら「ピュア MCP 通知」へ移行することもできます。

7. 再接続・タイムアウトなどネットワークの現実

ここまで理想的に聞こえました。SSE かストリームを開くだけで、データが流れ、イベントが届き、UX は輝く、と。 現実にはネットワークは予期せぬタイミングで接続を切り、インフラはタイムアウトを課します。

何が問題になり得るか

SSEHTTP-stream では遅かれ早かれ次の事象に出会います:

  • プロキシのアイドルタイムアウト: 「N 秒間データが流れなければ接続を閉じる」;
  • バックエンドの再起動(デプロイ、障害);
  • ユーザー側の不安定なネットワーク(特にモバイル)。

これは普通に起きます。大事なのは「運に任せる」のではなく、備えることです。

SSE の戦略

この領域で SSE は強みが多いです:

  • EventSource は自動再接続を行う;
  • id:Last-Event-ID があり、追いつきができる。

最低限の実務プラクティス:

  1. サーバー側で定期的に何かを送って(ハートビートのように)接続が完全なアイドルと見なされないようにする。 例えば event: ping の専用イベントや、コメント : keep-alive でもよい。
  2. クライアント側では onerror で、ウィジェット全体を壊すのではなく 「接続に問題が発生。再接続を試行しています…」のようなわかりやすいステータスを表示する。
  3. 再接続時、id: を使っているなら、その ID 以降の新しいイベントだけをサーバーが返す。 GiftGenius ではまず id: なしで始め、最後に受け取った job.progress/job.completed を元に状態を「組み直す」でもよい。

HTTP‑stream の戦略

HTTP ストリームは 1 回のリクエストなので、切断されたら基本的に最初からやり直しになります:

  • テキストレポートをストリームしている場合は、 「完全なレポートを取得できませんでした。もう一度お試しください」と伝え、最初からやり直す。
  • 構造化データ(NDJSON)をストリームしている場合は、再開(resume)メカニズムを検討する。 例えば、どこから再開すべきかを示す offsetcursor をリクエストに渡す。

最初は複雑にせず、単純なポリシーでも構いません。応答ストリームが最後まで届かなかったら、 届いた分だけ表示し、「生成を続行」ボタンで新しいリクエストを送る、といった形です。

重要なのは、ユーザーを「永遠の待ち」状態にしないことです。

8. GiftGenius への適用: 最初から最後までのシナリオ

ここまでの SSE、HTTP‑stream、MCP との 2 つのアーキテクチャを、GiftGenius の実シナリオでつなげてみます。

ユーザーが ChatGPT に「ボードゲーム好きのファンに合うギフトを選んで。予算は 100 ドルまで」と書きます。 モデルは GiftGenius を呼ぶことを決め、あなたの MCP サーバーにツール呼び出し start_gift_job を行います。 サーバーは:

  • ジョブを DB に記録する;
  • 内部キューへ投入する(キューとワーカーの詳細は次回の講義。ここでは「誰かが実行する」とします);
  • ツール呼び出しへの応答として jobId を同期的に返す。

ウィジェット GiftGenius は jobId を含む ToolOutput を受け取り、次のコンポーネントを描画します:

function GiftGeniusRoot({ jobId }: { jobId: string }) {
  return (
    <div>
      <h2>最適なギフトを探しています...</h2>
      <GiftJobProgress jobId={jobId} />
      <GiftReport />
    </div>
  );
}

コンポーネント GiftJobProgress は SSE /api/gift-jobs/{jobId}/events を購読して進捗を描画します。 各 job.progress でパーセントを更新し、job.completed100% にして、「詳細レポートを表示」ボタンを有効にする、などが考えられます。

コンポーネント GiftReport はボタンクリックで POST /api/gift-reportjobId を渡す)を送り、 サーバーが HTTP ストリームのチャンクを返す間、テキストレポートを段階的に表示します。

SSE 接続が切れた場合、ウィジェットはソフトな警告を表示し、EventSource は再接続を試みます。 レポートのストリームに問題があれば、届いた部分を見せつつ「生成を続行」や「もう一度試す」ボタンを提示します。

ChatGPT と MCP の視点では:

  • MCP はツール呼び出し start_gift_job と、場合によってはジョブのステータス通知を目にします;
  • ストリーム周りの UX は主にウィジェットとあなたのバックエンドの HTTP レベルで実装されます。

9. SSE と HTTP‑stream での典型的なミス

エラー No.1: SSE と HTTP‑stream を「同じもの」と見なす。
確かに下層はどちらも HTTP と chunked 応答ですが、セマンティクスは大きく異なります。 SSE は独立したイベントの購読で、いつ来るかはクライアントに事前にはわかりません。 HTTP ストリームは、時間に引き伸ばされた 1 つの具体的な応答です。 複数の jobId への購読を 1 本の HTTP ストリームで実装しようとすると、 バイトの上に自作プロトコルを発明することになり、実質的に SSE の半分を再実装する羽目になります。

エラー No.2: SSE の自動再接続を無視し、冪等性を考えない。
多くの人が「単純」な SSE サーバーを書きます。data: ... だけを送り、標準の id:Last-Event-ID 用)も、 アプリ固有の event_id も入れません。 すると最初の切断と再接続でイベントの重複が増殖します。 練られた event_id と「このイベントは既に見た」というロジックがないと、 クライアントの処理は同じ状態を 2 回更新したり、同じ job.completed を 2 回出したり、 ひどい場合は重複課金/重複付与などの副作用を 2 度起こします。

エラー No.3: ワーカーの「くしゃみ」をすべて個別の SSE イベントで送る。
ミリ秒単位で進捗を送れば、滑らかなアニメーションどころかネットワークとクライアントを殺す可能性が高くなります。 更新は集約し、例えば 200500 ms ごとやステージ変更時に送るのが賢明です。 スロットリングやバックプレッシャーは別途扱いますが、この段階からイベント頻度を考えるべきです。

エラー No.4: 明確な形式なしに HTTP ストリーム上で複雑なプロトコルを作る。
典型的なアンチパターンは、区切りなしに JSON をストリームし、どこで 1 つのオブジェクトが終わり、次が始まるかを「推測」すること。 あるいは 1 つのストリームでテキストと JSON を混在させること。 最善はシンプルで理解しやすい形式を選ぶことです。行単位のテキスト、または NDJSON(1 行 1 JSON)、 あるいは明示的な区切り。そうすればクライアントのパーサは健全なままです。

エラー No.5: タイムアウトと「永遠の」ストリームを忘れる。
ときどき、510 分間何も送らない SSE エンドポイントを作り、 ユーザーからサーバーまでの経路(ロードバランサ、API ゲートウェイ、企業プロキシ)で切断されることに驚く、ということが起きます。 定期的なハートビートイベントやコメントは、接続を生かし、切断を早期に検知する助けになります。 また、HTTP ストリームは無限の応答になってはいけません。永続購読には SSE があります。

エラー No.6: 正規のイベントの代わりに、HTTP ストリームで複雑な pub/sub をやろうとする。
「ストリーム 1 本に、進捗も部分結果もログも全部流そう」という誘惑が生まれることがあります。 その結果、クライアントには各チャンクを解析して、どの jobId に属するかを判定する複雑な多重化処理が生まれます。 多くの場合、job.progressjob.completed といったイベントで SSE を使い、 ジョブごとにチャンネルを分けるほうが、自家製の巨大プロトコルを HTTP ストリーム上に作るより簡単で堅牢です。

エラー No.7: 「ストリームは決して落ちない」という前提に UX をハードコードする。
どんなストリームもいつか切れます。切れたとき、ウィジェットが永遠に回るプログレスバーのままで、何のアクションもできないなら、 UX は「壊れている」と感じられます。 「接続が切れたようです。ギフト選定を再実行してみてください」のような簡単なメッセージと「再試行」ボタンがあるだけで、 沈黙よりずっと良くなります。

コメント
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION