CodeGym /课程 /ChatGPT Apps /异步任务:队列、工作进程、重试(retry)

异步任务:队列、工作进程、重试(retry)

ChatGPT Apps
第 13 级 , 课程 3
可用

1. 为什么 ChatGPT App 需要异步任务

如果世界足够理想,你的每个 MCP 工具调用都能在几百毫秒内完成。但现实中有趣的事情往往——既耗时又沉重:

  • 解析包含用户购买历史的大型 CSV;
  • 聚合多个外部 API 的数据,而它们时常要么睡着了,要么以 503 回应;
  • 构建包含大量中间步骤的复杂推荐;
  • 生成体量很大的报表与演示文稿。

如果尝试把这些塞进一个同步的 tool‑call,你会遇到三类问题。

首先,超时。ChatGPT 会话、HTTP 基础设施、MCP 客户端——都不是为了“几分钟后再回复”而设计的。长时间保持连接的服务器,在 ChatGPT 和用户看来都会像是“卡住了”。

其次,负载管理。如果有一百位用户同时启动“新年礼物超级分析”,你绝不希望 MCP 服务器在 HTTP 线程里同步持有一百个长任务。你需要一层能够接住流量高峰、将任务排队并由多个工作进程处理的中间层。

第三,UX。用户在 GiftGenius 小部件里点了按钮,然后盯着一个转圈圈看上 40 秒——体验就像早年的网银。更好的模式是“快速响应 + 进度 + 可取消”。

这些问题可以用一个通用方案来解决:“启动 → 队列 → 后台 → 事件”。

2. MCP 语境下的 async‑job 基本架构

以我们的 GiftGenius 为例。假设新增了一个重型场景:“基于购买历史和好友社交数据的深度偏好分析”。这类任务可能需要几分钟,因此:

  1. MCP 工具(tool)接收来自模型的请求参数。
  2. 它不立刻开始计算,而是在数据库中创建 Job 记录。
  3. 把任务放入队列。
  4. 立刻回复 ChatGPT:“分析已启动,这是 jobId”。
  5. 后台工作进程从队列取出任务、执行耗时工作,期间发送 MCP 事件 job.progressjob.partial, 最后发送 job.completedjob.failed

从架构角度看,大致如下:

flowchart LR
    subgraph ChatGPT
      U[用户] --> GPT[模型 + ChatGPT UI]
    end

    GPT -->|call_tool analyze_preferences| MCP[MCP 服务器]

    subgraph Backend
      MCP -->|创建 Job| DB[(任务数据库)]
      MCP -->|enqueue| Q[队列]
      W[工作进程] -->|take job| Q
      W -->|update status/progress| DB
      W -->|MCP events: job.progress/job.completed| MCP
    end

    MCP -->|SSE events| GPT

关键点:MCP 服务器不一定是单体。它常常充当你内部异步基础设施的门面:接收 tool‑call、创建 job、发送事件,而耗时工作由独立的工作进程完成。

3. 异步任务的数据模型

从一个简单的 Job 模型开始。我们使用 TypeScript 和一个示意的 Node/MCP 服务器,便于你直接融入自己的技术栈。

内存/数据库中的最简模型可以是这样:

// openai/jobs/model.ts
export type JobStatus =
  | 'pending'
  | 'in_progress'
  | 'completed'
  | 'failed'
  | 'canceled';

export interface GiftJob {
  id: string;                // jobId
  type: 'deep_gift_analysis';
  status: JobStatus;
  payload: {
    recipientProfile: string;  // 文本/档案 ID
    budget: number;
  };
  result?: unknown;          // 最终推荐
  error?: string;            // 错误原因
  attempts: number;          // 已尝试执行的次数
  createdAt: Date;
  updatedAt: Date;
}

在真实项目里,你会把 GiftJob 存在 Postgres、DynamoDB、Firestore 等处,但这里我们关注的字段是:

  • status —— 任务当前状态,会体现在事件与 UX 中;
  • attempts —— 重试计数;
  • error —— 用于日志与调试;
  • payload —— 工作进程处理时所需的输入数据。

4. 创建 async‑job 的 MCP 工具

来看工具 start_deep_analysis。过去它也许会同步完成全部工作,现在只负责把任务放入队列并返回 jobId

// openai/tools/startDeepAnalysis.ts
import { v4 as uuid } from 'uuid';
import { createJobAndEnqueue } from '../jobs/queue';

// MCP SDK 的伪类型
type StartDeepAnalysisInput = {
  recipientProfile: string;
  budget: number;
};

type StartDeepAnalysisOutput = {
  jobId: string;
  message: string;
};

export async function startDeepAnalysisTool(
  input: StartDeepAnalysisInput
): Promise<StartDeepAnalysisOutput> {
  const jobId = uuid();

  await createJobAndEnqueue({
    id: jobId,
    type: 'deep_gift_analysis',
    status: 'pending',
    payload: {
      recipientProfile: input.recipientProfile,
      budget: input.budget,
    },
    attempts: 0,
    createdAt: new Date(),
    updatedAt: new Date(),
  });

  return {
    jobId,
    message: `已启动深度分析。任务 ID:${jobId}。我会在处理过程中持续推送更新。`,
  };
}

这里要点如下:

  • MCP 工具需要快速返回:最多只做几次对数据库/队列的调用;
  • 它返回一个带有 jobId 的结构化响应,ChatGPT 可据此“向用户解释”, 而 GiftGenius 小部件也可以将其保存到 widgetState 中。

对此工具的 JSON Schema 只需把 jobId 描述为字符串、message 描述为可读文本—— 模型会理解它是任务标识,并能在后续对话步骤中引用它。

5. 简易队列与工作进程:教学版本

先不引入 Redis、RabbitMQ 等,做一个简化的内存队列。生产环境当然会使用专门的服务(SQS/BullMQ/Cloud Tasks 等),但逻辑相同。

先准备队列骨架:

// openai/jobs/queue.ts
import type { GiftJob } from './model';

const jobs = new Map<string, GiftJob>();   // 内存中的 "DB"
export const queue: string[] = [];         // 简化的按 id 的队列

export async function createJobAndEnqueue(job: GiftJob) {
  jobs.set(job.id, job);
  queue.push(job.id);
}

export function getJob(id: string): GiftJob | undefined {
  return jobs.get(id);
}

export function updateJob(id: string, patch: Partial<GiftJob>) {
  const job = jobs.get(id);
  if (!job) return;
  const updated: GiftJob = { ...job, ...patch, updatedAt: new Date() };
  jobs.set(id, updated);
}

接着写一个原始的工作进程,定期查看队列、取出并处理任务:

// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';

async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  updateJob(jobId, { status: 'in_progress' });
  await emitJobEvent(jobId, 'job.started', {});

  try {
    // 在这里调用耗时的业务逻辑
    const result = await doDeepGiftAnalysis(job.id, job.payload);

    updateJob(jobId, { status: 'completed', result });
    await emitJobEvent(jobId, 'job.completed', { resultSummary: summarize(result) });
  } catch (err) {
    updateJob(jobId, {
      status: 'failed',
      error: (err as Error).message,
    });
    await emitJobEvent(jobId, 'job.failed', { error: 'Internal error' });
  }
}

以及一个“循环”的工作进程,在应用启动时某处运行它:

// openai/jobs/workerLoop.ts
import { queue } from './queue';
import { processJob } from './worker';

export function startWorkerLoop() {
  setInterval(async () => {
    const jobId = queue.shift(); // 严格来说需要并发竞争保护
    if (!jobId) return;

    await processJob(jobId);
  }, 1000); // 每秒检查一次队列
}

这只是教学示例。真实世界中不会用 setInterval,而是让真正的队列在有新消息时“唤醒”工作进程。 但核心思路很清晰:工作进程与 MCP 工具解耦、在后台运行,并通过事件与 MCP 服务器通信。

6. 从工作进程生成 MCP 事件

你在前面的课程中已经见过 MCP 事件的格式:类型(type)、唯一的 event_idtimestampjob_id 以及 payload。 现在我们展示一下,工作进程如何调用 helper emitJobEvent, 而它又如何通过 MCP 服务器的 SSE 通道把事件送达 ChatGPT。

一个简单的 helper 示例:

// openai/jobs/events.ts
import { randomUUID } from 'crypto';
import { sendMcpEvent } from '../mcp/eventBus';

export async function emitJobEvent(
  jobId: string,
  type: 'job.started' | 'job.progress' | 'job.completed' | 'job.failed',
  payload: unknown
) {
  const event = {
    event_id: randomUUID(),
    type,
    job_id: jobId,
    timestamp: new Date().toISOString(),
    payload,
  };

  await sendMcpEvent(event);
}

而 MCP 服务器内部的 sendMcpEvent 已经知道如何把事件推送到 MCP SDK 的 SSEServerTransport:例如通过本地事件总线或 Redis Pub/Sub,正如我们在第 12 模块中讲过的那样。

要点:工作进程不直接与 ChatGPT 通信。它与 MCP 服务器通信,而 MCP 服务器维护 SSE 连接并把事件转发给客户端。

7. 工作进程中的进度与部分结果(partial results)

说点更有趣的:进度与部分结果。在 GiftGenius 中,耗时分析可以拆成多个阶段:

  • 数据收集与规范化;
  • 构建基础分群;
  • 生成初步的礼物创意;
  • 最终重排与文本说明。

在每个阶段,我们都可以发送 job.progress,并在合适的时候发送 job.partial,让 UI 先展示部分礼物。

示意性的工作进程:

async function doDeepGiftAnalysis(jobId: string, payload: GiftJob['payload']) {
  await emitJobEvent(jobId, 'job.progress', { step: 1, totalSteps: 4 });

  const normalized = await collectAndNormalizeData(payload);
  await emitJobEvent(jobId, 'job.progress', { step: 2, totalSteps: 4 });

  const roughGifts = await generateInitialGifts(normalized);
  await emitJobEvent(jobId, 'job.partial', { gifts: roughGifts.slice(0, 3) });

  await emitJobEvent(jobId, 'job.progress', { step: 3, totalSteps: 4 });

  const finalGifts = await rerankAndBeautify(roughGifts);
  await emitJobEvent(jobId, 'job.progress', { step: 4, totalSteps: 4 });

  return finalGifts;
}

小部件在接收事件时,可以先展示 3 个“草案”礼物并标注“仍在细化”, 在收到 job.completed 后再更新列表并移除加载指示器。 这完全契合我们在第 3 讲中讨论过的 UX 模式。

8. 工作进程的重试(retry)逻辑

接下来是最让人紧张的部分:错误与重试。

设想工作进程在处理任务时调用了外部商品列表 API,而它时不时返回 500429。 第一次出错就放弃——并不合理。但无限重试也不可取:你会对自己或第三方服务发起 DDoS。

我们需要带指数退避(exponential backoff)且有尝试次数上限的重试策略。

先做一个错误分类,这在后续课程也会用到:

  • 临时性(transient)—— 超时、500503429
  • 永久性(permanent)—— 错误输入、资源不存在;
  • 致命(bug)—— 代码缺陷、TypeError、意外异常。

只有临时性错误才值得重试。其余应如实标记为 'failed'

我们先简化,做两个 helper:

// openai/jobs/retry.ts
export function shouldRetry(error: unknown): boolean {
  if (!(error instanceof Error)) return false;
  // 约定:HTTP 5xx 或 429
  return /5\d\d|429/.test(error.message);
}

export function getDelayMs(base: number, attempt: number): number {
  const jitter = Math.random() * 100;   // 少量抖动
  return base * 2 ** attempt + jitter; // 指数退避
}

现在更新工作进程,让它使用 GiftJob 中的 attempts

// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';
import { shouldRetry, getDelayMs } from './retry';

const MAX_ATTEMPTS = 5;

export async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  updateJob(jobId, { status: 'in_progress' });

  try {
    const result = await doDeepGiftAnalysis(job.id, job.payload);

    updateJob(jobId, { status: 'completed', result });
    await emitJobEvent(jobId, 'job.completed', {
      resultSummary: summarize(result),
    });
  } catch (err) {
    const attempts = job.attempts + 1;
    const error = err as Error;

    if (attempts <= MAX_ATTEMPTS && shouldRetry(error)) {
      const delay = getDelayMs(1000, attempts); // 1s,2s,4s...

      updateJob(jobId, { attempts, status: 'pending', error: error.message });

      setTimeout(() => {
        // 在真实队列中,你会用延迟把该 job 重新入队
        processJob(jobId);
      }, delay);

      await emitJobEvent(jobId, 'job.progress', {
        retry: attempts,
        nextAttemptInMs: delay,
      });
    } else {
      updateJob(jobId, { status: 'failed', error: error.message });
      await emitJobEvent(jobId, 'job.failed', {
        error: '多次尝试后仍未完成分析',
      });
    }
  }
}

这里有几个关键点。

首先,attempts 存在任务本身中——便于记录与可观测性 (你可以在图表上清楚看到带重试的任务数量)。

其次,每次重试我们都会发送包含尝试次数的 job.progress。 模型可以用这条信息向用户解释:“礼物服务响应不稳定,我会再试一次。”

第三,我们保证最终要么发送 job.completed,要么发送 job.failed。 不会出现“半死不活”的悬挂任务。

取消('canceled')也是一个重要状态。教学示例中我们不实现, 但生产中通常由用户发起(小部件上的“取消”按钮)或由超时触发。 此时工作进程在下一次从队列取任务时会看到 status: 'canceled', 不会启动处理,而 MCP 服务器会发送最终事件 job.canceled

9. 幂等与重试:别在同一块石头上连摔两次

一旦引入重试,立刻就有“把同一件事情做两遍”的风险。在交易模块里这尤其致命(比如重复扣款), 在 GiftGenius 也有问题:给朋友发了两封一模一样的邮件、在内部分析系统里重复写入等。

因此要遵循两个原则。

第一:job 处理器必须是幂等的。

如果你用同一个 jobId 多次调用它(来自重试或误操作),世界不应崩坏。做到这一点:

  • 所有副作用(写数据库、发邮件、创建订单)都要绑定在 jobId 或其他天然标识上, 这样在代码里可以快速判断我们是否已经完成过该步骤;
  • 如果 job.status 已是 'completed''failed', 重复调用要么忽略,要么直接返回已有结果。

简单的保护示例:

export async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  if (job.status === 'completed' || job.status === 'failed') {
    // 任务已经成功或最终失败
    return;
  }

  // ... 其余代码
}

第二:事件也应具备幂等性。

我们已经提到 event_id,客户端可以用它过滤重复事件, 但服务器端也要谨慎:在工作进程重启或从队列恢复时,不要无意义地向客户端刷屏 同样的 job.progress

10. 队列与工作进程在你的架构中位于何处

图上看起来很优雅,但工作进程物理上跑在哪里?有几种典型选择。

一体化工作进程:MCP 服务器与工作进程是同一进程/同一部署。 它既接收 tool‑call,也启动 worker loop。优点是简单:更少的服务、更容易部署。 缺点是扩展性:为了增加工作进程数量,你不得不同时扩容整个 MCP 服务器。

独立工作进程:MCP 服务器是一个服务,工作进程是另一个。 二者之间是队列,可能还有用于事件的 Pub/Sub。很多 BullMQ/Redis 与 MCP 事件的文章都讲这种模式: MCP 服务器订阅 Redis 渠道 'mcp:events',工作进程向其中发布事件。

组合方案:一个 MCP 服务器实例同时运行工作进程,其它实例只负责 HTTP/SSE。 当你部署到 Vercel 或其他 serverless 平台、而常驻后台进程并不友好时,这会很有用。

在我们的教学版 GiftGenius 中,先采用第一种:MCP 服务器 + 一个简单的进程内工作进程。 等到生产与扩展模块时,再把工作进程迁移到独立服务。

11. 示例:GiftGenius 的完整 async 流水线

连贯地过一遍,当用户在聊天里写道:

“我需要为一位太空迷做复杂的礼物挑选,并结合他的过往购买记录。”

  1. 模型决定调用工具 start_deep_analysis,并传入收礼者档案与预算参数。
  2. 工具在数据库中创建状态为 'pending'GiftJob, 把它放入队列,并返回 jobId + 确认消息。
  3. ChatGPT 向用户说明分析已启动,并可把 jobId 传给 GiftGenius 小部件。
  4. 小部件通过 SSE 订阅该 jobId 的事件, 显示进度条与“正在收集与分析数据”的状态。
  5. 工作进程看到队列中新任务,把状态更新为 'in_progress' 并发送 job.started
  6. 处理过程中多次发送 job.progress(阶段)以及 job.partial(前 23 个礼物)。
  7. 如果外部 API 中途出错,工作进程会使用指数退避重试,更新 attempts 并发送包含重试信息的事件。
  8. 最终它要么发送带简要总结与最终推荐的 job.completed, 要么发送带清晰说明的 job.failed
  9. 小部件根据事件更新 UI,而 ChatGPT 可以生成文字小结并给出后续建议: “再多看些点子”“缩小预算”“更换礼物类型”。

对用户来说,这是一个“活的”长期过程,并且可控。 对后端而言——就是带队列、工作进程与重试的正常 async 流水线。

12. 小练习(自行实践)

如果想巩固,试着为 GiftGenius:

  • 设计真实数据库中的 jobs 表结构: 需要哪些索引、哪些字段会用于筛选(按用户、按状态、按创建时间);
  • 起草用于 /api/jobs/:id 的 TypeScript 类型, 以便小部件在 SSE 不可用时能退化为轮询状态;
  • 描述重试策略:尝试次数、基准延迟、对始终失败的任务如何处理 (简单的 dead‑letter 表,或日志记录 + 告警)。

这个练习会在后面关于生产与可观测性的模块中派上用场,届时我们会讨论类似 “有多少任务在 pending 状态下卡住超过 N 分钟”这样的指标。

13. 处理异步任务的常见错误

错误 1:把一切都做成同步 tool‑call。
最常见的陷阱是试图把所有重活都塞进一个不带队列的 MCP 工具。 在请求量小的时候似乎还行。一旦负载上来或外部 API 变慢,你就会遇到超时、 聊天卡顿以及非常糟糕的 UX。任何可能持续几十秒以上的操作, 都应从一开始就设计为带 jobId 的 async‑job。

错误 2:没有显式的 Job 模型。
有时开发者尝试“只用队列消息”,而不把任务状态存储在数据库里。结果连最基本的问题都难以回答: “任务状态是什么?”“我们尝试执行了几次?”“为什么失败了?” 清晰的 Job 模型以及 statusattemptserrorcreatedAt 等字段,是调试、监控与 UX 的基础。

错误 3:没有重试,或无限重试。
有人根本不重试,第一次 500 就失败;也有人写 while (!success) 不设上限。前者会因为短暂故障丢掉大量任务,后者会制造“负载风暴” 并可能把外部 API 打挂。合理的中间道路是:有限次数 + 指数退避 + 区分临时与永久错误。

错误 4:非幂等处理器。
如果每次尝试你都在外部系统里不加检查地创建新记录、执行同一笔支付或发送同一封邮件—— 重试很快就会变成问题。处理器要能判断这个 jobId 的任务已经成功完成, 从而不再重复危险的副作用。

错误 5:出错时没有事件。
有时工作进程抛出意外异常,只在控制台打日志,然后什么也不做。用户一直等着 job.completed,却不知道一切早已崩溃。任何以错误结束的分支, 最终都必须导致 job.failed 并更新数据库中的 Job 状态。 否则你的 MCP 流就会变成单向的“黑箱”。

错误 6:进度事件过于频繁。
“讲真话”的冲动让你在每增加 1% 时就发一次 job.progress,这会压垮网络、 客户端和 MCP 服务器。最好在阶段变更或较大增量(例如每 10%)时发送, 其余细粒度进度仅写入内部日志。

错误 7:在生产中使用内存队列。
queue: string[]Map 的教学示例有助于理解架构, 但在真实生产系统中,一次进程重启或服务器宕机就会把它击碎。 严肃的生产需要外部队列与存储:SQS、Pub/Sub、RabbitMQ、Redis Streams 等。 内存方案只适合本地开发与简单演示。

评论
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION