CodeGym /Kurse /C# SELF /Channel: Produzent–Konsument (

Channel: Produzent–Konsument ( Channel)

C# SELF
Level 62 , Lektion 3
Verfügbar

1. Einführung

Heute steigen wir eine Stufe höher! Es ist Zeit, ein ganz besonderes Werkzeug kennenzulernen — den Channel. Das Ding wurde speziell für moderne asynchrone Anwendungen in .NET entwickelt: dort, wo normale Locks entweder nicht helfen oder dich stark ausbremsen.

Dich erwartet das Producer-Consumer-Pattern, das seit den 60er Jahren des 20. Jahrhunderts nichts an Relevanz verloren hat. Du baust eine einfache asynchrone "Pipeline", in der einige Threads oder Tasks Dinge produzieren (z.B. Dateien herunterladen, Zahlen berechnen, auf Events warten) und andere sie verarbeiten (z.B. speichern, in eine DB schreiben, im UI anzeigen).

Warum ist der Channel entstanden?

  1. Das Producer-Consumer-Pattern wurde lange mit Queues gelöst: der Producer legt Aufgaben in eine Queue, der Consumer holt sie raus. Aber! BlockingCollection<T>, Queues auf Basis von ConcurrentQueue<T> oder manuelle Synchronisation mit lock — das ist nicht asynchron. Threads können nur blockieren und geben nicht die Steuerung an den Scheduler bei async/await zurück.
  2. Asynchronität in .NET ist nicht nur ein Modewort, sondern die Grundlage moderner Architektur. Threads für das Warten auf Elemente zu blockieren ist teuer und ineffizient. Man muss warten können, ohne zu blockieren — genau das löst der Channel.
  3. Flexibilität: mit Channels kannst du komplexe Daten-Pipelines bauen, Logik zwischen Threads trennen, Zwischenstufen und Load-Balancing einfügen — und das alles ohne low-level Synchronisierungs-Schmerzen.

Was ist ein Channel? (Analogie und Architektur)

Stell dir vor, du hast einen Staffelstab (oder ein Förderband), über das Objekte von A nach B übergeben werden, ohne dass sich zwei Personen persönlich treffen müssen. Hauptsache, der Stab geht nicht unterwegs verloren.

Der Channel ist ein in .NET eingebautes Mittel zur asynchronen Übergabe von Daten zwischen verschiedenen Tasks, Threads oder Programmteilen. Er implementiert eine asynchrone Queue mit Unterstützung für "await" sowohl beim Einfügen als auch beim Entnehmen von Items.

  • Produzent legt Items in den Channel (z.B. Requests zur Verarbeitung);
  • Konsument holt Items heraus — und fertig!

2. Die Klasse Channel<T> und ihr Aufbau

Alles beginnt mit dem Namespace:

using System.Threading.Channels;

Im Gegensatz zu vertrauten Collections ist der Channel eher eine Fabrik, die spezielle Objekte zur Datenübergabe erzeugt.

Haupttypen:

  • ChannelWriter<T> — der "Writer" (Produzent). Schreibt nur Items.
  • ChannelReader<T> — der "Reader" (Konsument). Liest nur Items.
  • Der Kanal (Channel) trennt Verantwortlichkeiten: der Writer weiß nichts über den Reader und umgekehrt.

In .NET gibt es mehrere Implementierungen von Channels mit unterschiedlichen Eigenschaften: unbounded (ohne Größenlimit), bounded (mit Kapazitätslimit), single-producer-single-consumer (SPSC), multi-producer-multi-consumer (MPMC) usw. Wir fangen mit der universellsten Variante an.

Ein einfaches Beispiel: asynchrone Task-Queue

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // Erstellen eines unbounded Channels
        var channel = Channel.CreateUnbounded<int>();

        // Producer-Task
        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine($"Producer: Legt {i} in den Channel");
                await channel.Writer.WriteAsync(i); // Asynchrone Write!
                await Task.Delay(100); // Arbeit simulieren
            }
            channel.Writer.Complete(); // Signal: wir schreiben nichts mehr
        });

        // Consumer-Task
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"Consumer: Hat {item} aus dem Channel erhalten");
                await Task.Delay(200); // Verarbeitung simulieren
            }
            Console.WriteLine("Consumer: Channel ist geschlossen");
        });

        await Task.WhenAll(producer, consumer);
    }
}

Was passiert hier?

  • Channel.CreateUnbounded<int>() — erstellt einen Channel ohne Queue-Limit.
  • Der Producer schreibt die Zahlen von 0 bis 9 mit WriteAsync in den Channel.
  • Nach dem Schreiben ruft er Complete() auf — das Signal "Es kommen keine Elemente mehr!".
  • Der Consumer iteriert über alle Items via ReadAllAsync() (ebenfalls asynchron), bis der Channel geschlossen ist.
  • Die Delays (Task.Delay) simulieren echte Arbeit: die Produktion kann schneller sein als das Konsumieren.

3. Warum funktioniert das asynchron?

Normale blockierende Queues (z.B. BlockingCollection oder Strukturen mit lock) können nur den Thread blockieren. Das bedeutet Ressourcenverlust, wenn viele Tasks laufen oder man maximale Performance will.

Mit Channels:

  • Wenn Producer schneller sind, sammelt der Channel Items an (limitiert nur durch Memory oder durch gesetzte Kapazität).
  • Wenn Consumer schneller sind, warten sie auf neue Items — ohne den Thread dauerhaft zu blockieren, der dem Scheduler zurückgegeben wird.

Das ist ideal, wenn du nicht vorher weißt, wer schneller ist — Producer oder Consumer.

Anwendungsfälle in der Praxis

  • Asynchrones Logging: Schreiben von Logs in Datei/DB in einem separaten Task;
  • Verarbeitung von Web-Requests: eine Task lädt Seiten, eine andere analysiert sie;
  • Scanning und Indexing von Ordnern: einige Tasks traversieren das Filesystem, andere berechnen Statistiken;
  • Komplexe Data-Pipelines: z.B. in ETL-Szenarien wandelt eine Stufe Rohdaten in Halbzeuge, die nächste Stufe macht das Endprodukt.

4. Beschränkter Channel (Bounded Channel)

Unbegrenzte Channels sind witzig, aber RAM ist nicht unendlich (auch wenn dein Rechner groß wirkt).

Ein bounded Channel erlaubt, die maximale Anzahl von Items festzulegen, die simultan im Channel sein dürfen. Ist der Channel voll, wartet der Producer, bis der Consumer Platz macht.

Beispiel:

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.Wait // (Default) - warten, bis Platz frei wird
});

Hier können nur drei Items gleichzeitig im Channel sein. Versucht der Producer ein viertes zu schreiben, muss er warten.

Mehrere Producer und Consumer

var channel = Channel.CreateUnbounded<int>();

// 2 Producer
for (int producerId = 0; producerId < 2; producerId++)
{
    Task.Run(async () =>
    {
        for (int i = 0; i < 5; i++)
        {
            int value = producerId * 100 + i;
            Console.WriteLine($"Producer {producerId}: Legt {value}");
            await channel.Writer.WriteAsync(value);
            await Task.Delay(50);
        }
        // Jeder Producer ruft Complete() auf — gefährlich!
    });
}
// Trick: Complete() darf nur einmal aufgerufen werden, wenn ALLE Producer fertig sind.
// In der Praxis benutzt man dafür normalerweise einen Task-Zähler oder Task.WhenAll.
Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Consumer hat {item} erhalten");
        await Task.Delay(100);
    }
});

Achtung! Der Channel darf nur dann geschlossen werden (mit Complete()), wenn wirklich alle Producer ihre Arbeit beendet haben. Sonst versuchen andere zu schreiben, während der Channel schon geschlossen ist. In der Praxis verwendet man dafür oft Zähler oder Task.WhenAll.

5. Praxis: Bildverarbeitung über einen Channel

Machen wir das Beispiel etwas komplexer! Stell dir vor, wir haben einen Ordner mit Bildern. Eine Task sucht Bilder und legt ihre Pfade in den Channel, eine andere Task nimmt Pfade und macht etwas Nützliches damit (z.B. Dateigröße bestimmen oder konvertieren).

Hinweis: Der Einfachheit halber arbeitet das Beispiel mit Dateinamen (ohne echte Bildverarbeitung), aber das Prinzip ist dasselbe.

using System;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var channel = Channel.CreateBounded<string>(5);

        // Producer: sucht .jpg Dateien im Ordner
        var producer = Task.Run(async () =>
        {
            foreach (var file in Directory.EnumerateFiles(@"images", "*.jpg"))
            {
                await channel.Writer.WriteAsync(file);
                Console.WriteLine($"Zur Queue hinzugefügt: {file}");
                await Task.Delay(50); // Suche-Verzögerung simulieren
            }
            channel.Writer.Complete(); // Ende der Queue
        });

        // Consumer: liest und "verarbeitet" Dateien
        var consumer = Task.Run(async () =>
        {
            await foreach (var file in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"Verarbeite Datei: {file}");
                await Task.Delay(200); // Verarbeitung simulieren
            }
            Console.WriteLine("Alle Bilder wurden verarbeitet!");
        });

        await Task.WhenAll(producer, consumer);
    }
}

6. Konfigurieren des Channel: Optionen und Feinheiten

Channels lassen sich beim Erstellen mit Optionen konfigurieren — hier die wichtigsten Parameter für bounded Channels:

Option Beschreibung
Capacity
Maximale Anzahl von Items, die gleichzeitig im Channel sein dürfen
SingleWriter
true, wenn nur ein Producer existiert (beschleunigt)
SingleReader
true, wenn nur ein Consumer existiert (beschleunigt)
FullMode
Was tun, wenn der Channel voll ist? Mögliche Werte: Wait, DropWrite, DropOldest, DropNewest

Beispiel mit Optionen:

var options = new BoundedChannelOptions(10)
{
    SingleWriter = false,
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait
};
var channel = Channel.CreateBounded<string>(options);

7. Asynchrone Methoden: ReadAsync, WriteAsync, ReadAllAsync

Warum ist async so wichtig?

Die Methoden WriteAsync und ReadAsync blockieren den Thread nicht! Wenn nichts zu lesen da ist, wird die Task pausiert und der Thread freigegeben. Das ist besonders wichtig für Server- und UI-Apps, wo Blockieren zu "Freezes" führen kann.

ReadAllAsync — Komfort in modernem C#

Man kann asynchron iterieren:

await foreach (var item in channel.Reader.ReadAllAsync())
{
    // Arbeite mit item
}

Channel<T> vs. thread-safe Collections: wo liegt der Unterschied?

ConcurrentQueue<T>/BlockingCollection<T> sind gut für Thread-Szenarien, aber nicht ideal für reine Async-/await-Szenarien.

Channel<T> wurde für asynchrone Pipeline-Anwendungen entworfen. In Sachen Thread-Safety tun beide Ansatzmöglichkeiten ihren Job, aber Channels bieten Flexibilität und Integration mit modernen C#-Features (z.B. IAsyncEnumerable).

8. Fehler und typische Fallen

Vergiss nicht, Complete() am Writer aufzurufen, wenn alle Items hinzugefügt sind! Sonst hängt der Consumer ewig auf neue Items.

Rufe Complete() nicht mehrfach auf, wenn es mehrere Writer gibt — tu das erst, wenn wirklich alle Producer fertig sind.

Nach dem Schließen des Channels kannst du nicht mehr schreiben, aber noch vorhandene Items können gelesen werden.

Race-Condition beim gleichzeitigen Schreiben: wenn der Channel geschlossen ist und jemand trotzdem schreibt — bekommst du eine Exception.

Kommentare
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION