CodeGym /Corsi /ChatGPT Apps /Attività asincrone: code, worker, ritentativi (retry)

Attività asincrone: code, worker, ritentativi (retry)

ChatGPT Apps
Livello 13 , Lezione 3
Disponibile

1. Perché servono le attività asincrone in ChatGPT App

Se il mondo fosse ideale, qualsiasi tuo MCP tool finirebbe in poche centinaia di millisecondi. Ma nella realtà tutto ciò che è interessante è lungo e pesante:

  • analisi di un grande CSV con la cronologia degli acquisti dell’utente;
  • aggregazione di dati da più API esterni, ciascuno dei quali a volte dorme, a volte risponde con 503;
  • costruzione di raccomandazioni complesse con molti passaggi intermedi;
  • generazione di report e presentazioni di grandi dimensioni.

Se cerchi di infilare tutto in una sola chiamata sincrona (tool‑call), ti scontri con tre problemi.

Primo, i timeout. La sessione di ChatGPT, l’infrastruttura HTTP, il client MCP — tutto questo non è pensato per risposte «tra cinque minuti». Un server che tiene la connessione troppo a lungo sembrerà «bloccato» sia per ChatGPT sia per l’utente.

Secondo, la gestione del carico. Se cento utenti avviano contemporaneamente la «super‑analisi dei regali per Capodanno», non vuoi che il server MCP mantenga sincronicamente cento job lunghi direttamente nei thread HTTP. Ti serve uno strato in grado di assorbire i picchi, mettere in coda i job e processarli con più worker.

Terzo, UX. L’utente preme un pulsante nel widget GiftGenius e fissa un unico spinner per 40 secondi — una sensazione da vecchi internet banking. È molto meglio il modello «risposta rapida + avanzamento + possibilità di annullare».

Questi problemi si risolvono con lo schema generale: «avvio → coda → background → eventi».

2. Architettura di base degli async job nel contesto MCP

Prendiamo il nostro GiftGenius. Supponiamo sia comparso un nuovo scenario pesante: «Analisi approfondita delle preferenze sulla base della cronologia acquisti e dei social dell’amico». Una cosa del genere può durare diversi minuti, quindi:

  1. Lo strumento MCP (tool) riceve i parametri della richiesta dal modello.
  2. Invece di calcolare tutto subito, crea un record Job nel database.
  3. Mette il job in coda.
  4. Risponde immediatamente a ChatGPT: «Analisi avviata, ecco il jobId».
  5. Un worker in background prende il job dalla coda, esegue il lavoro pesante e, nel frattempo, invia eventi MCP job.progress e job.partial, e alla fine — job.completed o job.failed.

Dal punto di vista architetturale appare così:

flowchart LR
    subgraph ChatGPT
      U[Utente] --> GPT[Modello + ChatGPT UI]
    end

    GPT -->|call_tool analyze_preferences| MCP[Server MCP]

    subgraph Backend
      MCP -->|crea Job| DB[(DB dei job)]
      MCP -->|enqueue| Q[Coda]
      W[Worker] -->|take job| Q
      W -->|update status/progress| DB
      W -->|Eventi MCP: job.progress/job.completed| MCP
    end

    MCP -->|Eventi SSE| GPT

Punto chiave: il server MCP non è necessariamente un monolite. Spesso funge da facciata sulla tua infrastruttura asincrona interna: accetta le tool‑call, crea i job e invia eventi, mentre il lavoro pesante è svolto da processi worker separati.

3. Modello dati per un job asincrono

Partiamo da un semplice modello Job. Useremo TypeScript e un ipotetico server Node/MCP, così vedi subito come si inserisce nello stack.

Un modello semplicissimo in memoria/DB può essere così:

// openai/jobs/model.ts
export type JobStatus =
  | 'pending'
  | 'in_progress'
  | 'completed'
  | 'failed'
  | 'canceled';

export interface GiftJob {
  id: string;                // jobId
  type: 'deep_gift_analysis';
  status: JobStatus;
  payload: {
    recipientProfile: string;  // testo/ID del profilo
    budget: number;
  };
  result?: unknown;          // raccomandazioni finali
  error?: string;            // causa dell'errore
  attempts: number;          // quante volte si è provato a eseguire
  createdAt: Date;
  updatedAt: Date;
}

In un progetto reale conserverai GiftJob in Postgres, DynamoDB, Firestore o altrove, ma per la lezione ci interessano i campi:

  • status — stato corrente del job, che si riflette sia negli eventi sia nell’UX;
  • attempts — contatore per il retry;
  • error — per log e debug;
  • payload — i dati in ingresso che il worker usa per l’elaborazione.

4. Strumento MCP che crea un async job

Immaginiamo lo strumento start_deep_analysis. Prima avrebbe fatto tutto in modo sincrono, ora invece mette solo il job in coda e restituisce il jobId.

// openai/tools/startDeepAnalysis.ts
import { v4 as uuid } from 'uuid';
import { createJobAndEnqueue } from '../jobs/queue';

// Pseudo‑tipi per l’SDK MCP
type StartDeepAnalysisInput = {
  recipientProfile: string;
  budget: number;
};

type StartDeepAnalysisOutput = {
  jobId: string;
  message: string;
};

export async function startDeepAnalysisTool(
  input: StartDeepAnalysisInput
): Promise<StartDeepAnalysisOutput> {
  const jobId = uuid();

  await createJobAndEnqueue({
    id: jobId,
    type: 'deep_gift_analysis',
    status: 'pending',
    payload: {
      recipientProfile: input.recipientProfile,
      budget: input.budget,
    },
    attempts: 0,
    createdAt: new Date(),
    updatedAt: new Date(),
  });

  return {
    jobId,
    message: `Ho avviato l’analisi approfondita. ID del job: ${jobId}. Invierò aggiornamenti man mano che saranno disponibili.`,
  };
}

Qui è importante che:

  • lo strumento MCP lavori rapidamente: al massimo un paio di richieste a DB/coda;
  • restituisca una risposta strutturata con jobId, che ChatGPT può usare nella propria «spiegazione all’utente» e che il widget GiftGenius può salvare nel proprio widgetState.

La tua JSON Schema per questo strumento descrive semplicemente jobId come stringa e message come testo leggibile — il modello capirà che è l’identificatore del job e potrà riferirsi ad esso nei passi successivi del dialogo.

5. Coda semplice e worker: versione didattica

Per non trascinarci ora Redis, RabbitMQ e tutto il resto, creiamo una coda in‑memory semplificata. In produzione, ovviamente, sarà un servizio a parte (SQS/BullMQ/Cloud Tasks ecc.), ma la logica resterà la stessa.

Prima la bozza della coda:

// openai/jobs/queue.ts
import type { GiftJob } from './model';

const jobs = new Map<string, GiftJob>();   // "DB" in memoria
export const queue: string[] = [];         // coda semplificata per id

export async function createJobAndEnqueue(job: GiftJob) {
  jobs.set(job.id, job);
  queue.push(job.id);
}

export function getJob(id: string): GiftJob | undefined {
  return jobs.get(id);
}

export function updateJob(id: string, patch: Partial<GiftJob>) {
  const job = jobs.get(id);
  if (!job) return;
  const updated: GiftJob = { ...job, ...patch, updatedAt: new Date() };
  jobs.set(id, updated);
}

Ora un worker primitivo che periodicamente guarda la coda, prende un job e lo elabora:

// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';

async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  updateJob(jobId, { status: 'in_progress' });
  await emitJobEvent(jobId, 'job.started', {});

  try {
    // Qui invochiamo la logica di business lunga
    const result = await doDeepGiftAnalysis(job.id, job.payload);

    updateJob(jobId, { status: 'completed', result });
    await emitJobEvent(jobId, 'job.completed', { resultSummary: summarize(result) });
  } catch (err) {
    updateJob(jobId, {
      status: 'failed',
      error: (err as Error).message,
    });
    await emitJobEvent(jobId, 'job.failed', { error: 'Internal error' });
  }
}

E il worker «ciclico» vero e proprio, che puoi avviare all’avvio dell’applicazione:

// openai/jobs/workerLoop.ts
import { queue } from './queue';
import { processJob } from './worker';

export function startWorkerLoop() {
  setInterval(async () => {
    const jobId = queue.shift(); // in teoria serve protezione dalle race condition
    if (!jobId) return;

    await processJob(jobId);
  }, 1000); // controlliamo la coda una volta al secondo
}

Questo è un esempio didattico. Nel mondo reale al posto di setInterval userai una coda vera, che «sveglia» il worker quando arriva un nuovo messaggio. Ma l’idea generale è chiara: il worker è separato dallo strumento MCP, lavora in background e parla con il server MCP via eventi.

6. Generazione di eventi MCP dal worker

Nelle lezioni precedenti hai già visto il formato degli eventi MCP: tipo (type), event_id univoco, timestamp, job_id e payload. Ora mostriamo come il worker possa chiamare l’helper emitJobEvent, che poi recapita gli eventi a ChatGPT tramite il canale SSE del server MCP.

Esempio di helper semplice:

// openai/jobs/events.ts
import { randomUUID } from 'crypto';
import { sendMcpEvent } from '../mcp/eventBus';

export async function emitJobEvent(
  jobId: string,
  type: 'job.started' | 'job.progress' | 'job.completed' | 'job.failed',
  payload: unknown
) {
  const event = {
    event_id: randomUUID(),
    type,
    job_id: jobId,
    timestamp: new Date().toISOString(),
    payload,
  };

  await sendMcpEvent(event);
}

E sendMcpEvent all’interno del server MCP sa già come inoltrare l’evento a SSEServerTransport dell’SDK MCP: per esempio tramite un bus eventi locale o Redis Pub/Sub, come abbiamo visto nel modulo 12.

Idea chiave: il worker non parla direttamente con ChatGPT. Parla con il server MCP, che mantiene le connessioni SSE e inoltra gli eventi ai client.

7. Progress e risultati parziali dal worker

Arriviamo alla parte più interessante: avanzamento e risultati parziali. In GiftGenius l’analisi lunga si può dividere in fasi:

  • raccolta e normalizzazione dei dati;
  • costruzione dei segmenti di base;
  • generazione delle prime idee regalo;
  • ranking finale e spiegazione testuale.

A ogni fase possiamo inviare job.progress e, a volte, job.partial, così la UI mostra già i primi regali.

Worker ipotetico:

async function doDeepGiftAnalysis(jobId: string, payload: GiftJob['payload']) {
  await emitJobEvent(jobId, 'job.progress', { step: 1, totalSteps: 4 });

  const normalized = await collectAndNormalizeData(payload);
  await emitJobEvent(jobId, 'job.progress', { step: 2, totalSteps: 4 });

  const roughGifts = await generateInitialGifts(normalized);
  await emitJobEvent(jobId, 'job.partial', { gifts: roughGifts.slice(0, 3) });

  await emitJobEvent(jobId, 'job.progress', { step: 3, totalSteps: 4 });

  const finalGifts = await rerankAndBeautify(roughGifts);
  await emitJobEvent(jobId, 'job.progress', { step: 4, totalSteps: 4 });

  return finalGifts;
}

Il widget, ascoltando gli eventi, può prima mostrare 3 regali «bozza» con l’etichetta «Stiamo ancora affinando», e dopo job.completed — aggiornare l’elenco e togliere l’indicatore di caricamento. Tutto ciò si inserisce perfettamente nei pattern di UX di cui abbiamo parlato nella lezione 3.

8. Logica di retry per i worker

Ora la parte più delicata: errori e ripetizioni.

