CodeGym /Kursy /C# SELF /Channel: producent–konsument (

Channel: producent–konsument ( Channel)

C# SELF
Poziom 62 , Lekcja 3
Dostępny

1. Wprowadzenie

Dziś wchodzimy na nowy poziom! Czas poznać całkiem specyficzne narzędzie — Channel. Ta rzecz została stworzona specjalnie dla nowoczesnych asynchronicznych aplikacji w .NET: tam, gdzie zwykłe blokady albo nie pomagają, albo mocno spowalniają cię na drodze do szczęścia.

Czeka cię spotkanie ze wzorcem "Producent-Konsument" (Producer-Consumer), który nie traci na popularności od lat 60. XX wieku. Stworzysz prosty asynchroniczny "pipeline", gdzie jedne wątki lub zadania coś produkują (np. pobierają pliki, liczą liczby, czekają na zdarzenia), a inne je przetwarzają (np. zapisują, wrzucają do bazy, aktualizują UI).

Dlaczego powstał Channel?

  1. Wzorzec "Producent-Konsument" od dawna rozwiązywano kolejkami: producent wkłada zadania do kolejki, konsument je pobiera. Ale! BlockingCollection<T>, kolejki z ConcurrentQueue<T>, nawet ręczna synchronizacja za pomocą lock — to wszystko nie jest asynchroniczne. Innymi słowy wątek może tylko się zablokować w oczekiwaniu na dane, zamiast zwrócić sterowanie schedulerowi async/await.
  2. Asynchroniczność w .NET to nie tylko modne słówko na piątki, ale fundament nowoczesnej architektury. Blokowanie wątków czekając na elementy jest kosztowne i nieefektywne. Trzeba umieć czekać na pojawienie się danych bez blokowania — to zadanie, które rozwiązuje Channel.
  3. Elastyczność: za pomocą kanałów można zbudować skomplikowane pipeline'y przetwarzania danych, rozdzielać logikę wątków, dodawać kroki pośrednie i balansować obciążenie — i to bez bólu synchronizacji niskiego poziomu.

Co to jest Channel? (Analogia i architektura)

Wyobraź sobie, że masz pałeczkę sztafetową (albo taśmę na transporterze), po której można przekazywać obiekty z jednego miejsca do drugiego i nikt nie musi spotykać się osobiście z kolegą z sąsiedniego działu. Najważniejsze, żeby pałeczka nie zaginęła po drodze.

Channel to wbudowane w .NET narzędzie do asynchronicznego przesyłania danych między różnymi zadaniami (tasks), wątkami lub częściami programu. Implementuje asynchroniczną kolejkę z obsługą „czekania” zarówno na wstawianie, jak i pobieranie elementów.

  • Producent wkłada do kanału elementy (np. żądania do przetworzenia);
  • Konsument pobiera elementy — i po sprawie!

2. Klasa Channel<T> i jej budowa

Wszystko zaczyna się od przestrzeni nazw:

using System.Threading.Channels;

W przeciwieństwie do znanych kolekcji, Channel to fabryka, która tworzy specjalne obiekty do przesyłania danych.

Podstawowe typy:

  • ChannelWriter<T> — "writer" (producent). Tylko wstawia elementy.
  • ChannelReader<T> — "reader" (konsument). Tylko pobiera elementy.
  • Kanał (Channel) rozdziela odpowiedzialność: writer nic nie wie o readerze i odwrotnie.

W .NET jest kilka implementacji kanału, każda ze swoimi cechami: unbounded (bez limitu rozmiaru), bounded (z limitem elementów), single-producer-single-consumer (SPSC), multi-producer-multi-consumer (MPMC) itd. Zaczniemy od najbardziej uniwersalnej.

Prosty przykład: asynchroniczna kolejka zadań

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // Tworzymy kanał bez limitu rozmiaru
        var channel = Channel.CreateUnbounded<int>();

        // Zadanie-producer
        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine($"Producent: Wkłada {i} do kanału");
                await channel.Writer.WriteAsync(i); // Asynchroniczny zapis!
                await Task.Delay(100); // Symulacja pracy
            }
            channel.Writer.Complete(); // Mówimy, że więcej nie będziemy pisać
        });

        // Zadanie-consumer
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"Konsument: Otrzymał {item} z kanału");
                await Task.Delay(200); // Symulacja przetwarzania
            }
            Console.WriteLine("Konsument: Kanał zamknięty");
        });

        await Task.WhenAll(producer, consumer);
    }
}

Co tu się dzieje?

  • Channel.CreateUnbounded<int>() — tworzymy kanał bez ograniczeń rozmiaru kolejki.
  • Producent zapisuje liczby od 0 do 9 do kanału za pomocą WriteAsync.
  • Po zakończeniu zapisu wywoływane jest Complete() — sygnał "Nie będzie więcej elementów!".
  • Konsument iteruje po wszystkich elementach przez ReadAllAsync() (również asynchronicznie!), dopóki kanał się nie zamknie.
  • Opóźnienia (Task.Delay) symulują rzeczywistą pracę: widać — liczby są zapisywane szybciej niż czytane.

3. Dlaczego to wszystko działa asynchronicznie?

Zwykłe blokujące kolejki (np. BlockingCollection lub te chronione lock) mogą tylko zablokować wątek. To oznacza — tracimy cenne zasoby, jeśli mamy dużo zadań lub chcemy być maksymalnie wydajni.

Z kanałami:

  • Jeśli producent jest szybszy, kanał gromadzi elementy (ograniczony tylko pamięcią lub maksymalnym rozmiarem, jeśli ustawiony).
  • Jeśli konsument jest szybszy, będzie czekał, aż coś się pojawi (i nie zablokuje wątku na amen, tylko odda go schedulerowi).

To idealne rozwiązanie dla scenariuszy, gdy nie wiesz z góry, kto będzie szybszy — producenci czy konsumenci.

Zastosowania w praktyce

  • Asynchroniczne logowanie: zapisywanie wiadomości do pliku/bazy odbywa się w oddzielnym wątku;
  • Przetwarzanie zapytań web: jedno zadanie pobiera paczkę stron, drugie analizuje ich zawartość;
  • Skanning i indeksowanie folderów: jedne zadania przeszukują system plików, inne liczą statystyki dla plików;
  • Kompleksowe pipeline'y przetwarzania danych: np. w zadaniach ETL (Extract–Transform–Load) jeden krok zamienia surowiec w półprodukt, inny robi z tego produkt.

4. Ograniczony kanał (Bounded Channel)

„Nieograniczone” kanały są fajne, ale pamięć wciąż nie jest nieskończona (nawet jeśli twój komputer wydaje się ogromny).

Ograniczony kanał (bounded) pozwala ustawić maksymalną liczbę elementów, które mogą jednocześnie znajdować się wewnątrz. Jeśli kanał jest pełny — producent czeka, aż konsument coś pobierze.

Przykład:

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.Wait // (domyślnie) - czekać, aż zwolni się miejsce
});

Tutaj tylko trzy elementy mogą jednocześnie znajdować się w kanale. Jeśli producent spróbuje zapisać czwarty — będzie czekał.

Kilka producentów i konsumentów

var channel = Channel.CreateUnbounded<int>();

// 2 producentów
for (int producerId = 0; producerId < 2; producerId++)
{
    Task.Run(async () =>
    {
        for (int i = 0; i < 5; i++)
        {
            int value = producerId * 100 + i;
            Console.WriteLine($"Producent {producerId}: Wkłada {value}");
            await channel.Writer.WriteAsync(value);
            await Task.Delay(50);
        }
        // Każdy producent woła Complete() — niebezpieczne!
    });
}
// Sztuczka: Complete() powinno być wywołane tylko raz, kiedy WSZYSCY producenci zakończyli pracę.
// Dla przykładu zostawimy jedno zadanie-konsument:
Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Konsument otrzymał {item}");
        await Task.Delay(100);
    }
});

Uwaga! Kanał powinien być zamknięty (przez Complete()) dopiero po zakończeniu pracy wszystkich producentów. Inaczej ktoś może spróbować zapisać, a kanał będzie już zamknięty! W praktyce zazwyczaj używa się licznika zadań lub Task.WhenAll.

5. Praktyka: Przetwarzanie obrazów przez kanał

Dodajmy odrobinę trudności! Wyobraź sobie, że masz folder z obrazami. Jedno zadanie znajduje obrazy i wkłada ich ścieżki do kanału, drugie — pobiera ścieżkę i robi z tym plikiem coś użytecznego (np. oblicza rozmiar albo konwertuje).

Wyjaśnienie: Dla prostoty przykład będzie operował na nazwach plików (bez rzeczywistej manipulacji obrazami), ale idea jest identyczna.

using System;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var channel = Channel.CreateBounded<string>(5);

        // Producent: szuka plików .jpg w folderze
        var producer = Task.Run(async () =>
        {
            foreach (var file in Directory.EnumerateFiles(@"images", "*.jpg"))
            {
                await channel.Writer.WriteAsync(file);
                Console.WriteLine($"Dodano do kolejki: {file}");
                await Task.Delay(50); // symulujemy opóźnienie wyszukiwania
            }
            channel.Writer.Complete(); // koniec kolejki
        });

        // Konsument: czyta i "przetwarza" pliki
        var consumer = Task.Run(async () =>
        {
            await foreach (var file in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"Przetwarzanie pliku: {file}");
                await Task.Delay(200); // symulacja przetwarzania
            }
            Console.WriteLine("Wszystkie obrazy przetworzone!");
        });

        await Task.WhenAll(producer, consumer);
    }
}

6. Konfigurowanie Channel: opcje i niuanse

Kanały można konfigurować przy tworzeniu — oto główne parametry dla bounded kanałów:

Opcja Opis
Capacity
Maksymalna liczba elementów, które mogą być jednocześnie w kanale
SingleWriter
true, jeśli masz tylko jednego producenta (przyspiesza działanie)
SingleReader
true, jeśli masz tylko jednego konsumenta (przyspiesza działanie)
FullMode
Co robić, gdy kanał się zapełni? Możliwe wartości: Wait, DropWrite, DropOldest, DropNewest

Przykład z opcjami:

var options = new BoundedChannelOptions(10)
{
    SingleWriter = false,
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait
};
var channel = Channel.CreateBounded<string>(options);

7. Metody asynchroniczne: ReadAsync, WriteAsync, ReadAllAsync

Dlaczego async jest tak ważny?

Metody WriteAsync i ReadAsync nie blokują wątku! Jeśli nic nie ma do czytania — zadanie zostaje wstrzymane, zwalniając wątek dla innych zadań. To szczególnie ważne w aplikacjach serwerowych i UI, gdzie dodatkowe blokady mogą powodować "zamrożenia".

ReadAllAsync — wygoda współczesnego C#

Można iterować asynchronicznie:

await foreach (var item in channel.Reader.ReadAllAsync())
{
    // Pracujemy z item
}

Channel<T> a kolekcje bezpieczne dla wątków: gdzie jest różnica?

ConcurrentQueue<T>/BlockingCollection<T> są dobre dla scenariuszy z wątkami, ale nie nadają się do czystej asynchroniczności (await-scenariusze).

Channel<T> został zaprojektowany właśnie dla asynchronicznych aplikacji pipeline. Pod względem bezpieczeństwa wątków, obie grupy kolekcji radzą sobie dobrze, ale kanały dają elastyczność i integrację z nowoczesnymi możliwościami C# (IAsyncEnumerable i inne).

8. Błędy i typowe pułapki

Nie zapomnij wywołać Complete() u writera, kiedy wszystkie elementy zostały dodane! W przeciwnym razie konsument będzie wisiał w oczekiwaniu na nowe elementy na zawsze.

Nie wywołuj Complete() wielokrotnie, jeśli jest wielu writerów — rób to dopiero, gdy absolutnie wszyscy producenci zakończą pracę.

Po zamknięciu kanału nie można już pisać elementów, ale można czytać pozostałe.

Race condition przy jednoczesnym zapisie: jeśli kanał zostanie zamknięty, a ktoś nadal próbuje zapisać — dostaniesz wyjątek.

Komentarze
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION