CodeGym /Kurse /C# SELF /Fortgeschrittene Patterns und Besonderheiten von

Fortgeschrittene Patterns und Besonderheiten von Concurrent-Collections

C# SELF
Level 58 , Lektion 3
Verfügbar

1. „Producer‑Consumer“ + Concurrent

Wir haben das Pattern „Producer‑Consumer“ bereits beim Thema ConcurrentQueue angeschnitten. Schauen wir es uns detaillierter an für den Fall mit mehreren Producers und Consumers sowie korrekter Signalisierung der Beendigung.

Der große Vorteil von ConcurrentQueue (und anderen Concurrent-Collections) ist, dass sie sich um Thread-Safety kümmert. Man muss Enqueue oder TryDequeue nicht mit einem lock umgeben — mehrere Threads können sicher über die gemeinsame Queue arbeiten.

Beispiel: Mehrere Producers und mehrere Consumers

Mehrere Worker-Threads erzeugen Tasks, und mehrere andere Threads verarbeiten sie.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
CancellationTokenSource cts = new CancellationTokenSource(); // Zur Abbruch-Signalisierung an die Consumers

// Methode für den Producer
void Producer(string name, int count)
{
    for (int i = 0; i < count; i++)
    {
        string task = $"Zadacha_{name}_{i}";
        taskQueue.Enqueue(task);
        Console.WriteLine($"[P:{name}] Hinzugefügt: {task}");
        Thread.Sleep(10); 
    }
}

// Methode für den Consumer
void Consumer(string name)
{
    while (!cts.Token.IsCancellationRequested || taskQueue.Count > 0)
    {
        if (taskQueue.TryDequeue(out string task))
        {
            Console.WriteLine($"[C:{name}] Verarbeitet: {task}");
            Thread.Sleep(20); 
        }
        else
        {
            Thread.Sleep(50); // Warten, wenn die Queue leer ist
        }
    }
    Console.WriteLine($"[C:{name}] Arbeit beendet.");
}

// Start des Beispiels in Main:
Task.Run(() => Producer("A", 10)); // Producer A
Task.Run(() => Producer("B", 10)); // Producer B
Task.Run(() => Consumer("1"));    // Consumer 1
Task.Run(() => Consumer("2"));    // Consumer 2

Thread.Sleep(1000); // Zeit zum Arbeiten geben
cts.Cancel();       // Signal an Consumers, die Arbeit zu beenden
Thread.Sleep(500); // Zeit geben, damit die Consumers die Reste verarbeiten und schließen

Hier arbeiten mehrere Producers und Consumers gleichzeitig mit einer ConcurrentQueue ohne Datenraces: die Methoden Enqueue und TryDequeue sind atomar.

Wichtigkeit des Beendigungs-Signals (CancellationTokenSource)

Wir verwenden CancellationTokenSource (cts) zur Signalisierung an die Consumers, dass sie die Arbeit beenden sollen. Das ist kritisch für das Producer‑Consumer-Pattern:

  • Producers haben die Arbeit beendet. Wenn das Hinzufügen von Elementen fertig ist, sollen Consumers nicht ewig eine leere Queue abwarten.
  • Die Applikation soll herunterfahren. Consumers müssen sauber gestoppt werden.

CancellationTokenSource und CancellationToken bieten den standardisierten Mechanismus: der Consumer prüft periodisch IsCancellationRequested und ruft ggf. ThrowIfCancellationRequested() auf.

2. BlockingCollection<T>

Obwohl ConcurrentQueue<T> gut für Producer‑Consumer geeignet ist, erfordert sie manuelles Warten bei leerer Queue und eigene Signalisierung zur Beendigung. Für eine bequemere Implementierung gibt es in .NET BlockingCollection<T> — das ist keine eigenständige Collection, sondern ein Wrapper um eine beliebige IProducerConsumerCollection<T> (z.B. um ConcurrentQueue).

Vorteile von BlockingCollection:

  • Blockierende Operationen. Add()/Take() blockieren den Thread, wenn die Collection voll/leer ist. Man muss IsEmpty nicht manuell prüfen.
  • Größenbegrenzung. Man kann eine Kapazität (Capacity) setzen. Add() blockiert, wenn das Limit erreicht ist — praktisch zur Memory-Kontrolle.
  • Bequeme Finalisierung. CompleteAdding() signalisiert das Ende des Hinzufügens, und GetConsumingEnumerable() erlaubt dem Consumer, Elemente bis zum vollständigen Abschluss zu verarbeiten.

Beispiel: Producer‑Consumer mit BlockingCollection<T>

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// BlockingCollection verwendet standardmäßig ConcurrentQueue
BlockingCollection<int> numbers = new BlockingCollection<int>(capacity: 10); // Queue mit Limit 10

void ProducerBC(int count)
{
    for (int i = 0; i < count; i++)
    {
        numbers.Add(i); // Blockiert, wenn die Queue voll ist
        Console.WriteLine($"[P] Hinzugefügt: {i}");
        Thread.Sleep(50);
    }
    numbers.CompleteAdding(); // Signalisiert, dass der Producer fertig ist
    Console.WriteLine("[P] Producer hat das Hinzufügen beendet.");
}

void ConsumerBC()
{
    // GetConsumingEnumerable blockiert, solange Elemente vorhanden sind oder bis CompleteAdding aufgerufen wurde
    foreach (var item in numbers.GetConsumingEnumerable())
    {
        Console.WriteLine($"[C] Verarbeitet: {item}");
        Thread.Sleep(100);
    }
    Console.WriteLine("[C] Consumer hat die Arbeit beendet.");
}

// Start des Beispiels in Main:
Task producerTask = Task.Run(() => ProducerBC(15)); // 15 Items, Limit 10
Task consumerTask = Task.Run(ConsumerBC);
Task.WaitAll(producerTask, consumerTask); // Auf Beendigung warten

Beachte, wie sauberer der Consumer-Code dank GetConsumingEnumerable() ist. Wenn du blockierende Operationen oder eine Größenbegrenzung brauchst — BlockingCollection ist dein Werkzeug.

3. Zusätzliche Methoden und Eigenschaften von Concurrent-Collections

IsEmpty, Count

  • IsEmpty (bool): ob die Collection leer ist.
  • Count (int): aktuelle Anzahl der Elemente.

Beispiel: Nutzung von IsEmpty und Count

using System.Collections.Concurrent;

ConcurrentQueue<string> q = new ConcurrentQueue<string>();
Console.WriteLine($"Ist die Queue leer? {q.IsEmpty}"); // True

q.Enqueue("A");
q.Enqueue("B");
Console.WriteLine($"Anzahl Elemente in der Queue: {q.Count}"); // 2
Console.WriteLine($"Ist die Queue leer? {q.IsEmpty}"); // False

q.TryDequeue(out var itemA);
Console.WriteLine($"Anzahl Elemente in der Queue: {q.Count}"); // 1

Konvertierung zu Arrays (ToArray())

Alle Concurrent-Collections stellen die Methode ToArray() zur Verfügung, die einen Momentaufnahme der Elemente zurückgibt.

Beispiel: Nutzung von ToArray()

using System.Collections.Concurrent;

ConcurrentStack<int> s = new ConcurrentStack<int>();
s.Push(10);
s.Push(20);
s.Push(30);

int[] items = s.ToArray(); // Erstellt ein neues Array: [30, 20, 10] (für Stack LIFO)
Console.WriteLine($"Elemente im Array: {string.Join(", ", items)}");

// Die Collection bleibt unverändert
Console.WriteLine($"Anzahl Elemente im Stack nach ToArray: {s.Count}"); // 3

Leeren von Collections

In .NET 6+ haben viele Concurrent-Collections die Methode Clear() zum Entfernen aller Elemente bekommen.

Beispiel: Collection leeren

using System.Collections.Concurrent;

ConcurrentBag<string> bag = new ConcurrentBag<string>();
bag.Add("Alpha");
bag.Add("Beta");
Console.WriteLine($"Elemente im Bag: {bag.Count}"); // 2

bag.Clear(); // Collection leeren
Console.WriteLine($"Elemente im Bag nach Clear: {bag.Count}"); // 0
Console.WriteLine($"Bag leer? {bag.IsEmpty}"); // True

4. Besonderheiten des Verhaltens von Concurrent-Collections

Wichtig ist, dass man sich der "Momentaufnahme" der Daten bewusst ist. Eigenschaften wie Count und Ergebnisse von ToArray() spiegeln den Zustand der Collection zu einem bestimmten Zeitpunkt wider. Bei parallelen Änderungen kann dieser Wert sofort veraltet sein.

Beispiel: Count und ToArray() — "Momentaufnahme"

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> snapshotQueue = new ConcurrentQueue<int>();

void AddItemsContinuously()
{
    for (int i = 0; i < 1000; i++)
    {
        snapshotQueue.Enqueue(i);
        Thread.Sleep(1); 
    }
}

// Start in Main:
Task.Run(AddItemsContinuously); // Thread, der kontinuierlich hinzufügt

Thread.Sleep(100); // Etwas Zeit zum Hinzufügen geben
Console.WriteLine($"Aktueller Count: {snapshotQueue.Count}"); // Kann 50, 80, 120... sein
Thread.Sleep(100);
Console.WriteLine($"Aktueller Count nochmal: {snapshotQueue.Count}"); // Wird einen anderen Wert haben
int[] currentItems = snapshotQueue.ToArray();
Console.WriteLine($"Anzahl Elemente in ToArray(): {currentItems.Length}"); // Kann vom letzten Count abweichen

Verlasse dich nicht auf Count als strikte Garantie für die aktuelle Anzahl von Elementen während aktiver Änderungen.

5. Feinheiten der Iteration über Concurrent-Collections

Einzelne Operationen (Add, TryTake, TryPop, GetOrAdd usw.) sind thread-safe. Aber eine Iteration mit foreach über eine Collection, die gleichzeitig von anderen Threads modifiziert wird, garantiert nicht, dass du alle Elemente siehst oder nur die aktuellen — es kann zu Auslassungen und Überraschungen kommen.

Beispiel: Iteration während Modifikation

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> iterQueue = new ConcurrentQueue<int>();

// Anfangselemente hinzufügen
for (int i = 0; i < 10; i++) iterQueue.Enqueue(i);

// Modifier-Thread
void Modifier()
{
    for (int i = 10; i < 20; i++)
    {
        iterQueue.Enqueue(i); // Neue Elemente hinzufügen
        Thread.Sleep(50);
    }
}

// Iterator-Thread
void Iterator()
{
    Console.WriteLine("Beginne Iteration...");
    int count = 0;
    foreach (var item in iterQueue) // Iterieren
    {
        Console.Write($"{item} ");
        count++;
        Thread.Sleep(30); // Arbeit simulieren, damit der Modifier die Collection ändern kann
    }
    Console.WriteLine($"\nIteration beendet. Gelesen: {count} Elemente.");
    Console.WriteLine($"Aktuelle Anzahl in der Queue: {iterQueue.Count}");
}

// Start des Beispiels in Main:
Task.Run(Modifier);
Task.Run(Iterator);
Thread.Sleep(1500); // Zeit zum Arbeiten geben

Regel: Wenn du eine feste Menge an Elementen brauchst (z. B. für einen Bericht), nimm zuerst ein "Snapshot" via ToArray() und iteriere dann darüber:

// Richtiger Weg zu iterieren, wenn die Collection sich ändern kann
int[] snapshot = iterQueue.ToArray();
foreach (var item in snapshot)
{
    // Jetzt iterierst du über ein unveränderliches Array-Snapshot
}

Damit beenden wir unseren Deep-Dive in fortgeschrittene Patterns und Besonderheiten von Concurrent-Collections: Wir haben Producer‑Consumer mit mehreren Teilnehmern und BlockingCollection betrachtet und die wichtigen Feinheiten zu Count, ToArray() und Iteration besprochen.

Kommentare
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION