1. ChatGPT App のアーキテクチャで「ストリーム」はどこに現れるのか
SSE と HTTP-stream のどちらが良いか議論する前に、私たちのスタックのどこにストリームが存在しているのかを理解するのが有益です。
大きく 3 つのレベルに分けられます。
第一に、ChatGPT とモデルのレベル。 モデル自体がすでにトークン単位で応答をストリームします。つまり、回答テキストが「タイプアウト」されるように見えます。 これもストリームですが、これは OpenAI によって完全に制御され、あなたのコードには直接関係しません。
第二に、MCP のレベル。 ChatGPT があなたの MCP サーバーに接続するとき、通常は SSE 接続を維持します。 サーバーは MCP の JSON-RPC メッセージ(応答と通知)をプッシュし、ChatGPT は別の HTTP エンドポイントにリクエストを送ります。 例: /messages。MCP の用語では、これが基本のトランスポートです。
第三に、Apps SDK とあなたのバックエンドのレベル。 あなたの React ウィジェット GiftGenius は ChatGPT のサンドボックス内で動作し、バックエンド/MCP ゲートウェイとは HTTP で通信します。 通常の fetch、ストリーム付きの fetch(ReadableStream)、 あるいは SSE 購読(EventSource)です。
これらのレイヤーを混同しないことが重要です。 MCP のイベントは ChatGPT とサーバーの間の「配線」です。一方、ウィジェットとあなたの HTTP バックエンドの間の SSE/HTTP-stream は、あなたの担当範囲です。
図にすると次のようになります。
flowchart TD
subgraph ChatGPT
UI[ChatGPT UI + モデル]
W[GiftGenius Widget]
end
subgraph YourInfra[開発者のインフラ]
GW[MCP Gateway / Backend]
MCP[MCP Server]
end
UI -- "tool-call / 応答\n(内部のトークンストリーム)" --> W
UI <-- "MCP over SSE\n(/sse + /messages)" --> MCP
W <-- "HTTP / fetch / SSE / stream" --> GW
GW <-- "JSON-RPC MCP" --> MCP
今日は Widget ↔ Backend の矢印に焦点を当て、MCP のトランスポートが SSE に基づいている点も一部振り返ります。
まさにこの区間 — Widget ↔ Backend — で、どの通信方法を選ぶか(通常の HTTP なのか、ストリームなのか)を決める必要があります。次のセクションで、ここで「ふつうの」HTTP がすぐに不足する理由を見ていきます。
2. なぜ通常の HTTP リクエストだけでは足りないのか
HTTP の標準モデルは「リクエスト → 単一のレスポンス」です。クライアントが何かを尋ね、サーバーが 1 度だけ応答し、接続は閉じられます。
多くのタスクではそれで十分です。たとえば job の現在のステータス取得、ユーザー設定の保存、 データベースに既にあるギフトの一覧取得などです。
しかし、時間のかかる処理を行うと、すぐにきしみ始めます。
GiftGenius を想像してください。次のような処理をします:
- 複数のソース(購入履歴、ウィッシュリスト、SNS)からシグナルを収集する。
- それをいくつかの LLM リクエストにかける。
- 数百の候補からパーソナライズされたランキングを作る。
これ全体に数十秒かかることがあります。もし通常の HTTP リクエストを 40 秒維持したまま無言で待つなら、 UX は古いブラウザのようになります。ユーザーはスピナーを眺めながら、アプリが死んだのかまだ「考えている」のか当てずっぽうになります。
UX 以外にも純粋な技術的問題があります:
- ChatGPT、Vercel、プロキシでのタイムアウト;
- 進捗や部分結果(partial results)などを送れない;
- 切断を適切に処理して復旧できない。
ここから自然な解はこうです。1 つの大きな応答から、サーバーが準備でき次第送れる小さなチャンクのストリームへ切り替えること。
チャンクには次の 2 種類があります:
- イベント(job.progress、job.completed)— これは SSE の話;
- 大きなペイロードの断片(レポート本文、ギフトの NDJSON 行など)— これは HTTP-stream の話。
3. SSE (Server‑Sent Events): イベント購読
SSE から始めましょう。MCP 自体がサーバーからクライアントへイベントをプッシュするために HTTP 上の SSE 接続を使っているからです。
SSE のざっくりモデル
SSE は通常の HTTP 上のプロトコルです:
- クライアントはエンドポイントに GET リクエストを開き、サーバーは Content-Type: text/event-stream を返します。
- サーバーは接続を閉じず、次のような行を書き込み続けます:
event: job.progress
data: {"jobId":"123","percent":40}
event: job.completed
data: {"jobId":"123","resultCount":12}
- ブラウザ側は EventSource を使います。これは:
- 接続が切れたら自動で再接続を試みる;
- event: + data: + 空行 2 つの形式をパースする;
- onmessage / addEventListener("job.progress", ...) を呼ぶ。
重要な点: チャネルは片方向です。サーバーだけがクライアントへイベントを送ります。 クライアントはこの接続経由でデータを送信しません。
ChatGPT Apps において、ウィジェットが jobId でイベントに「購読」し、進捗や完了を受け取りたいときに、このモデルは非常に適しています。
Next.js 16 のミニ SSE エンドポイント例
Job の進捗イベント用のルートハンドラがあるとしましょう:
app/api/gift-jobs/[jobId]/events/route.ts
import { NextRequest } from "next/server";
export async function GET(req: NextRequest, { params }: { params: { jobId: string } }) {
const jobId = params.jobId;
const stream = new ReadableStream({
start(controller) {
// SSE イベントを送信するユーティリティ
const send = (event: string, data: unknown) => {
const payload = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
controller.enqueue(new TextEncoder().encode(payload));
};
send("job.started", { jobId });
let percent = 0;
const interval = setInterval(() => {
percent += 20;
if (percent >= 100) {
send("job.completed", { jobId, totalGifts: 10 });
clearInterval(interval);
controller.close();
} else {
send("job.progress", { jobId, percent });
}
}, 1000);
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream", // ここで SSE を指定
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
これはおもちゃの模擬実装です。1 秒ごとにパーセントが増え、最後に job.completed が届きます。 後でこのタイマーを実際のワーカー/キューのイベントに置き換えますが、スキーム自体はそのままです。
クライアント: GiftGenius ウィジェットでの SSE 購読
React ウィジェット内で、jobId があればこのストリームに購読できます。 ウィジェットの API は ChatGPT のサンドボックスで動きますが、EventSource は通常のブラウザと同様に使用できます。
import { useEffect, useState } from "react";
export function GiftJobProgress({ jobId }: { jobId: string }) {
const [percent, setPercent] = useState(0);
useEffect(() => {
const url = `/api/gift-jobs/${jobId}/events`;
const es = new EventSource(url);
es.addEventListener("job.progress", (event) => {
const data = JSON.parse((event as MessageEvent).data);
setPercent(data.percent);
});
es.addEventListener("job.completed", () => {
setPercent(100);
es.close();
});
es.onerror = () => {
// ここで「接続に問題が発生。再接続を試行しています」と表示してもよい
};
return () => es.close();
}, [jobId]);
return <div>ギフト選定の進捗: {percent}%</div>;
}
これを MCP のツールと組み合わせられます。ツール start_gift_job は jobId を返し、ウィジェットの ToolOutput で GiftJobProgress を描画するだけです。
自動再接続と Last‑Event‑ID
EventSource は標準で、接続が切れたら自動的に再接続を試みます。 サーバーは SSE の標準フィールド id: をイベントに付け、クライアントはヘッダー Last-Event-ID を使うことで、再接続後に抜けたイベントを追いつけます。
シンプルな GiftGenius では、当面は id: も個別のイベント ID も実装せず、 再接続時の小さな「進捗の抜け」を許容してもよいでしょう。 しかし本番、特に高負荷では次の対応が必要になります:
- 各 SSE イベントに標準フィールド id: を付与し、クライアントが再接続時に Last-Event-ID を送れるようにする;
- イベントのペイロードにアプリケーション固有の event_id を入れ、クライアント/バックエンドでの冪等な処理の拠り所にする。
これは冪等性と直結します。同じ job.progress が 2 回届いても、 ハンドラが既知の event_id を見れば副作用を再実行しません。
結果として、SSE は jobId 周辺のイベント購読に便利で、自動再接続と イベント ID による重複制御を提供します。 では 2 つ目のタイプ、1 つのリクエストだが応答が大きく、部分的に返したいケースを見ましょう。
4. HTTP‑streaming: 単一リクエストへの段階的応答
SSE が「独立したイベントの購読」なのに対し、HTTP ストリーミングは「1 つのリクエストに対し、時間をかけてチャンクで返す応答」です。
これは、stream : true を付けて OpenAI API を使うときに目にする仕組みそのものです。 サーバーは JSON チャンク(しばしば SSE 形式ですが、論理は「1 リクエスト ↔ 部分的な応答のストリーム」)を送り、クライアントはそれらを最終テキストに組み立てます。
自分たちの API でも同じことができます:
- 長いテキストレポート(例: 選ばれたギフトのロジック説明)
- 長いギフト一覧(全件待たせずに部分的にストリームする)
最小の HTTP‑stream エンドポイント(Next.js)
選定結果の「説明」テキストを生成する必要があるとします。LLM が長いテキストを書くので、 生成に合わせてウィジェットへストリームしたい、というものです。
app/api/gift-report/route.ts
import { NextRequest } from "next/server";
export async function POST(req: NextRequest) {
const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
controller.enqueue(encoder.encode("分析を開始します...\n"));
// ここには実際の LLM 生成(チャンク)が入る想定
for (const line of ["嗜好を収集しています...\n", "予算を計算しています...\n", "最終的な推奨...\n"]) {
await new Promise((r) => setTimeout(r, 1000));
controller.enqueue(encoder.encode(line));
}
controller.close();
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/plain; charset=utf-8",
"Transfer-Encoding": "chunked", // ここで HTTP/stream であることを指定
},
});
}
技術的には、Next が chunked エンコーディングを管理します。あなたは ReadableStream を返すだけで十分です。
ウィジェットで fetch による HTTP ストリームの読み取り
クライアント側(ウィジェット内)では次のように読めます:
async function fetchReport(setText: (s: string) => void) {
const res = await fetch("/api/gift-report", { method: "POST" });
const reader = res.body!.getReader();
const decoder = new TextDecoder();
let acc = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
acc += decoder.decode(value, { stream: true });
setText(acc); // 到着次第 UI を更新
}
}
そしてラッパーコンポーネント:
import { useState } from "react";
export function GiftReport() {
const [text, setText] = useState("");
return (
<div>
<button onClick={() => fetchReport(setText)}>レポートを生成</button>
<pre style={{ whiteSpace: "pre-wrap" }}>{text}</pre>
</div>
);
}
これは古典的なパターンです。1 つの POST リクエスト /api/gift-report に対し、 サーバーからテキストがストリームで返り、順次表示していきます。
テキストではなく JSON をストリーミングする
しばしば、文字列ではなく JSON オブジェクトをストリームしたくなります。最も一般的な形式は NDJSON(Newline‑delimited JSON)です。 各イベントは 1 行の JSON で、最後は \n です。
サーバー側例:
const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder();
for (let i = 0; i < 5; i++) {
const chunk = { type: "gift", index: i, name: `ギフト #${i}` };
controller.enqueue(encoder.encode(JSON.stringify(chunk) + "\n"));
await new Promise((r) => setTimeout(r, 500));
}
controller.close();
},
});
クライアントは TextDecoder で読み、\n で分割して、個々の JSON オブジェクトをパースします。
5. SSE と HTTP‑stream の違いと選び方
ここまでで直感的なイメージはつかめたと思いますが、小さな表で要点をまとめておきます。
| 特徴 | SSE (Server‑Sent Events) | HTTP‑stream (chunked) |
|---|---|---|
| 発起者 | クライアントが GET を行い購読する | クライアントがリクエスト(GET/POST)を行い、サーバーが応答をストリームする |
| 方向 | サーバー → クライアントのみ | 特定のリクエストに対するサーバーの応答 |
| セマンティクス | イベントストリームの購読(pub/sub) | 1 つのリクエストに対する部分的な応答 |
| 組み込みプロトコル | あり(event:、data:、id: 等) | なし。形式は自分で決める(テキスト行、NDJSON、JSON) |
| クライアント API | EventSource | fetch + ReadableStream / response.body |
| 再接続サポート | 組み込み(EventSource、Last-Event-ID) | 手動で実装が必要 |
| 代表的なユースケース | 進捗、ステータス、jobId ベースの通知 | テキスト、大型 JSON 応答、LLM 出力のストリーミング |
「現場の勘ルール」にまで単純化すると(乱用は禁物):
- job があり、その周りにイベントがたくさんある → SSE;
- 1 回のツール呼び出しが大きな結果を返し、それを部分的に見せたい → HTTP‑stream。
GiftGenius では、SSE は生のプログレスバーと選定ステータスに、HTTP ストリームは長いテキスト要約や長いギフト一覧の段階的ロードに使う、という棲み分けになります。
6. MCP と GiftGenius にどう接続するか
冒頭のスキーマを思い出しましょう。モデル ↔ MCP ↔ ウィジェット ↔ バックエンド。すでにウィジェット ↔ バックエンドのストリームを見ました。 ここで一歩戻り、この流れのどこが MCP で、どこが「ただの HTTP」かをきちんと切り分けます。
MCP は、ChatGPT(MCP クライアント)があなたの MCP サーバーとどう通信するかを定めます。トランスポートでは:
- ChatGPT は /sse への SSE 接続を開き、MCP メッセージ(応答、通知、イベント)をそこで受け取る;
- ChatGPT は MCP リクエスト(call_tool、list_tools 等)を /messages に送り、通常は POST の JSON‑RPC として送る。
GiftGenius を ChatGPT に接続したとき、このレベルはすでに通過しています。
ここに非同期ジョブとウィジェットの UX ストリームを加えると、アーキテクチャは 2 つの選択肢が出てきます。
選択肢 1 — 「ピュア MCP」: MCP サーバー自身が job.progress と job.completed を生成し、 ChatGPT は MCP‑SSE でそれらを受け取ります。その後、モデルが更新されたコンテキストでウィジェットを呼び、 ウィジェットはバックエンドと直接通信せずに進捗を描画します。これは最も「正統的」な MCP イベントの流れです。
選択肢 2 — ハイブリッド: MCP ツール start_gift_job はジョブを作成して jobId を返します。 ウィジェットは jobId を受け取り、その後は自分でバックエンドと HTTP で通信し、 SSE エンドポイント /api/gift-jobs/{jobId}/events を購読し、必要に応じて HTTP ストリームのレポートを取得します。 MCP 側では特別なことは起きません。
本コースではハイブリッドを採用します。App Router/Next に収まりがよく、ローカルデバッグも簡単だからです。 その後、慣れてきたら「ピュア MCP 通知」へ移行することもできます。
7. 再接続・タイムアウトなどネットワークの現実
ここまで理想的に聞こえました。SSE かストリームを開くだけで、データが流れ、イベントが届き、UX は輝く、と。 現実にはネットワークは予期せぬタイミングで接続を切り、インフラはタイムアウトを課します。
何が問題になり得るか
SSE と HTTP-stream では遅かれ早かれ次の事象に出会います:
- プロキシのアイドルタイムアウト: 「N 秒間データが流れなければ接続を閉じる」;
- バックエンドの再起動(デプロイ、障害);
- ユーザー側の不安定なネットワーク(特にモバイル)。
これは普通に起きます。大事なのは「運に任せる」のではなく、備えることです。
SSE の戦略
この領域で SSE は強みが多いです:
- EventSource は自動再接続を行う;
- id: と Last-Event-ID があり、追いつきができる。
最低限の実務プラクティス:
- サーバー側で定期的に何かを送って(ハートビートのように)接続が完全なアイドルと見なされないようにする。 例えば event: ping の専用イベントや、コメント : keep-alive でもよい。
- クライアント側では onerror で、ウィジェット全体を壊すのではなく 「接続に問題が発生。再接続を試行しています…」のようなわかりやすいステータスを表示する。
- 再接続時、id: を使っているなら、その ID 以降の新しいイベントだけをサーバーが返す。 GiftGenius ではまず id: なしで始め、最後に受け取った job.progress/job.completed を元に状態を「組み直す」でもよい。
HTTP‑stream の戦略
HTTP ストリームは 1 回のリクエストなので、切断されたら基本的に最初からやり直しになります:
- テキストレポートをストリームしている場合は、 「完全なレポートを取得できませんでした。もう一度お試しください」と伝え、最初からやり直す。
- 構造化データ(NDJSON)をストリームしている場合は、再開(resume)メカニズムを検討する。 例えば、どこから再開すべきかを示す offset や cursor をリクエストに渡す。
最初は複雑にせず、単純なポリシーでも構いません。応答ストリームが最後まで届かなかったら、 届いた分だけ表示し、「生成を続行」ボタンで新しいリクエストを送る、といった形です。
重要なのは、ユーザーを「永遠の待ち」状態にしないことです。
8. GiftGenius への適用: 最初から最後までのシナリオ
ここまでの SSE、HTTP‑stream、MCP との 2 つのアーキテクチャを、GiftGenius の実シナリオでつなげてみます。
ユーザーが ChatGPT に「ボードゲーム好きのファンに合うギフトを選んで。予算は 100 ドルまで」と書きます。 モデルは GiftGenius を呼ぶことを決め、あなたの MCP サーバーにツール呼び出し start_gift_job を行います。 サーバーは:
- ジョブを DB に記録する;
- 内部キューへ投入する(キューとワーカーの詳細は次回の講義。ここでは「誰かが実行する」とします);
- ツール呼び出しへの応答として jobId を同期的に返す。
ウィジェット GiftGenius は jobId を含む ToolOutput を受け取り、次のコンポーネントを描画します:
function GiftGeniusRoot({ jobId }: { jobId: string }) {
return (
<div>
<h2>最適なギフトを探しています...</h2>
<GiftJobProgress jobId={jobId} />
<GiftReport />
</div>
);
}
コンポーネント GiftJobProgress は SSE /api/gift-jobs/{jobId}/events を購読して進捗を描画します。 各 job.progress でパーセントを更新し、job.completed で 100% にして、「詳細レポートを表示」ボタンを有効にする、などが考えられます。
コンポーネント GiftReport はボタンクリックで POST /api/gift-report(jobId を渡す)を送り、 サーバーが HTTP ストリームのチャンクを返す間、テキストレポートを段階的に表示します。
SSE 接続が切れた場合、ウィジェットはソフトな警告を表示し、EventSource は再接続を試みます。 レポートのストリームに問題があれば、届いた部分を見せつつ「生成を続行」や「もう一度試す」ボタンを提示します。
ChatGPT と MCP の視点では:
- MCP はツール呼び出し start_gift_job と、場合によってはジョブのステータス通知を目にします;
- ストリーム周りの UX は主にウィジェットとあなたのバックエンドの HTTP レベルで実装されます。
9. SSE と HTTP‑stream での典型的なミス
エラー No.1: SSE と HTTP‑stream を「同じもの」と見なす。
確かに下層はどちらも HTTP と chunked 応答ですが、セマンティクスは大きく異なります。 SSE は独立したイベントの購読で、いつ来るかはクライアントに事前にはわかりません。 HTTP ストリームは、時間に引き伸ばされた 1 つの具体的な応答です。 複数の jobId への購読を 1 本の HTTP ストリームで実装しようとすると、 バイトの上に自作プロトコルを発明することになり、実質的に SSE の半分を再実装する羽目になります。
エラー No.2: SSE の自動再接続を無視し、冪等性を考えない。
多くの人が「単純」な SSE サーバーを書きます。data: ... だけを送り、標準の id:(Last-Event-ID 用)も、 アプリ固有の event_id も入れません。 すると最初の切断と再接続でイベントの重複が増殖します。 練られた event_id と「このイベントは既に見た」というロジックがないと、 クライアントの処理は同じ状態を 2 回更新したり、同じ job.completed を 2 回出したり、 ひどい場合は重複課金/重複付与などの副作用を 2 度起こします。
エラー No.3: ワーカーの「くしゃみ」をすべて個別の SSE イベントで送る。
ミリ秒単位で進捗を送れば、滑らかなアニメーションどころかネットワークとクライアントを殺す可能性が高くなります。 更新は集約し、例えば 200–500 ms ごとやステージ変更時に送るのが賢明です。 スロットリングやバックプレッシャーは別途扱いますが、この段階からイベント頻度を考えるべきです。
エラー No.4: 明確な形式なしに HTTP ストリーム上で複雑なプロトコルを作る。
典型的なアンチパターンは、区切りなしに JSON をストリームし、どこで 1 つのオブジェクトが終わり、次が始まるかを「推測」すること。 あるいは 1 つのストリームでテキストと JSON を混在させること。 最善はシンプルで理解しやすい形式を選ぶことです。行単位のテキスト、または NDJSON(1 行 1 JSON)、 あるいは明示的な区切り。そうすればクライアントのパーサは健全なままです。
エラー No.5: タイムアウトと「永遠の」ストリームを忘れる。
ときどき、5–10 分間何も送らない SSE エンドポイントを作り、 ユーザーからサーバーまでの経路(ロードバランサ、API ゲートウェイ、企業プロキシ)で切断されることに驚く、ということが起きます。 定期的なハートビートイベントやコメントは、接続を生かし、切断を早期に検知する助けになります。 また、HTTP ストリームは無限の応答になってはいけません。永続購読には SSE があります。
エラー No.6: 正規のイベントの代わりに、HTTP ストリームで複雑な pub/sub をやろうとする。
「ストリーム 1 本に、進捗も部分結果もログも全部流そう」という誘惑が生まれることがあります。 その結果、クライアントには各チャンクを解析して、どの jobId に属するかを判定する複雑な多重化処理が生まれます。 多くの場合、job.progress、job.completed といったイベントで SSE を使い、 ジョブごとにチャンネルを分けるほうが、自家製の巨大プロトコルを HTTP ストリーム上に作るより簡単で堅牢です。
エラー No.7: 「ストリームは決して落ちない」という前提に UX をハードコードする。
どんなストリームもいつか切れます。切れたとき、ウィジェットが永遠に回るプログレスバーのままで、何のアクションもできないなら、 UX は「壊れている」と感じられます。 「接続が切れたようです。ギフト選定を再実行してみてください」のような簡単なメッセージと「再試行」ボタンがあるだけで、 沈黙よりずっと良くなります。
GO TO FULL VERSION