CodeGym /Kursy /ChatGPT Apps /Niezawodność strumieni: rate limits, backpressure i monit...

Niezawodność strumieni: rate limits, backpressure i monitoring zdarzeń

ChatGPT Apps
Poziom 13 , Lekcja 4
Dostępny

1. Dlaczego strumienie są szczególnie wrażliwe na obciążenie

W poprzednich częściach omówiliśmy już, jak działają zdarzenia MCP, statusy job.progress/job.completed, async‑joby i kanały strumieniowe (SSE/HTTP-stream) dla GiftGenius. Teraz ważne jest, aby zobaczyć, co dzieje się z tą architekturą pod realnym obciążeniem.

Dopóki macie jednego użytkownika, który od czasu do czasu uruchamia dobór prezentu, wszystko wygląda świetnie. Ale gdy tylko GiftGenius trafia na produkcję i jednocześnie spływają setki żądań o „dobranie prezentów wszystkim pracownikom na firmowy event”, nagle odkrywacie, że:

  • na serwerze są setki długotrwałych połączeń SSE;
  • workery radośnie wysyłają job.progress przy byle okazji;
  • logi rosną o gigabajty dziennie;
  • UI u użytkownika zaczyna się przycinać, chociaż „serwer niby nie pada”.

Klasyczne żądanie HTTP żyje milisekundy lub sekundy. Strumień SSE lub HTTP‑stream może żyć minuty, a nawet godziny. Trzyma połączenie, pamięć, deskryptory plików. Każde wysłane zdarzenie — to serializacja JSON, kopiowanie po sieci, praca GC. Jeśli patrzeć na to jak na „to tylko jeszcze jeden console.log na backendzie”, system bardzo szybko zamieni się w grzejnik.

Zdarzenia MCP mają jeszcze jedną cechę: często są generowane wielokrotnie dla tego samego zadania. Worker, który aktualizuje progres co 0.1 %, daje imponującą liczbę zdarzeń na jedno zadanie. W efekcie otrzymujecie „szum”: ogromną liczbę drobnych komunikatów, które:

  • obciążają sieć i CPU;
  • zapychają kolejki i bufory;
  • czynią debugowanie i analizę logów bolesnymi.

Dlatego zarówno do strumieni, jak i do zdarzeń MCP należy podchodzić tak samo poważnie jak do zapytań do bazy czy wywołań modeli: to zasób kosztowny, który wymaga normowania, kontroli i monitoringu.

Aby sobie z tym poradzić, miejmy w głowie trzy duże tematy:

  1. Rate‑limits — ograniczamy, ile i jak często możemy sobie pozwolić generować i wysyłać zdarzenia/strumienie.
  2. Backpressure — reagujemy na sytuację, gdy konsument nie nadąża za producentem.
  3. Monitoring i metryki — mierzymy, co się dzieje, i na czas dostrzegamy, że wszystko zaczęło się gotować.

2. Rate limiting strumieni i zdarzeń

Zacznijmy od najbardziej oczywistego — limitów.

Ważne jest zrozumienie, że w scenariuszach strumieniowych rolę „groźnego naruszyciela” często gra nie klient, a serwer. W zwykłych REST‑API ograniczacie liczbę żądań do serwera, aby użytkownik nie zamienił się w DDoS. W świecie MCP i strumieni bardzo łatwo urządzić odwrotny DDoS: worker albo serwer MCP bombarduje klienta tysiącami zdarzeń na sekundę.

Jakie limity są potrzebne

Zwykle myśli się w trzech płaszczyznach.

Po pierwsze, limity na użytkownika lub sesję. Nie można pozwolić jednemu użytkownikowi otworzyć dwudziestu równoległych widżetów‑masterów GiftGenius, każdy ze swoim strumieniem SSE. Rozsądne ograniczenie to kilka aktywnych strumieni na sesję i ograniczenie liczby zadań w statusie running dla jednego użytkownika lub tenanta.

Po drugie, limity na jedno zadanie. Interesuje nas tu częstotliwość zdarzeń. W zupełności wystarczy wysyłać job.progress nie częściej niż raz na N ms albo tylko przy zauważalnej zmianie, na przykład co 5 % postępu. Nie ma potrzeby wysyłać komunikatu po każdym przetworzonym produkcie w katalogu. Ma też sens ograniczanie rozmiaru payloadu: zdarzenie progresu nie powinno nieść megabajtów tekstu.

Po trzecie, limity na IP lub organizację. To już ochrona przed nadużyciami, gdy ktoś uruchamia skrypt spamujący zadaniami albo gdy wasza aplikacja staje się niespodziewanie popularna. Tu do gry wchodzą znane mechanizmy API gatewayów i proxy.

Prosta implementacja limitu częstotliwości zdarzeń

Rozważmy workera GiftGenius, który w tle dobiera prezenty po długiej liście odbiorców i okresowo wysyła progres poprzez MCP‑powiadomienie event/progress. Chcemy, aby zdarzenia wysyłały się nie częściej niż raz na 500 ms i tylko przy zmianie procentu co najmniej o 5 punktów.

Umowny pseudokod TS dla workera:

// załóżmy, że mamy jakiś mcpClient.sendNotification(...)
let lastSentPercent = 0;
let lastSentAt = 0;

function reportProgress(jobId: string, percent: number, message: string) {
  const now = Date.now();
  const percentDelta = percent - lastSentPercent;
  const timeDelta = now - lastSentAt;

  // wysyłamy tylko jeśli minęło >= 500 ms LUB wzrosło >= o 5%
  if (percentDelta >= 5 || timeDelta >= 500) {
    mcpClient.sendNotification("event/progress", {
      jobId,
      percent,
      message,
    });
    lastSentPercent = percent;
    lastSentAt = now;
  }
}

Takie podejście nazywa się throttlingiem: „rozrzedzamy” strumień zdarzeń w czasie i według zmiany wartości.

Jeśli dzielicie pracę na etapy („Etap 1 z 3”, „Etap 2 z 3”), logika jest jeszcze prostsza: wysyłać zdarzenia tylko przy zmianie etapu.

Limit liczby jednocześnie otwartych strumieni

Po stronie serwera MCP najpewniej macie HTTP‑handler SSE:

// app/api/events/[userId]/route.ts (Next.js 16 App Router)
export async function GET(
  req: Request,
  { params }: { params: { userId: string } },
) {
  const userId = params.userId;

  if (!canOpenMoreStreams(userId)) {
    return new Response("Too many streams", { status: 429 });
  }

  const stream = new ReadableStream({
    start(controller) {
      registerSseClient(userId, controller);
    },
    cancel() {
      unregisterSseClient(userId);
    },
  });

  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream" },
  });
}

Funkcja canOpenMoreStreams może sprawdzać bieżącą liczbę otwartych połączeń dla użytkownika i porównywać ją z progiem (na przykład nie więcej niż trzy równoległe strumienie). Jeśli limit jest przekroczony, zwracamy 429 i w instrukcjach GPT wyjaśniamy modelowi, że w takiej sytuacji lepiej nie uruchamiać kolejnego długiego „mastera”, tylko podpowiedzieć użytkownikowi, że „istnieje już aktywne wyszukiwanie prezentów, poczekajmy na jego zakończenie”.

W małych systemach podobne sprawdzenia można zaimplementować w pamięci procesu. W poważniejszej infrastrukturze trafia to do MCP‑gatewaya lub osobnego serwisu rate‑limit.

3. Backpressure: co robić, gdy konsument nie nadąża

Rate‑limits ograniczają, ile chcemy produkować zdarzeń. Ale nawet przy ostrożnych limitach możliwa jest sytuacja, gdy konsument się „krztusi”: użytkownik ma słaby internet mobilny, karta w przeglądarce wisi, ChatGPT jest w danej chwili mocno obciążony.

Backpressure to reakcja systemu na to, że konsument nie nadąża. Zamiast bez końca gromadzić dane i prędzej czy później paść z OOM, świadomie:

  • zwalniamy;
  • agregujemy zdarzenia;
  • odrzucamy mniej ważne.

Gdzie powstaje ciśnienie

Typowy scenariusz dla GiftGenius może wyglądać tak. Worker zapisuje zdarzenia do kolejki (np. Redis Streams lub po prostu tabeli w bazie danych), serwer MCP czyta je i pcha do kanału SSE. Jeśli klient jest wolny (3G, stary laptop, mnóstwo innych kart), bufor TCP zaczyna się wypełniać, proces Node nie nadąża całkowicie opróżniać kolejki i w efekcie gromadzi zdarzenia w pamięci. Dalej widzicie znajomy komunikat:

FATAL ERROR: Ineffective mark-compacts near heap limit

Backpressure na poziomie sieci (TCP) już macie, ale on nie zna waszych dziedzinowych bytów. On po prostu mówi: „Hej, zwolnij, bufor jest zapchany”. Nasze zadanie — zinterpretować to na poziomie zdarzeń MCP.

Buforowanie z limitem i odrzucanie zdarzeń

W przypadku progresu i statusów mamy przyjemną właściwość: nie wszystkie zdarzenia są równie cenne. Użytkownik chce znać ostatni aktualny procent, a nie historię wszystkich pośrednich „51%, 52%, 53%, 54%”. To znaczy, że możemy śmiało dropować część zdarzeń i wysyłać tylko ostatnie.

Załóżmy, że mamy warstwę, która odbiera zdarzenia progresu od workerów i wkłada je do bufora dla każdego jobId:

type ProgressEvent = { jobId: string; percent: number; message: string };

const progressBuffers = new Map<string, ProgressEvent[]>();
const MAX_BUFFER = 10;

function bufferProgress(event: ProgressEvent) {
  const buffer = progressBuffers.get(event.jobId) ?? [];
  buffer.push(event);

  // ograniczamy rozmiar bufora
  if (buffer.length > MAX_BUFFER) {
    // zostawiamy tylko kilka ostatnich zdarzeń
    progressBuffers.set(event.jobId, buffer.slice(-MAX_BUFFER));
  } else {
    progressBuffers.set(event.jobId, buffer);
  }
}

Oddzielny timer, np. co 500 ms, patrzy na bufor i wysyła tylko ostatnie zdarzenie, ignorując pozostałe:

setInterval(() => {
  for (const [jobId, buffer] of progressBuffers.entries()) {
    if (!buffer.length) continue;

    const last = buffer[buffer.length - 1];
    sendProgressToClient(last); // SSE/MCP notification

    progressBuffers.set(jobId, []); // czyścimy
  }
}, 500);

To przykład taktyki conflation: połączenie kilku aktualizacji w jedną aktualną. Dla progresu — złoty wzorzec.

Dla zdarzeń typu „log” lub partial_result strategia może być inna. Tam utrata zdarzeń jest często niedopuszczalna: tekst logów jest ważny, a zaginiony fragment JSON może złamać strukturę danych. W takich przypadkach można:

  • agregować komunikat (skleić kilka linii logów w jeden pakiet);
  • albo wysłać do workera sygnał sterujący „zwolnij generowanie logów”.

W systemach asynchronicznych druga opcja jest trudniejsza, ale warto o niej choćby myśleć.

Ograniczenie głębokości kolejek

Backpressure nie ogranicza się do bufora zdarzeń tuż przed wysyłką. Trzeba patrzeć na wszystkie kolejki w systemie:

  • kolejkę zadań oczekujących na workera;
  • kolejkę zdarzeń między workerem a serwerem MCP;
  • bufory wewnątrz bibliotek strumieniowych po stronie serwera.

Dla każdej kolejki ważne jest wyznaczenie rozsądnego limitu głębokości. Jeśli kolejka się przepełnia, albo zaczynacie odpowiadać klientom „system przeciążony, spróbuj później”, albo odrzucacie mniej ważne zadania, albo przenosicie część scenariuszy do „trybu offline” (np. generujecie raport i wysyłacie do niego link później).

Ciekawą techniką jest priorytetyzacja typów zdarzeń. Przy przeciążeniu możecie zacząć wysyłać tylko job.completed i job.failed, a job.progress obniżyć priorytetowo lub wyłączyć całkowicie.

4. Monitoring strumieni i zdarzeń

Bez pomiarów cała ta piękna koncepcja z rate‑limits i backpressure zamienia się w szamanizm. Trzeba widzieć, że strumieni zrobiło się podejrzanie dużo, zdarzenia idą z opóźnieniem, a klienci masowo odpadają.

Strumienie zachowują się inaczej niż zwykłe żądania HTTP: ich czas trwania może być liczony w minutach i godzinach, więc klasyczne metryki „żądania na sekundę” i „średnia latencja” nie dają pełnego obrazu.

Kluczowe metryki

Dla strumieni SSE lub HTTP/stream warto śledzić kilka grup wskaźników.

  1. Metryki połączeń. Ile jest teraz aktywnych strumieni SSE? Jak długo średnio żyje jedno połączenie? Jaki procent strumieni kończy się błędem lub timeoutem? Nagły skok liczby aktywnych połączeń mówi o potencjalnym sztormie ruchu albo wycieku zasobów (klienci nie zamykają połączeń). Gwałtowny spadek — o masowym zerwaniu (np. problemy w sieci lub krytyczny bug na serwerze).
  2. Metryki zdarzeń. Ile zdarzeń wysyłacie na sekundę we wszystkich strumieniach (EPS — events per second, w istocie liczba zdarzeń na sekundę)? Jaki jest średni rozmiar zdarzenia? Ile widzicie błędów deserializacji lub walidacji payloadu? Jeśli nagle widzicie wzrost rozmiaru zdarzeń — możliwe, że ktoś zaczął wysyłać w job.progress zamiast krótkiego tekstu cały raport.
  3. Metryki zadań (jobów). Rozkład po statusach (pending, running, completed, failed, canceled), średni czas wykonania per typ zadania, procent zadań wpadających w retry lub dead‑letter. To pomaga zrozumieć, że problem nie jest tylko na poziomie sieci, ale i w workerach: zewnętrzne API zwolniło, pojawiły się masowe błędy.
  4. Metryki backpressure i wskaźniki systemowe. W systemach strumieniowych często patrzy się na głębokość buforów i kolejek między komponentami, a także na procent czasu, gdy strumień jest zablokowany, czekając aż konsument zwolni miejsce. Jeśli wasze kolejki prawie zawsze są wypełnione po brzegi, to jasny sygnał, że system pracuje na granicy. Ważne jest też śledzenie wskaźników systemowych: CPU i pamięci na serwerach, które zajmują się streamingiem, oraz błędów/timeoutów na poziomie sieci. Czasem to przepustowość sieci między serwerem MCP a ChatGPT staje się wąskim gardłem.

W sumie te cztery grupy dają odpowiedź na trzy pytania: ile strumieni żyje teraz, ile danych przesyłacie, jak zachowują się zadania i gdzie dokładnie system zaczyna się krztusić.

Co logować

Logi — drugi filar obserwowalności. Ważne jest logowanie zdarzeń i połączeń tak, aby potem dało się złożyć historię dla konkretnego zadania.

Zwykle do logów dla każdego zdarzenia i strumienia dodaje się:

  • jobId i/lub eventId;
  • userId i sessionId (jeśli jest multi‑tenancy);
  • typ zdarzenia (progress, completed, failed, resource.updated);
  • typ kanału (SSE lub HTTP/stream);
  • znacznik czasu wysyłki i, jeśli to możliwe, znacznik czasu powstania zdarzenia u workera.

W ten sposób można policzyć lag: różnicę między czasem, kiedy worker wygenerował zdarzenie, a czasem, kiedy wyszło ono do gniazda. Wzrost tego opóźnienia (lagu) to dobry wskaźnik problemów z backpressure.

Trzeba uważać, aby same logi nie stały się źródłem przeciążenia. Dla wysoko‑częstotliwościowych zdarzeń takich jak job.progress logowanie każdego zdarzenia nie zawsze ma sens; można włączyć sampling — logować co N‑te zdarzenie zamiast wszystkich — albo agregować statystyki.

Kodowo może to wyglądać tak jako prosty helper:

function logEvent(event: {
  type: string;
  jobId: string;
  userId?: string;
  channel: "sse" | "http-stream";
  payload: unknown;
}) {
  console.info({
    ...event,
    timestamp: new Date().toISOString(),
  });
}

W realnym projekcie opakowujecie to w bibliotekę do structured logging, ale idea jest ta sama: maksimum użytecznego kontekstu w każdym wpisie.

5. Alerty i polityki degradacji

Gdy macie już metryki i logi, kolejny krok — ustawić alerty i przemyśleć, jak system ma „degradować”, gdy jest mu źle. Chodzi o to, że lepiej uczciwie działać gorzej, niż nagle się wywrócić.

Przykłady alertów

Dla GiftGenius sensowne jest śledzenie kilku typowych sytuacji.

Po pierwsze, anomalia liczby aktywnych strumieni. Jeśli zwykle macie dziesiątki aktywnych połączeń SSE, a nagle zrobiły się tysiące, warto sprawdzić, co się dzieje. Może staliście się popularni, a może — macie buga i połączenia nie są zamykane.

Po drugie, opóźnienie między faktycznym zakończeniem zadania a otrzymaniem przez klienta job.completed. Jeśli opóźnienie zaczyna przekraczać próg (powiedzmy, 510 sekund), to znaczy, że gdzieś między workerem a klientem gromadzą się zdarzenia lub zgrzytają połączenia.

Po trzecie, wysoki odsetek job.failed lub job.canceled w porównaniu z udanymi. Przyczyna może leżeć zarówno w workerze (zepsute zewnętrzne API, nowy bug), jak i w większej wrażliwości użytkowników na opóźnienia (zaczynają częściej anulować zadania).

Wreszcie, zwiększony poziom błędów połączeń i zerwań strumienia: jeśli rośnie liczba niestandardowych disconnectów, możliwe, że są problemy w sieci lub po stronie klienta i warto pomyśleć o scenariuszach fallbacku.

Wzorce degradacji

Gdy system jest przeciążony, można włączyć „tryb oszczędzania zasobów”. To lepsze niż po prostu zacząć odpowiadać 500 na wszystko.

Najczęstszy wzorzec — adaptacyjna częstotliwość zdarzeń. Jeśli widzicie, że event‑rate (liczba zdarzeń na sekundę) wzleciał dziesięć razy powyżej zwykłego poziomu i w kolejkach zaczyna rosnąć lag, zmniejszcie częstotliwość zdarzeń progresu. Było co 1 % — zróbcie co 10 %. Było co 500 ms — róbcie raz na 23 sekundy. Użytkownik spokojnie przeżyje bez superdokładnego progresu, ale z całkowicie zawieszonym UI — już nie bardzo.

Dla mniej ważnych zdarzeń — na przykład resource.updated przy aktualizacji feedu produktów w tle — można tymczasowo wyłączyć wysyłkę całkiem, póki system jest pod obciążeniem.

Jeszcze jedna technika — przeniesienie części scenariuszy ze strumieni na okresowy polling. Jeśli kanały SSE się sypią, serwer MCP może wysłać do widżetu zdarzenie systemowe typu system.overloaded, a widżet przełączy się na strategię „co N sekund odpytuję REST‑endpoint o status zadania”.

6. Niewielki praktyczny fragment dla GiftGenius

Aby to spiąć w całość, załóżmy, że mamy już:

  • MCP‑tool startGiftSearch, który tworzy zadanie i zwraca jobId;
  • workera, który wykonuje wyszukiwanie i wysyła event/progress oraz event/completed;
  • SSE‑endpoint /api/events/[userId], do którego podłącza się widżet w Next.js.

Dodajmy prostą warstwę ochrony przed „sztormem zdarzeń” i minimalny monitoring.

Ograniczenie progresu po krokach i czasie

W workerze dodajemy throttling i conflation, jak wyżej. Teraz zdarzenia wysyłają się nie częściej niż raz na pół sekundy i przy zmianie co najmniej o 5 %.

Zliczanie aktywnych strumieni

W SSE‑endpoint trzymamy licznik per użytkownik:

const activeStreams = new Map<string, number>();
const STREAM_LIMIT = 3;

function canOpenMoreStreams(userId: string) {
  const current = activeStreams.get(userId) ?? 0;
  return current < STREAM_LIMIT;
}

function registerSseClient(userId: string, controller: ReadableStreamDefaultController) {
  const current = activeStreams.get(userId) ?? 0;
  activeStreams.set(userId, current + 1);

  // tutaj zapisujecie controller do jakiejś struktury,
  // aby później pisać do tego strumienia zdarzenia
}

function unregisterSseClient(userId: string) {
  const current = activeStreams.get(userId) ?? 1;
  activeStreams.set(userId, Math.max(0, current - 1));
}

Metryki dotyczące activeStreams.size serwer może dodatkowo wysyłać do Prometheus/Grafana lub dowolnego innego systemu monitoringu.

Najprostsza metryka event‑rate

Na początek można jakkolwiek policzyć, ile zdarzeń wysyłamy:

let eventsSentLastMinute = 0;

function sendProgressToClient(ev: ProgressEvent) {
  // ... serializacja i zapis do strumienia SSE
  eventsSentLastMinute++;
}

setInterval(() => {
  console.info({
    metric: "events_per_minute",
    value: eventsSentLastMinute,
    timestamp: new Date().toISOString(),
  });
  eventsSentLastMinute = 0;
}, 60_000);

Z czasem można to zastąpić normalnymi licznikami i alertami, ale jako punkt startowy — już nieźle.

Jeśli zebrać wszystko powyżej: limity, backpressure, metryki/alerty i sensowny UX‑fallback sprawiają, że wasz GiftGenius przestaje być „demem do dema” i wytrzymuje realne sztormy ruchu. W kolejnych modułach, gdzie będziemy mówić o gatewayu, architekturze produkcyjnej i pełnoprawnej obserwowalności, te wzorce jeszcze się przydadzą.

7. Typowe błędy przy pracy ze strumieniami, rate limits i monitoringiem

Błąd nr 1: brak limitów na liczbę strumieni i częstotliwość zdarzeń.
Deweloperzy dodali SSE „żeby było ładnie”, workery uczciwie wysyłają progres po każdym przetworzonym obiekcie i wszystko niby działa na demo. Ale przy pierwszym skoku realnych użytkowników serwer zaczyna większość zasobów wydawać na serializację i przesyłanie tysięcy maleńkich zdarzeń, a UI w ChatGPT zamienia się w pokaz slajdów.

Błąd nr 2: próba buforowania „wszystkiego naraz” bez ograniczeń.
W kodzie pojawia się nieograniczona tablica „niewysłanych zdarzeń”, która rośnie, dopóki klient się nie podniesie. Spoiler: nie podniesie się, serwer padnie wcześniej. Każdy bufor musi mieć twarde maksimum, a logika obsługi przepełnienia — być jawna.

Błąd nr 3: jednakowe traktowanie wszystkich typów zdarzeń.
Progres można agregować i dropować (ostatni procent jest ważniejszy niż historia ruchu). Z logami i partial‑rezultatami tak robić nie można — utrata jednego fragmentu może oznaczać uszkodzone dane. Projektując system, z góry grupujcie zdarzenia według ważności i wymyślcie dla każdej grupy strategię na przeciążenie.

Błąd nr 4: brak obserwowalności.
Żadnych metryk aktywnych strumieni, żadnego liczenia event‑rate, w logach — tylko „coś poszło nie tak”. W takiej sytuacji o problemach dowiadujecie się tylko z opinii użytkowników i wykresu obciążenia CPU. Skonfigurowanie chociaż podstawowych metryk i logów po jobId i eventId — to nie luksus, a konieczność.

Błąd nr 5: sztywny UX, nieprzewidujący degradacji.
Widżet i instrukcje GPT zakładają, że strumień jest zawsze dostępny, progres aktualizuje się „w czasie rzeczywistym”, partial‑results przychodzą ściśle według scenariusza. Przy pierwszych problemach sieciowych użytkownik widzi „zamrożony” pasek postępu i żadnego wyjaśnienia. Znacznie lepiej wbudować w UX uczciwy fallback: „Są teraz problemy z live‑aktualizacją, i tak kontynuuję dobór i dam znać, gdy skończę” — i przejść na rzadsze aktualizacje lub polling.

Błąd nr 6: wiara, że „nasi użytkownicy nie utworzą wielu równoczesnych zadań”.
Praktyka pokazuje, że jeśli nie ograniczyliście liczby równoległych zadań i strumieni, ktoś na pewno otworzy pięć kart, w każdej uruchomi dobór prezentów „na maksa” i pójdzie na kawę. Pomysł „jakoś to będzie” na produkcji prawie zawsze kończy się przyspieszonym kursem monitoringu przy akompaniamencie głośnych alertów.

1
Ankieta/quiz
Powiadomienia, poziom 13, lekcja 4
Niedostępny
Powiadomienia
Powiadomienia i scenariusze strumieniowe (zdarzenia MCP)
Komentarze
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION