CodeGym /Kursy /C# SELF /Wprowadzenie do Concurrent...

Wprowadzenie do Concurrent-kolekcji

C# SELF
Poziom 58 , Lekcja 0
Dostępny

1. Tło problemu

W aplikacji jednowątkowej kolekcje takie jak List<T>, Dictionary<T> działają przewidywalnie. Ale gdy ta sama kolekcja zaczyna być używana jednocześnie przez kilka wątków, pojawia się znany problem: race conditions.

Jeśli kilka wątków próbuje czytać i/lub zapisywać dane do tej samej kolekcji bez odpowiedniej synchronizacji, możesz dostać:

  • Niepoprawne dane: element mógł zostać usunięty przez jeden wątek, podczas gdy inny próbował go zaktualizować.
  • Utrata danych: jeden wątek dodał element, a inny go nadpisał, nie wiedząc o poprzednim zapisie.
  • Wyjątki: kolekcja może znaleźć się w nieprawidłowym stanie i dostaniesz InvalidOperationException (np. "Collection was modified; enumeration operation may not execute.") albo nawet NullReferenceException.

Przykład 1: Race condition w List<T> (proste dodawanie)

Dwa wątki jednocześnie inkrementują ten sam element listy.

using System.Collections.Generic;
using System.Threading.Tasks; // Dla Task.Run

class RaceConditionExample
{
    static List<int> numbers = new List<int> { 0 }; // Lista z jednym elementem

    static void Main(string[] args)
    {
        Console.WriteLine("Początkowa wartość: " + numbers[0]); // 0

        // Uruchamiamy dwa wątki, z których każdy inkrementuje numbers[0]
        Task task1 = Task.Run(() => IncrementNumbers(500_000));
        Task task2 = Task.Run(() => IncrementNumbers(500_000));

        Task.WaitAll(task1, task2); // Czekamy na zakończenie obu wątków

        Console.WriteLine("Końcowa wartość: " + numbers[0]); // Oczekujemy 1_000_000, ale...
        // Wynik niemal zawsze będzie mniejszy niż 1_000_000!
    }

    static void IncrementNumbers(int count)
    {
        for (int i = 0; i < count; i++)
        {
            // Ta operacja "numbers[0]++" w rzeczywistości składa się z 3 kroków:
            // 1. Odczytać numbers[0]
            // 2. Zwiększyć wartość o 1
            // 3. ZAPISAĆ nową wartość z powrotem do numbers[0]
            numbers[0]++; 
        }
    }
}

Dlaczego to race? Jeśli wątek A odczytał numbers[0] (wartość 0), a potem wątek B odczytał numbers[0] (także 0) zanim A zdążył zapisać 1, to oba wątki zwiększą 0 do 1 i zapiszą 1. Jedno inkrementowanie zaginie. Operacja numbers[0]++ nie jest atomowa.

Przykład 2: InvalidOperationException przy modyfikacji Dictionary

Jeden wątek iteruje słownik, inny — go modyfikuje.

using System.Collections.Generic;
using System.Threading; // Dla Thread.Sleep

class DictionaryRaceExample
{
    static Dictionary<int, string> users = new Dictionary<int, string>();

    static void Main(string[] args)
    {
        // Inicjalizacja słownika
        for (int i = 0; i < 5; i++) users.Add(i, $"User {i}");

        // Wątek-czytelnik
        Thread readerThread = new Thread(() =>
        {
            try
            {
                foreach (var user in users) // Iteracja po słowniku
                {
                    Console.WriteLine($"Czytelnik: {user.Key} - {user.Value}");
                    Thread.Sleep(10); // Symulacja pracy
                }
            }
            catch (InvalidOperationException ex)
            {
                Console.WriteLine($"Czytelnik: BŁĄD! {ex.Message}");
            }
        });

        // Wątek-zapisujący
        Thread writerThread = new Thread(() =>
        {
            Thread.Sleep(5); // Dajemy czytelnikowi trochę czasu żeby zacząć
            for (int i = 5; i < 10; i++)
            {
                users.Add(i, $"Nowy User {i}"); // Dodajemy elementy
                Console.WriteLine($"Pisarz: Dodał User {i}");
                Thread.Sleep(15);
            }
        });

        readerThread.Start();
        writerThread.Start();

        readerThread.Join(); // Czekamy na zakończenie wątków
        writerThread.Join();
        Console.WriteLine("Przykład zakończony.");
    }
}

Dlaczego występuje błąd? Dictionary<TKey, TValue> (jak i List<T>) nie jest przeznaczony do jednoczesnego czytania i zapisu przez różne wątki bez synchronizacji. Gdy wątek-zapisujący zmienia wewnętrzną strukturę, wątek-czytelnik kontynuuje foreach po już zmienionych danych, co prowadzi do InvalidOperationException.

2. Dlaczego proste blokady (lock) nie zawsze są optymalne?

Pomysł „owinąć wszystko w lock” wygląda prosto, ale ma swoje minusy:

// Zły przykład: za duża blokada
// (Tylko do demonstracji, tak nie robić!)
static object _lock = new object();
static List<int> _sharedList = new List<int>();

void AddItem(int item)
{
    lock (_lock)
    {
        _sharedList.Add(item);
    }
}

int GetItemCount()
{
    lock (_lock)
    {
        return _sharedList.Count;
    }
}
  • Wydajność (bottleneck): lock blokuje dostęp do całej kolekcji. Przy 100 wątkach 99 będzie czekać na jednego, nawet jeśli operacje nie konfliktują bezpośrednio.
  • Złożoność: trzeba pamiętać o lock w każdym miejscu użycia kolekcji. Jedno zapomniane miejsce — i race wraca.
  • Deadlocki: kilka lock na różnych obiektach łatwo prowadzi do deadlock.
  • Iteratory: foreach nie ratuje, jeśli inny wątek modyfikuje kolekcję.

Dlatego w .NET wprowadzono specjalne wątko-bezpieczne kolekcje.

Operacje atomowe

Wątko-bezpieczna kolekcja — gwarantuje poprawną pracę przy jednoczesnym dostępie z wielu wątków bez zewnętrznych blokad po stronie użytkownika. Klucz — operacje atomowe: działanie wykonuje się w całości albo wcale — inne wątki nie widzą „pół-stanów”.

  • Dodawanie, usuwanie, odczyt — zachowują się tak, jakby wykonywały się po kolei.
  • W środku używane są niskopoziomowe techniki: operacje interlocked (Interlocked), Compare-And-Swap (CAS), lekkie lockingi — zamiast globalnej blokady całej kolekcji.

3. Przegląd System.Collections.Concurrent

Namespace System.Collections.Concurrent dostarcza zestaw kolekcji stworzonych od podstaw dla wielowątkowości. Ich filozofia — maksymalny paralelizm i minimum blokad.

  • Wydajność: skaluje się wraz ze wzrostem liczby rdzeni.
  • Prostota: nie trzeba ręcznie stosować lock wokół każdej operacji.
  • Mniej błędów: znika klasa problemów z ręczną synchronizacją.
  • Optymalizacja pod konkurencję: efektywnie działają przy jednoczesnych dodaniach/usunięciach.

4. Główne klasy

ConcurrentQueue<T> (wątko-bezpieczna kolejka)

Zasada: FIFO — „kto pierwszy, ten pierwszy wychodzi”. Scenariusze: producer–consumer, logging, kolejki zadań.

using System.Collections.Concurrent;

ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();

void Producer() => messageQueue.Enqueue("Soobshchenie 1");

void Consumer()
{
    if (messageQueue.TryDequeue(out string message))
    {
        Console.WriteLine($"Przetworzone: {message}");
    }
    else
    {
        Console.WriteLine("Kolejka pusta.");
    }
}

ConcurrentStack<T> (wątko-bezpieczny stos)

Zasada: LIFO — „ostatni przyszedł, pierwszy wyszedł”. Scenariusze: historia akcji, DFS, pule obiektów.

using System.Collections.Concurrent;

ConcurrentStack<int> historyStack = new ConcurrentStack<int>();

void PushAction(int value) => historyStack.Push(value);

void PopAction()
{
    if (historyStack.TryPop(out int action))
    {
        Console.WriteLine($"Cofnięto akcję: {action}");
    }
    else
    {
        Console.WriteLine("Stos pusty.");
    }
}

ConcurrentBag<T> (wątko-bezpieczny "worek")

Nieuporządkowana kolekcja, kolejność nie jest gwarantowana. Zoptymalizowana pod scenariusz „wątek częściej bierze to, co sam włożył”. Świetna do pul.

using System.Collections.Concurrent;

ConcurrentBag<System.Guid> objectPool = new ConcurrentBag<System.Guid>();

void AddObject() => objectPool.Add(System.Guid.NewGuid());

void TakeObject()
{
    if (objectPool.TryTake(out System.Guid obj))
    {
        Console.WriteLine($"Wzięto obiekt: {obj}");
    }
    else
    {
        Console.WriteLine("Pula pusta.");
    }
}

ConcurrentDictionary<TKey, TValue> (wątko-bezpieczny słownik)

Wspiera atomowe operacje dodawania, aktualizacji i pobierania wartości po kluczu. Świetny do cache'ów, sesji, liczników.

using System.Collections.Concurrent;

ConcurrentDictionary<string, int> userScores = new ConcurrentDictionary<string, int>();

void UpdateScore(string user, int score)
{
    // Atomowo doda, jeśli nie ma, lub zaktualizuje, jeśli jest
    userScores.AddOrUpdate(user, score, (key, existingVal) => existingVal + score);
    Console.WriteLine($"Wynik {user}: {userScores[user]}");
}

void GetScore(string user)
{
    if (userScores.TryGetValue(user, out int score))
    {
        Console.WriteLine($"Bieżący wynik {user}: {score}");
    }
    else
    {
        Console.WriteLine($"Użytkownik {user} nie znaleziony.");
    }
}

5. Kiedy używać tych kolekcji zamiast zwykłych?

  • Aplikacja wielowątkowa: przy jednym wątku zwykłe kolekcje są szybsze (brak narzutu).
  • Jedna współdzielona kolekcja dla wielu wątków: to główny sygnał do użycia System.Collections.Concurrent.
  • Potrzebujesz wysokiej wydajności i skalowalności: kolekcje zaprojektowane pod minimalne oczekiwanie.
  • Chcesz uprościć kod: bez ręcznych lock-ów wokół każdej operacji.
  • Potrzebne operacje atomowe: dodanie/usunięcie/pobranie nie pozostawi kolekcji w niespójnym stanie.

Nie używaj Concurrent-kolekcji, gdy:

  • Aplikacja jest ściśle jednowątkowa.
  • Potrzebna jest "transakcyjność" wielu powiązanych operacji (może być potrzebna zewnętrzna synchronizacja lub inne mechanizmy).
  • Ważny jest ścisły porządek wyciągania tam, gdzie on nie jest gwarantowany (np. w ConcurrentBag<T>).
Komentarze
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION