CodeGym /Kursy /C# SELF /Kolejki i stosy Producer-Consumer

Kolejki i stosy Producer-Consumer

C# SELF
Poziom 58 , Lekcja 1
Dostępny

1. Wprowadzenie

Zacznijmy od podstaw. Kolejka to fundamentalna struktura danych działająca na zasadzie FIFO (First-In, First-Out), czyli "kto pierwszy, ten pierwszy obsłużony". Wyobraź sobie zwykłą kolejkę w sklepie: kto pierwszy stanął, ten pierwszy jest obsłużony.

W programowaniu wielowątkowym wzorzec Producent-Konsument (Producer-Consumer) jest jednym z najczęściej spotykanych i najpotężniejszych.

  • Producenci (Producers) — to wątki lub części aplikacji, które tworzą dane lub zadania i umieszczają je w wspólnej kolejce. Oni "produkują" pracę.
  • Konsumenci (Consumers) — to wątki lub części aplikacji, które pobierają dane lub zadania z kolejki i je przetwarzają. Oni " konsumują" pracę.

Ten wzorzec pomaga kontrolować przepływ danych, oddziela komponenty systemu (producent nie musi wiedzieć, kto i jak przetworzy dane), czyni go bardziej responsywnym i pomaga równomiernie rozkładać obciążenie między wątkami.

Przykład: ConcurrentQueue — dodawanie i pobieranie

Zobaczmy, jak dodawać i pobierać elementy z ConcurrentQueue<T>.

using System.Collections.Concurrent;

ConcurrentQueue<string> tasks = new ConcurrentQueue<string>();

// Dodawanie elementów (producent)
tasks.Enqueue("Pobrać plik");
tasks.Enqueue("Przetworzyć obraz");
Console.WriteLine($"Zadań w kolejce: {tasks.Count}"); // Wyjście: Zadań w kolejce: 2

// Pobieranie elementów (konsument)
if (tasks.TryDequeue(out string task1))
{
    Console.WriteLine($"Zadanie wykonane: {task1}"); // Wyjście: Zadanie wykonane: Pobrać plik
}

if (tasks.TryDequeue(out string task2))
{
    Console.WriteLine($"Zadanie wykonane: {task2}"); // Wyjście: Zadanie wykonane: Przetworzyć obraz
}

if (!tasks.TryDequeue(out string emptyTask))
{
    Console.WriteLine("Kolejka pusta, brak nowych zadań."); // Wyjście: Kolejka pusta, brak nowych zadań.
}

Podstawy działania: Enqueue(), TryDequeue()

Enqueue(T item): Używane do dodawania elementu na koniec kolejki. Ta operacja jest bezpieczna dla wątków. Możesz jednocześnie wywoływać Enqueue z 10 różnych wątków i wszystkie elementy zostaną poprawnie dodane.

TryDequeue(out T item): Używane do próby pobrania elementu z początku kolejki. To kluczowa metoda dla konsumentów. Zwraca true, jeśli element został pomyślnie pobrany (wartość trafia do parametru wyjściowego item), i false, jeśli kolejka jest pusta. Ważne, że TryDequeue nie blokuje wątku, jeśli kolejka jest pusta.

2. Znaczenie TryDequeue() i atomowość operacji

Metoda TryDequeue() nie jest tylko wygodna; jest krytycznie ważna dla poprawnej, wątkowo-bezpiecznej pracy. Jest atomowa: sprawdzenie, czy kolejka jest pusta, i samo pobranie elementu odbywają się jako jedna niepodzielna operacja.

Gdybyśmy mieli osobne metody IsEmpty (sprawdzić, czy kolejka jest pusta) i Dequeue (pobrać element), to między ich wywołaniami inny wątek mógłby opróżnić kolejkę. W efekcie twój Dequeue rzuciłby wyjątek lub zwrócił niepoprawne dane. TryDequeue całkowicie eliminuje taką sytuację.

Przykład: Producer-Consumer z kilkoma wątkami

Tutaj uruchamiamy dwa wątki-producenci i jeden wątek-konsument.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> dataQueue = new ConcurrentQueue<int>();
bool producersDone = false; // Flaga sygnalizująca konsumentowi

void Producer(int start, int count)
{
    for (int i = 0; i < count; i++)
    {
        dataQueue.Enqueue(start + i);
        Console.WriteLine($"[P] Dodałem: {start + i}");
        Thread.Sleep(10); 
    }
}

void Consumer()
{
    while (!producersDone || dataQueue.Count > 0) // Kontynuujemy, dopóki są dane lub producenci pracują
    {
        if (dataQueue.TryDequeue(out int item))
        {
            Console.WriteLine($"[K] Przetworzyłem: {item}");
        }
        else
        {
            Thread.Sleep(50); // Czekamy, jeśli kolejka pusta
        }
    }
    Console.WriteLine("[K] Zakończyłem pracę.");
}

// Uruchomienie przykładu w Main:
// Task.Run(() => Producer(1, 5));
// Task.Run(() => Producer(100, 5)); // Drugi producent
// Task.Run(() => Consumer());
// Thread.Sleep(600); // Dajemy wątkom czas na pracę
// producersDone = true; // Sygnalizujemy, że producenci skończyli
// Thread.Sleep(200); // Dajemy czas konsumentowi na pobranie reszty

Zwróć uwagę, że w tym prostym przykładzie flaga producersDone i Thread.Sleep są użyte do imitacji zakończenia. W realnych aplikacjach do bardziej niezawodnej synchronizacji zakończenia często stosuje się CancellationTokenSource lub BlockingCollection<T>.

ConcurrentQueue<T> idealnie nadaje się do scenariuszy, gdzie:

  • Kolejność przetwarzania elementów jest ważna (FIFO).
  • Wiele wątków dodaje elementy i/lub wiele wątków je pobiera.
  • Potrzebna jest wysoka wydajność bez ręcznego zarządzania lockami.

3. Stos dla producer-consumer (LIFO)

Stos to inna fundamentalna struktura danych, działająca na zasadzie LIFO (Last-In, First-Out), czyli "ostatni przyszedł – pierwszy wyszedł". Wyobraź sobie stos talerzy: zawsze bierzesz ten na wierzchu, a nowy talerz kładziesz na samą górę stosu.

ConcurrentStack<T> jest równie bezpieczny dla wątków jak ConcurrentQueue<T> i też może być użyty we wzorcu "producent-konsument", ale z odwróconą kolejnością przetwarzania.

Przykład: ConcurrentStack — dodawanie i pobieranie

using System.Collections.Concurrent;

ConcurrentStack<string> commandStack = new ConcurrentStack<string>();

// Dodawanie komend (producent)
commandStack.Push("Zaznaczyć tekst");
commandStack.Push("Zmienić font");
commandStack.Push("Zapisz dokument");
Console.WriteLine($"Komend na stosie: {commandStack.Count}"); // Wyjście: Komend na stosie: 3

// Pobieranie komend (konsument)
if (commandStack.TryPop(out string cmd1))
{
    Console.WriteLine($"Cofnięto komendę: {cmd1}"); // Wyjście: Cofnięto komendę: Zapisz dokument
}

if (commandStack.TryPop(out string cmd2))
{
    Console.WriteLine($"Cofnięto komendę: {cmd2}"); // Wyjście: Cofnięto komendę: Zmienić font
}

if (!commandStack.TryPop(out string emptyCmd))
{
    Console.WriteLine("Stos komend pusty."); // Wyjście: Stos komend pusty.
}

4. Podstawy działania: Push(), TryPop()

Push(T item): Używane do dodawania elementu na wierzch stosu. Operacja jest bezpieczna dla wątków.

TryPop(out T item): Używane do próby pobrania elementu z wierzchu stosu. Zwraca true, jeśli element został pomyślnie pobrany, i false, jeśli stos jest pusty. Podobnie jak TryDequeue, to atomowa operacja zapobiegająca race condition.

Przykład: użycie ConcurrentStack dla poola obiektów

Stos świetnie nadaje się do implementacji pooli obiektów: bierzesz — używasz — zwracasz.

using System.Collections.Concurrent;

class Connection { /* Prosty stub */ public Guid Id { get; } = Guid.NewGuid(); }

ConcurrentStack<Connection> connectionPool = new ConcurrentStack<Connection>();

// Wypełniamy pool początkowymi połączeniami
for (int i = 0; i < 3; i++)
{
    connectionPool.Push(new Connection());
}
Console.WriteLine($"Połączeń w poolu: {connectionPool.Count}"); // Wyjście: Połączeń w poolu: 3

void UseConnection()
{
    if (connectionPool.TryPop(out Connection conn))
    {
        Console.WriteLine($"[Pool] Użyto połączenie: {conn.Id}");
        // Imitacja pracy z połączeniem
        Thread.Sleep(50); 
        connectionPool.Push(conn); // Zwracamy do poola
        Console.WriteLine($"[Pool] Zwrócono połączenie: {conn.Id}. W poolu: {connectionPool.Count}");
    }
    else
    {
        Console.WriteLine("[Pool] Pool połączeń pusty. Tworzymy nowe.");
        // Zwykle tutaj tworzy się nowe połączenie, jeśli pool jest pusty
        connectionPool.Push(new Connection()); 
    }
}

// Uruchomienie przykładu w Main:
Task.Run(() => UseConnection());
Task.Run(() => UseConnection());
Task.Run(() => UseConnection());
Thread.Sleep(500);

W tym przykładzie kilka wątków może bezpiecznie pobierać i zwracać połączenia do wspólnego poola.

5. Przykłady zastosowań i porównanie z ConcurrentQueue

ConcurrentStack<T> stosuje się, gdy:

  • Kolejność LIFO jest krytyczna (np. historia operacji do funkcji "Cofnij").
  • Potrzebny jest szybki dostęp do najnowszych dodanych elementów (często są "gorące" w cache CPU).
  • Implementowane są algorytmy oparte na stosach (przegląd grafu w głąb, analiza składni wyrażeń).

Porównanie i wybór odpowiedniej kolekcji

Kolekcja Kolejność Zalety Typowe scenariusze
ConcurrentQueue
FIFO (Pierwszy przyszedł, pierwszy wyszedł) Zapewnia uczciwe przetwarzanie w kolejności przyjścia Kolejki zadań, logowanie, obsługa przychodzących żądań, event busy
ConcurrentStack
LIFO (Ostatni przyszedł, pierwszy wyszedł) Szybki dostęp do ostatnio dodanych elementów Historia działań (Undo/Redo), pool-e obiektów, algorytmy przeglądu (DFS)

Wybór między ConcurrentQueue a ConcurrentStack zależy całkowicie od wymaganego porządku przetwarzania elementów w twoim scenariuszu producer-consumer. Obie kolekcje zapewniają wysoką wydajność i bezpieczeństwo wielowątkowe "out of the box", odciążając cię od ręcznej synchronizacji i pomagając budować skalowalne systemy wielowątkowe.

Komentarze
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION