1. 为什么 ChatGPT App 需要异步任务
如果世界足够理想,你的每个 MCP 工具调用都能在几百毫秒内完成。但现实中有趣的事情往往——既耗时又沉重:
- 解析包含用户购买历史的大型 CSV;
- 聚合多个外部 API 的数据,而它们时常要么睡着了,要么以 503 回应;
- 构建包含大量中间步骤的复杂推荐;
- 生成体量很大的报表与演示文稿。
如果尝试把这些塞进一个同步的 tool‑call,你会遇到三类问题。
首先,超时。ChatGPT 会话、HTTP 基础设施、MCP 客户端——都不是为了“几分钟后再回复”而设计的。长时间保持连接的服务器,在 ChatGPT 和用户看来都会像是“卡住了”。
其次,负载管理。如果有一百位用户同时启动“新年礼物超级分析”,你绝不希望 MCP 服务器在 HTTP 线程里同步持有一百个长任务。你需要一层能够接住流量高峰、将任务排队并由多个工作进程处理的中间层。
第三,UX。用户在 GiftGenius 小部件里点了按钮,然后盯着一个转圈圈看上 40 秒——体验就像早年的网银。更好的模式是“快速响应 + 进度 + 可取消”。
这些问题可以用一个通用方案来解决:“启动 → 队列 → 后台 → 事件”。
2. MCP 语境下的 async‑job 基本架构
以我们的 GiftGenius 为例。假设新增了一个重型场景:“基于购买历史和好友社交数据的深度偏好分析”。这类任务可能需要几分钟,因此:
- MCP 工具(tool)接收来自模型的请求参数。
- 它不立刻开始计算,而是在数据库中创建 Job 记录。
- 把任务放入队列。
- 立刻回复 ChatGPT:“分析已启动,这是 jobId”。
- 后台工作进程从队列取出任务、执行耗时工作,期间发送 MCP 事件 job.progress 和 job.partial, 最后发送 job.completed 或 job.failed。
从架构角度看,大致如下:
flowchart LR
subgraph ChatGPT
U[用户] --> GPT[模型 + ChatGPT UI]
end
GPT -->|call_tool analyze_preferences| MCP[MCP 服务器]
subgraph Backend
MCP -->|创建 Job| DB[(任务数据库)]
MCP -->|enqueue| Q[队列]
W[工作进程] -->|take job| Q
W -->|update status/progress| DB
W -->|MCP events: job.progress/job.completed| MCP
end
MCP -->|SSE events| GPT
关键点:MCP 服务器不一定是单体。它常常充当你内部异步基础设施的门面:接收 tool‑call、创建 job、发送事件,而耗时工作由独立的工作进程完成。
3. 异步任务的数据模型
从一个简单的 Job 模型开始。我们使用 TypeScript 和一个示意的 Node/MCP 服务器,便于你直接融入自己的技术栈。
内存/数据库中的最简模型可以是这样:
// openai/jobs/model.ts
export type JobStatus =
| 'pending'
| 'in_progress'
| 'completed'
| 'failed'
| 'canceled';
export interface GiftJob {
id: string; // jobId
type: 'deep_gift_analysis';
status: JobStatus;
payload: {
recipientProfile: string; // 文本/档案 ID
budget: number;
};
result?: unknown; // 最终推荐
error?: string; // 错误原因
attempts: number; // 已尝试执行的次数
createdAt: Date;
updatedAt: Date;
}
在真实项目里,你会把 GiftJob 存在 Postgres、DynamoDB、Firestore 等处,但这里我们关注的字段是:
- status —— 任务当前状态,会体现在事件与 UX 中;
- attempts —— 重试计数;
- error —— 用于日志与调试;
- payload —— 工作进程处理时所需的输入数据。
4. 创建 async‑job 的 MCP 工具
来看工具 start_deep_analysis。过去它也许会同步完成全部工作,现在只负责把任务放入队列并返回 jobId。
// openai/tools/startDeepAnalysis.ts
import { v4 as uuid } from 'uuid';
import { createJobAndEnqueue } from '../jobs/queue';
// MCP SDK 的伪类型
type StartDeepAnalysisInput = {
recipientProfile: string;
budget: number;
};
type StartDeepAnalysisOutput = {
jobId: string;
message: string;
};
export async function startDeepAnalysisTool(
input: StartDeepAnalysisInput
): Promise<StartDeepAnalysisOutput> {
const jobId = uuid();
await createJobAndEnqueue({
id: jobId,
type: 'deep_gift_analysis',
status: 'pending',
payload: {
recipientProfile: input.recipientProfile,
budget: input.budget,
},
attempts: 0,
createdAt: new Date(),
updatedAt: new Date(),
});
return {
jobId,
message: `已启动深度分析。任务 ID:${jobId}。我会在处理过程中持续推送更新。`,
};
}
这里要点如下:
- MCP 工具需要快速返回:最多只做几次对数据库/队列的调用;
- 它返回一个带有 jobId 的结构化响应,ChatGPT 可据此“向用户解释”, 而 GiftGenius 小部件也可以将其保存到 widgetState 中。
对此工具的 JSON Schema 只需把 jobId 描述为字符串、message 描述为可读文本—— 模型会理解它是任务标识,并能在后续对话步骤中引用它。
5. 简易队列与工作进程:教学版本
先不引入 Redis、RabbitMQ 等,做一个简化的内存队列。生产环境当然会使用专门的服务(SQS/BullMQ/Cloud Tasks 等),但逻辑相同。
先准备队列骨架:
// openai/jobs/queue.ts
import type { GiftJob } from './model';
const jobs = new Map<string, GiftJob>(); // 内存中的 "DB"
export const queue: string[] = []; // 简化的按 id 的队列
export async function createJobAndEnqueue(job: GiftJob) {
jobs.set(job.id, job);
queue.push(job.id);
}
export function getJob(id: string): GiftJob | undefined {
return jobs.get(id);
}
export function updateJob(id: string, patch: Partial<GiftJob>) {
const job = jobs.get(id);
if (!job) return;
const updated: GiftJob = { ...job, ...patch, updatedAt: new Date() };
jobs.set(id, updated);
}
接着写一个原始的工作进程,定期查看队列、取出并处理任务:
// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';
async function processJob(jobId: string) {
const job = getJob(jobId);
if (!job) return;
updateJob(jobId, { status: 'in_progress' });
await emitJobEvent(jobId, 'job.started', {});
try {
// 在这里调用耗时的业务逻辑
const result = await doDeepGiftAnalysis(job.id, job.payload);
updateJob(jobId, { status: 'completed', result });
await emitJobEvent(jobId, 'job.completed', { resultSummary: summarize(result) });
} catch (err) {
updateJob(jobId, {
status: 'failed',
error: (err as Error).message,
});
await emitJobEvent(jobId, 'job.failed', { error: 'Internal error' });
}
}
以及一个“循环”的工作进程,在应用启动时某处运行它:
// openai/jobs/workerLoop.ts
import { queue } from './queue';
import { processJob } from './worker';
export function startWorkerLoop() {
setInterval(async () => {
const jobId = queue.shift(); // 严格来说需要并发竞争保护
if (!jobId) return;
await processJob(jobId);
}, 1000); // 每秒检查一次队列
}
这只是教学示例。真实世界中不会用 setInterval,而是让真正的队列在有新消息时“唤醒”工作进程。 但核心思路很清晰:工作进程与 MCP 工具解耦、在后台运行,并通过事件与 MCP 服务器通信。
6. 从工作进程生成 MCP 事件
你在前面的课程中已经见过 MCP 事件的格式:类型(type)、唯一的 event_id、 timestamp、job_id 以及 payload。 现在我们展示一下,工作进程如何调用 helper emitJobEvent, 而它又如何通过 MCP 服务器的 SSE 通道把事件送达 ChatGPT。
一个简单的 helper 示例:
// openai/jobs/events.ts
import { randomUUID } from 'crypto';
import { sendMcpEvent } from '../mcp/eventBus';
export async function emitJobEvent(
jobId: string,
type: 'job.started' | 'job.progress' | 'job.completed' | 'job.failed',
payload: unknown
) {
const event = {
event_id: randomUUID(),
type,
job_id: jobId,
timestamp: new Date().toISOString(),
payload,
};
await sendMcpEvent(event);
}
而 MCP 服务器内部的 sendMcpEvent 已经知道如何把事件推送到 MCP SDK 的 SSEServerTransport:例如通过本地事件总线或 Redis Pub/Sub,正如我们在第 12 模块中讲过的那样。
要点:工作进程不直接与 ChatGPT 通信。它与 MCP 服务器通信,而 MCP 服务器维护 SSE 连接并把事件转发给客户端。
7. 工作进程中的进度与部分结果(partial results)
说点更有趣的:进度与部分结果。在 GiftGenius 中,耗时分析可以拆成多个阶段:
- 数据收集与规范化;
- 构建基础分群;
- 生成初步的礼物创意;
- 最终重排与文本说明。
在每个阶段,我们都可以发送 job.progress,并在合适的时候发送 job.partial,让 UI 先展示部分礼物。
示意性的工作进程:
async function doDeepGiftAnalysis(jobId: string, payload: GiftJob['payload']) {
await emitJobEvent(jobId, 'job.progress', { step: 1, totalSteps: 4 });
const normalized = await collectAndNormalizeData(payload);
await emitJobEvent(jobId, 'job.progress', { step: 2, totalSteps: 4 });
const roughGifts = await generateInitialGifts(normalized);
await emitJobEvent(jobId, 'job.partial', { gifts: roughGifts.slice(0, 3) });
await emitJobEvent(jobId, 'job.progress', { step: 3, totalSteps: 4 });
const finalGifts = await rerankAndBeautify(roughGifts);
await emitJobEvent(jobId, 'job.progress', { step: 4, totalSteps: 4 });
return finalGifts;
}
小部件在接收事件时,可以先展示 3 个“草案”礼物并标注“仍在细化”, 在收到 job.completed 后再更新列表并移除加载指示器。 这完全契合我们在第 3 讲中讨论过的 UX 模式。
8. 工作进程的重试(retry)逻辑
接下来是最让人紧张的部分:错误与重试。
设想工作进程在处理任务时调用了外部商品列表 API,而它时不时返回 500 或 429。 第一次出错就放弃——并不合理。但无限重试也不可取:你会对自己或第三方服务发起 DDoS。
我们需要带指数退避(exponential backoff)且有尝试次数上限的重试策略。
先做一个错误分类,这在后续课程也会用到:
- 临时性(transient)—— 超时、500、503、429;
- 永久性(permanent)—— 错误输入、资源不存在;
- 致命(bug)—— 代码缺陷、TypeError、意外异常。
只有临时性错误才值得重试。其余应如实标记为 'failed'。
我们先简化,做两个 helper:
// openai/jobs/retry.ts
export function shouldRetry(error: unknown): boolean {
if (!(error instanceof Error)) return false;
// 约定:HTTP 5xx 或 429
return /5\d\d|429/.test(error.message);
}
export function getDelayMs(base: number, attempt: number): number {
const jitter = Math.random() * 100; // 少量抖动
return base * 2 ** attempt + jitter; // 指数退避
}
现在更新工作进程,让它使用 GiftJob 中的 attempts:
// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';
import { shouldRetry, getDelayMs } from './retry';
const MAX_ATTEMPTS = 5;
export async function processJob(jobId: string) {
const job = getJob(jobId);
if (!job) return;
updateJob(jobId, { status: 'in_progress' });
try {
const result = await doDeepGiftAnalysis(job.id, job.payload);
updateJob(jobId, { status: 'completed', result });
await emitJobEvent(jobId, 'job.completed', {
resultSummary: summarize(result),
});
} catch (err) {
const attempts = job.attempts + 1;
const error = err as Error;
if (attempts <= MAX_ATTEMPTS && shouldRetry(error)) {
const delay = getDelayMs(1000, attempts); // 1s,2s,4s...
updateJob(jobId, { attempts, status: 'pending', error: error.message });
setTimeout(() => {
// 在真实队列中,你会用延迟把该 job 重新入队
processJob(jobId);
}, delay);
await emitJobEvent(jobId, 'job.progress', {
retry: attempts,
nextAttemptInMs: delay,
});
} else {
updateJob(jobId, { status: 'failed', error: error.message });
await emitJobEvent(jobId, 'job.failed', {
error: '多次尝试后仍未完成分析',
});
}
}
}
这里有几个关键点。
首先,attempts 存在任务本身中——便于记录与可观测性 (你可以在图表上清楚看到带重试的任务数量)。
其次,每次重试我们都会发送包含尝试次数的 job.progress。 模型可以用这条信息向用户解释:“礼物服务响应不稳定,我会再试一次。”
第三,我们保证最终要么发送 job.completed,要么发送 job.failed。 不会出现“半死不活”的悬挂任务。
取消('canceled')也是一个重要状态。教学示例中我们不实现, 但生产中通常由用户发起(小部件上的“取消”按钮)或由超时触发。 此时工作进程在下一次从队列取任务时会看到 status: 'canceled', 不会启动处理,而 MCP 服务器会发送最终事件 job.canceled。
9. 幂等与重试:别在同一块石头上连摔两次
一旦引入重试,立刻就有“把同一件事情做两遍”的风险。在交易模块里这尤其致命(比如重复扣款), 在 GiftGenius 也有问题:给朋友发了两封一模一样的邮件、在内部分析系统里重复写入等。
因此要遵循两个原则。
第一:job 处理器必须是幂等的。
如果你用同一个 jobId 多次调用它(来自重试或误操作),世界不应崩坏。做到这一点:
- 所有副作用(写数据库、发邮件、创建订单)都要绑定在 jobId 或其他天然标识上, 这样在代码里可以快速判断我们是否已经完成过该步骤;
- 如果 job.status 已是 'completed' 或 'failed', 重复调用要么忽略,要么直接返回已有结果。
简单的保护示例:
export async function processJob(jobId: string) {
const job = getJob(jobId);
if (!job) return;
if (job.status === 'completed' || job.status === 'failed') {
// 任务已经成功或最终失败
return;
}
// ... 其余代码
}
第二:事件也应具备幂等性。
我们已经提到 event_id,客户端可以用它过滤重复事件, 但服务器端也要谨慎:在工作进程重启或从队列恢复时,不要无意义地向客户端刷屏 同样的 job.progress。
10. 队列与工作进程在你的架构中位于何处
图上看起来很优雅,但工作进程物理上跑在哪里?有几种典型选择。
一体化工作进程:MCP 服务器与工作进程是同一进程/同一部署。 它既接收 tool‑call,也启动 worker loop。优点是简单:更少的服务、更容易部署。 缺点是扩展性:为了增加工作进程数量,你不得不同时扩容整个 MCP 服务器。
独立工作进程:MCP 服务器是一个服务,工作进程是另一个。 二者之间是队列,可能还有用于事件的 Pub/Sub。很多 BullMQ/Redis 与 MCP 事件的文章都讲这种模式: MCP 服务器订阅 Redis 渠道 'mcp:events',工作进程向其中发布事件。
组合方案:一个 MCP 服务器实例同时运行工作进程,其它实例只负责 HTTP/SSE。 当你部署到 Vercel 或其他 serverless 平台、而常驻后台进程并不友好时,这会很有用。
在我们的教学版 GiftGenius 中,先采用第一种:MCP 服务器 + 一个简单的进程内工作进程。 等到生产与扩展模块时,再把工作进程迁移到独立服务。
11. 示例:GiftGenius 的完整 async 流水线
连贯地过一遍,当用户在聊天里写道:
“我需要为一位太空迷做复杂的礼物挑选,并结合他的过往购买记录。”
- 模型决定调用工具 start_deep_analysis,并传入收礼者档案与预算参数。
- 工具在数据库中创建状态为 'pending' 的 GiftJob, 把它放入队列,并返回 jobId + 确认消息。
- ChatGPT 向用户说明分析已启动,并可把 jobId 传给 GiftGenius 小部件。
- 小部件通过 SSE 订阅该 jobId 的事件, 显示进度条与“正在收集与分析数据”的状态。
- 工作进程看到队列中新任务,把状态更新为 'in_progress' 并发送 job.started。
- 处理过程中多次发送 job.progress(阶段)以及 job.partial(前 2–3 个礼物)。
- 如果外部 API 中途出错,工作进程会使用指数退避重试,更新 attempts 并发送包含重试信息的事件。
- 最终它要么发送带简要总结与最终推荐的 job.completed, 要么发送带清晰说明的 job.failed。
- 小部件根据事件更新 UI,而 ChatGPT 可以生成文字小结并给出后续建议: “再多看些点子”“缩小预算”“更换礼物类型”。
对用户来说,这是一个“活的”长期过程,并且可控。 对后端而言——就是带队列、工作进程与重试的正常 async 流水线。
12. 小练习(自行实践)
如果想巩固,试着为 GiftGenius:
- 设计真实数据库中的 jobs 表结构: 需要哪些索引、哪些字段会用于筛选(按用户、按状态、按创建时间);
- 起草用于 /api/jobs/:id 的 TypeScript 类型, 以便小部件在 SSE 不可用时能退化为轮询状态;
- 描述重试策略:尝试次数、基准延迟、对始终失败的任务如何处理 (简单的 dead‑letter 表,或日志记录 + 告警)。
这个练习会在后面关于生产与可观测性的模块中派上用场,届时我们会讨论类似 “有多少任务在 pending 状态下卡住超过 N 分钟”这样的指标。
13. 处理异步任务的常见错误
错误 1:把一切都做成同步 tool‑call。
最常见的陷阱是试图把所有重活都塞进一个不带队列的 MCP 工具。 在请求量小的时候似乎还行。一旦负载上来或外部 API 变慢,你就会遇到超时、 聊天卡顿以及非常糟糕的 UX。任何可能持续几十秒以上的操作, 都应从一开始就设计为带 jobId 的 async‑job。
错误 2:没有显式的 Job 模型。
有时开发者尝试“只用队列消息”,而不把任务状态存储在数据库里。结果连最基本的问题都难以回答: “任务状态是什么?”“我们尝试执行了几次?”“为什么失败了?” 清晰的 Job 模型以及 status、attempts、 error、createdAt 等字段,是调试、监控与 UX 的基础。
错误 3:没有重试,或无限重试。
有人根本不重试,第一次 500 就失败;也有人写 while (!success) 不设上限。前者会因为短暂故障丢掉大量任务,后者会制造“负载风暴” 并可能把外部 API 打挂。合理的中间道路是:有限次数 + 指数退避 + 区分临时与永久错误。
错误 4:非幂等处理器。
如果每次尝试你都在外部系统里不加检查地创建新记录、执行同一笔支付或发送同一封邮件—— 重试很快就会变成问题。处理器要能判断这个 jobId 的任务已经成功完成, 从而不再重复危险的副作用。
错误 5:出错时没有事件。
有时工作进程抛出意外异常,只在控制台打日志,然后什么也不做。用户一直等着 job.completed,却不知道一切早已崩溃。任何以错误结束的分支, 最终都必须导致 job.failed 并更新数据库中的 Job 状态。 否则你的 MCP 流就会变成单向的“黑箱”。
错误 6:进度事件过于频繁。
“讲真话”的冲动让你在每增加 1% 时就发一次 job.progress,这会压垮网络、 客户端和 MCP 服务器。最好在阶段变更或较大增量(例如每 10%)时发送, 其余细粒度进度仅写入内部日志。
错误 7:在生产中使用内存队列。
用 queue: string[] 和 Map 的教学示例有助于理解架构, 但在真实生产系统中,一次进程重启或服务器宕机就会把它击碎。 严肃的生产需要外部队列与存储:SQS、Pub/Sub、RabbitMQ、Redis Streams 等。 内存方案只适合本地开发与简单演示。
GO TO FULL VERSION