Immagina che il worker, durante l’elaborazione del job, chiami un’API esterna del catalogo prodotti che a volte risponde 500 o 429. Buttare via il job al primo errore è strano. Ma nemmeno rilanciare all’infinito va bene: rischi un DDoS a te stesso o a un servizio terzo.

Ci serve una strategia di retry con ritardo esponenziale e limite al numero di tentativi.

Partiamo da una classificazione degli errori, utile anche più avanti nel corso:

  • temporanei (transient) — timeout, 500, 503, 429;
  • permanenti (permanent) — input non valido, risorsa inesistente;
  • fatali (bug) — bug del codice, TypeError, eccezione inattesa.

Ha senso ripetere solo gli errori temporanei. Gli altri vanno onestamente contrassegnati come 'failed'.

Semplifichiamo e facciamo un helper:

// openai/jobs/retry.ts
export function shouldRetry(error: unknown): boolean {
  if (!(error instanceof Error)) return false;
  // Indicativamente: HTTP 5xx o 429
  return /5\d\d|429/.test(error.message);
}

export function getDelayMs(base: number, attempt: number): number {
  const jitter = Math.random() * 100;   // piccolo jitter
  return base * 2 ** attempt + jitter; // backoff esponenziale
}

Ora aggiorniamo il worker in modo che tenga conto di attempts in GiftJob:

// openai/jobs/worker.ts
import { getJob, updateJob } from './queue';
import { emitJobEvent } from './events';
import { shouldRetry, getDelayMs } from './retry';

const MAX_ATTEMPTS = 5;

export async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  updateJob(jobId, { status: 'in_progress' });

  try {
    const result = await doDeepGiftAnalysis(job.id, job.payload);

    updateJob(jobId, { status: 'completed', result });
    await emitJobEvent(jobId, 'job.completed', {
      resultSummary: summarize(result),
    });
  } catch (err) {
    const attempts = job.attempts + 1;
    const error = err as Error;

    if (attempts <= MAX_ATTEMPTS && shouldRetry(error)) {
      const delay = getDelayMs(1000, attempts); // 1s,2s,4s...

      updateJob(jobId, { attempts, status: 'pending', error: error.message });

      setTimeout(() => {
        // In una coda reale faresti il re‑enqueue del job con ritardo
        processJob(jobId);
      }, delay);

      await emitJobEvent(jobId, 'job.progress', {
        retry: attempts,
        nextAttemptInMs: delay,
      });
    } else {
      updateJob(jobId, { status: 'failed', error: error.message });
      await emitJobEvent(jobId, 'job.failed', {
        error: 'Impossibile completare l’analisi dopo diversi tentativi',
      });
    }
  }
}

Qui sono importanti alcuni aspetti.

Primo, attempts è conservato nel job stesso — è comodo sia per il logging sia per l’osservabilità (sul grafico si vede chiaramente quante attività passano con retry).

Secondo, a ogni retry inviamo job.progress con indicazione esplicita che si tratta del tentativo n. N. Il modello può usare questa informazione per spiegare all’utente che «il server dei regali risponde in modo instabile, sto riprovando».

Terzo, garantiamo che in ogni caso si invii o job.completed oppure job.failed. Nessun job appeso «né vivo né morto».

L’annullamento ('canceled') è un altro stato importante. Negli esempi didattici non lo implementiamo, ma in produzione di solito viene impostato su iniziativa dell’utente (pulsante «Annulla» nel widget) o per timeout. In tal caso, il worker al successivo prelievo dalla coda vede status: 'canceled', non avvia l’elaborazione, e il server MCP invia l’evento finale job.canceled.

9. Idempotenza e retry: evitare di ripetere due volte le stesse azioni

Quando introduci il retry, nasce subito il rischio di «fare la stessa cosa due volte». Nei moduli commerce è critico (per esempio doppio addebito), ma anche in GiftGenius ci sono scenari in cui il doppio invio è un problema: invio di due email identiche all’amico, duplicazione di una riga nella tua analitica interna ecc.

Punta quindi a due principi.

Primo: l’handler del job deve essere idempotente.

Se lo chiami con lo stesso jobId più volte (nell’ambito del retry o per errore), il mondo non deve rompersi. Per riuscirci:

  • tutti gli effetti collaterali (scrittura nel DB, invio email, creazione ordini) devono basarsi su jobId o un altro identificatore naturale, in modo da poter verificare velocemente se quel passo sia già stato eseguito;
  • se job.status è già 'completed' o 'failed', la chiamata ripetuta si può ignorare o semplicemente restituire il risultato già pronto.

Esempio di protezione semplice:

export async function processJob(jobId: string) {
  const job = getJob(jobId);
  if (!job) return;

  if (job.status === 'completed' || job.status === 'failed') {
    // Il job è già terminato con successo o definitivamente
    return;
  }

  // ... resto del codice
}

Secondo: anche gli eventi devono essere idempotenti.

Abbiamo già parlato di event_id e del fatto che il client può filtrare i duplicati, ma lato server conviene comunque fare attenzione: al riavvio del worker o al ripristino dalla coda non spammare il client con gli stessi job.progress senza necessità.

10. Dove si trovano code e worker nella tua architettura

Nel diagramma tutto è bello, ma dove gira fisicamente il worker? Ci sono alcune opzioni tipiche.

Worker integrato: server MCP e worker sono lo stesso processo/deploy. Riceve le tool‑call e avvia anche il worker loop. Pro: semplicità, meno servizi, deploy più facile. Contro: scalabilità — per aggiungere worker devi scalare l’intero server MCP.

Worker dedicato: il server MCP è un servizio, i worker un altro. Tra i due — una coda e, possibilmente, un Pub/Sub per gli eventi. È ciò di cui si parla spesso nel contesto BullMQ/Redis ed eventi MCP: il server MCP è sottoscritto al canale Redis 'mcp:events', i worker pubblicano lì gli eventi.

Variante combinata: un’istanza del server MCP esegue anche un worker, le altre istanze — solo HTTP/SSE. Può essere utile se fai deploy su Vercel o un’altra piattaforma serverless, dove i processi in background persistenti non sono il massimo.

Nel nostro GiftGenius didattico va bene la prima opzione: server MCP + un worker semplice nello stesso processo. Quando arriverai ai moduli su produzione e scalabilità, potrai migrare i worker in un servizio separato.

11. Esempio: pipeline async completa di GiftGenius

Ricapitoliamo in modo coerente cosa succede quando l’utente scrive in chat:

«Mi serve una selezione complessa di regali per un fan dello spazio, tenendo conto dei suoi acquisti passati».

  1. Il modello decide di chiamare lo strumento start_deep_analysis con i parametri del profilo del destinatario e del budget.
  2. Lo strumento crea un GiftJob nel DB con stato 'pending', lo mette in coda e restituisce jobId + un messaggio di conferma.
  3. ChatGPT spiega all’utente che l’analisi è stata avviata e può passare il jobId al widget GiftGenius.
  4. Il widget si sottoscrive agli eventi per quel jobId tramite SSE, mostra una barra di avanzamento e lo stato «Raccogliamo e analizziamo i dati».
  5. Il worker, vedendo un nuovo job in coda, aggiorna lo stato a 'in_progress' e invia job.started.
  6. Durante il lavoro invia più volte job.progress (fasi) e job.partial (i primi 23 regali).
  7. Se lungo il percorso un’API esterna cade, il worker riprova con backoff esponenziale, aggiornando attempts e inviando un evento con le informazioni sul nuovo tentativo.
  8. Alla fine invia job.completed con un breve sommario e le raccomandazioni finali, oppure job.failed con una spiegazione chiara.
  9. Il widget, in base a questi eventi, aggiorna la UI e ChatGPT può creare un riepilogo testuale e proporre follow‑up: «Mostrare altre idee», «Restringere il budget», «Cambiare tipo di regalo».

Dal punto di vista dell’utente è un processo lungo ma «vivo» e sotto controllo. Dal punto di vista del backend — una normale pipeline asincrona con coda, worker e retry.

12. Un piccolo esercizio (per pratica autonoma)

Se vuoi fissare il materiale, prova per GiftGenius a:

  • ideare lo schema della tabella jobs per un DB reale: quali indici ti servono, quali campi parteciperanno al filtraggio (per utente, per stato, per data di creazione);
  • abbozzare il tipo TypeScript per l’endpoint HTTP /api/jobs/:id, così che il widget, in casi estremi, possa fare polling dello stato se SSE non è disponibile;
  • descrivere la policy di retry: quanti tentativi, ritardo base, cosa fare con i job che continuano a fallire (semplice tabella dead‑letter o logging + alert).

Questo esercizio tornerà utile più avanti, quando nei moduli su produzione e osservabilità parleremo di metriche tipo «quanti job sono rimasti nello stato pending per più di N minuti».

13. Errori tipici nel lavoro con job asincroni

Errore n. 1: fare tutto in modo sincrono nella tool‑call.
La trappola più comune — cercare di infilare tutto il lavoro pesante in un unico strumento MCP senza coda. Finché le richieste sono poche, sembra funzionare. Appena il carico cresce o le API esterne rallentano, prendi timeout, la chat si blocca e l’UX diventa nervosa. Qualsiasi operazione che potenzialmente può durare decine di secondi o più meglio progettarla come async job con jobId.

Errore n. 2: assenza di un modello Job esplicito.
A volte gli sviluppatori provano a cavarsela con «solo messaggi in coda», senza conservare lo stato dei job nel DB. Diventa difficile rispondere a domande basilari: «qual è lo stato del job?», «quante volte abbiamo provato a eseguirlo?», «perché è fallito?». Un modello Job chiaro con campi status, attempts, error, createdAt — è la base per debug, monitoraggio e UX.

Errore n. 3: assenza di retry oppure retry infiniti.
C’è chi non fa per nulla retry e cade al primo 500, c’è chi fa while (!success) e non limita il numero di tentativi. Nel primo caso perdi un mucchio di job per guasti temporanei, nel secondo — crei «tempeste» di carico e rischi di bloccare le API esterne. Serve una via di mezzo sensata: numero di tentativi limitato + ritardo esponenziale + distinzione tra errori temporanei e permanenti.

Errore n. 4: handler non idempotenti.
Se a ogni tentativo, ad esempio, crei una nuova riga in un sistema esterno senza controllo, esegui lo stesso pagamento o invii la stessa email — il retry diventa presto un problema. L’handler deve saper capire che un job con quel jobId è già stato completato con successo e non ripetere effetti collaterali pericolosi.

Errore n. 5: assenza di eventi in caso di errori.
Capita che il worker cada con un’eccezione inattesa, la logghi in console e basta. L’utente resta ad aspettare per sempre job.completed, senza sapere che tutto è morto da tempo. Qualsiasi ramo in cui l’elaborazione termina con errore deve alla fine portare a job.failed e all’aggiornamento dello stato Job nel DB. Senza questo, i tuoi flussi MCP si trasformano in una «scatola nera» unidirezionale.

Errore n. 6: eventi di avanzamento troppo frequenti.
La voglia di «essere onesti» e inviare job.progress a ogni singolo punto percentuale porta a sovraccaricare la rete, il client e il server MCP. Meglio inviare l’avanzamento al cambio di fase o su grandi delta (ad esempio ogni 10 %), e il resto tenerlo solo nei log interni.

Errore n. 7: uso di una coda in‑memory in produzione.
L’esempio didattico con queue: string[] e Map — è utile per capire l’architettura, ma in un sistema di produzione si romperà al primo riavvio del processo o crash del server. Per un esercizio serio servono code e storage esterni: SQS, Pub/Sub, RabbitMQ, Redis Streams ecc. Le varianti in‑memory vanno bene solo per sviluppo locale e demo semplici.

Commenti
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION