CodeGym /Kursy /C# SELF /ConcurrentBag i

ConcurrentBag i ConcurrentDictionary

C# SELF
Poziom 58 , Lekcja 2
Dostępny

1. Wstęp

ConcurrentBag<T> — to wątkowo-bezpieczna, nieuporządkowana kolekcja. Jej główną cechą i zaletą jest słowo „Bag” (worek), co oznacza, że kolejność elementów nie jest gwarantowana przy pobieraniu. To znaczy, że element, który wyciągniesz, może nie być tym, którego oczekiwałeś na podstawie kolejności dodania. Za to ConcurrentBag ma unikalną optymalizację, która czyni ją wyjątkowo szybką w pewnych scenariuszach.

Cechy ConcurrentBag

Brak kolejności: W odróżnieniu od kolejek (FIFO) i stosów (LIFO), ConcurrentBag nie obiecuje, że TryTake() zwróci element w jakiejkolwiek konkretnej kolejności względem tego, jak został dodany. To kluczowa różnica.

Optymalizacja dla lokalnego dostępu (Thread-Local Storage): Główny powód istnienia ConcurrentBag — jej wydajność w scenariuszach, gdzie wątek, który dodał element, z dużym prawdopodobieństwem będzie tym samym wątkiem, który go pobierze.

Przykład: ConcurrentBag — dodawanie i pobieranie

using System.Collections.Concurrent;

ConcurrentBag<string> itemBag = new ConcurrentBag<string>();

// Dodawanie elementów
itemBag.Add("Pozycja A");
itemBag.Add("Pozycja B");
itemBag.Add("Pozycja C");

Console.WriteLine($"Elementów w worku: {itemBag.Count}"); // Wyjście: Elementów w worku: 3

// Pobieranie elementów (kolejność nie jest gwarantowana!)
if (itemBag.TryTake(out string item1))
{
    Console.WriteLine($"Pobrano: {item1}"); // Może być "Pozycja C", "Pozycja B" lub "Pozycja A"
}

if (itemBag.TryTake(out string item2))
{
    Console.WriteLine($"Pobrano: {item2}");
}

Console.WriteLine($"Pozostało elementów: {itemBag.Count}"); // Wyjście: Pozostało elementów: 1

Możesz uruchomić ten kod kilka razy i zauważyć, że kolejność pobranych elementów może się zmieniać.

Metody Add(), TryTake()

Add(T item): używana do dodania elementu do ConcurrentBag. Operacja jest wątkowo-bezpieczna.

TryTake(out T item): próba pobrania elementu z ConcurrentBag. Zwraca true, jeśli element został pomyślnie pobrany, oraz false, jeśli worek jest pusty. Ważne, że TryTake nie blokuje wątku.

2. Scenariusze użycia

ConcurrentBag — to nie zamiennik dla ConcurrentQueue lub ConcurrentStack. Błyszczy w specyficznych przypadkach:

Pule obiektów/zasobów: kiedy masz pulę wielokrotnego użytku obiektów i pożądane jest, żeby wątek, który zwrócił obiekt, częściej brał go ponownie. To zmniejsza konkurencję o wspólny zasób.

Dynamika przydziału zadań w TPL: wewnętrzna praca takich konstrukcji jak Parallel.ForEach i Parallel.For używa lokalnych worków i mechanizmu „work-stealing” do efektywnego rozdzielania pracy.

Pula zadań z ConcurrentBag i optymalizacją lokalności

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;

ConcurrentBag<string> taskPool = new ConcurrentBag<string>();

// Wypełniamy pulę początkowymi zadaniami
for (int i = 0; i < 10; i++)
{
    taskPool.Add($"Zadanie {i}");
}

void Worker()
{
    // Każdy wątek próbuje wziąć zadanie
    while (taskPool.TryTake(out string task))
    {
        Console.WriteLine($"Wątek {Thread.CurrentThread.ManagedThreadId}: Przetwarza {task}");
        Thread.Sleep(50); // Symulacja pracy
    }
    Console.WriteLine($"Wątek {Thread.CurrentThread.ManagedThreadId}: Zakończył pracę.");
}

// Uruchamiamy kilka wątków-pracowników
// Task.Run(Worker);
// Task.Run(Worker);
// Task.Run(Worker);
// Thread.Sleep(1000); // Dajemy czas na wykonanie

W tym przykładzie ConcurrentBag pozwala wątkom efektywnie pobierać zadania, minimalizując blokady dzięki wewnętrznej strukturze.

Wewnętrzna mechanika

ConcurrentBag osiąga wysoką wydajność dzięki użyciu lokalnego magazynu dla wątku (TLS). Kiedy wątek dodaje element, trafia on do struktury lokalnej dla tego wątku. Przy TryTake() najpierw czytana jest struktura lokalna; jeśli jest pusta — następuje „work-stealing” z innych wątków lub puli globalnej. To zmniejsza konkurencję i czyni kolekcję świetnym wyborem, kiedy ważna jest lokalność dostępu, a kolejność nie ma znaczenia.

3. Wątkowo-bezpieczny słownik

ConcurrentDictionary<TKey, TValue> — jeden z najczęściej używanych wątkowo-bezpiecznych typów kolekcji: wysokowydajny słownik do bezpiecznego dodawania, odczytu, aktualizacji i usuwania z wielu wątków.

Zwykły Dictionary<TKey, TValue> absolutnie nie jest wątkowo-bezpieczny. Każda zapisowa operacja (dodanie/zmiana/usunięcie) albo nawet odczyt w trakcie zapisu może doprowadzić do wyjątków (InvalidOperationException) lub uszkodzenia danych.

Przykład: problem zwykłego Dictionary (powtórzenie)

using System.Collections.Generic;
using System.Threading.Tasks;

Dictionary<int, int> concurrentDictProblem = new Dictionary<int, int>();

void AddToDict(int start, int count)
{
    for (int i = 0; i < count; i++)
    {
        // Próba równoczesnego dodawania/zmiany
        // Doprowadzi do wyjątków lub niepoprawnego zachowania
        concurrentDictProblem[start + i] = start + i;
    }
}

//Uruchomienie przykładu w Main:
try
{
    Task t1 = Task.Run(() => AddToDict(0, 10000));
    Task t2 = Task.Run(() => AddToDict(5000, 10000)); // Przecinające się klucze
    Task.WaitAll(t1, t2);
    Console.WriteLine($"Elementów w słowniku (problemowym): {concurrentDictProblem.Count}");
}
catch (Exception ex)
{
    Console.WriteLine($"Błąd w zwykłym słowniku: {ex.Message}");
}

Ten kod prawie na pewno wygeneruje wyjątek lub zawiesi się z powodu problemów z wątkowością.

4. Główne operacje

ConcurrentDictionary dostarcza wyspecjalizowane atomowe operacje „sprawdź + działaj”.

TryAdd(TKey key, TValue value): atomowo dodaje parę klucz-wartość. Zwraca true, jeśli klucz został dodany, oraz false, jeśli klucz już istnieje.

ConcurrentDictionary<string, int> scores = new ConcurrentDictionary<string, int>();
if (scores.TryAdd("Alice", 100))
    Console.WriteLine("Alice dodana."); // Wyjście: Alice dodana.
if (!scores.TryAdd("Alice", 150))
    Console.WriteLine("Alice już jest."); // Wyjście: Alice już jest.

TryGetValue(TKey key, out TValue value): atomowo pobiera wartość po kluczu.

if (scores.TryGetValue("Alice", out int aliceScore))
    Console.WriteLine($"Wynik Alice: {aliceScore}"); // Wyjście: Wynik Alice: 100

TryUpdate(TKey key, TValue newValue, TValue comparisonValue): atomowo aktualizuje wartość tylko jeśli aktualna jest równa comparisonValue. Zapobiega race conditions.

// Aktualna wartość Alice = 100
if (scores.TryUpdate("Alice", 120, 100)) // Zaktualizuje 100 na 120
    Console.WriteLine("Wynik Alice zaktualizowany do 120."); // Wyjście: Wynik Alice zaktualizowany do 120.
if (!scores.TryUpdate("Alice", 130, 100)) // Nie zaktualizuje, bo aktualne to 120, a nie 100
    Console.WriteLine("Wynik Alice nie zaktualizowany (przestarzałe dane)."); // Wyjście: ...

TryRemove(TKey key, out TValue value): atomowo usuwa element po kluczu.

if (scores.TryRemove("Alice", out int removedScore))
    Console.WriteLine($"Alice usunięta, wynik był: {removedScore}"); // Wyjście: Alice usunięta, wynik był: 120

5. Zaawansowane atomowe operacje

Te dwie metody są robotnikami roboczymi ConcurrentDictionary, pokrywając wiele scenariuszy.

GetOrAdd(TKey key, TValue valueFactory(TKey key)): atomowo zwraca istniejącą wartość po kluczu albo tworzy i dodaje nową przez fabrykę. Idealne dla cache'ów i unikalnych encji.

// Załóżmy, że cachujemy ciężkie obiekty
ConcurrentDictionary<int, HeavyObject> objectCache = new ConcurrentDictionary<int, HeavyObject>();

HeavyObject GetOrCreateHeavyObject(int id)
{
    // Jeśli już jest — zwróci go, inaczej stworzy i doda
    return objectCache.GetOrAdd(id, (key) =>
    {
        Console.WriteLine($"Tworzymy nowy HeavyObject dla ID: {key}");
        return new HeavyObject(key); // Symulacja tworzenia kosztownego obiektu
    });
}

// W Main:
HeavyObject obj1 = GetOrCreateHeavyObject(1); // Stworzy nowy
HeavyObject obj2 = GetOrCreateHeavyObject(2); // Stworzy nowy
HeavyObject obj3 = GetOrCreateHeavyObject(1); // Zwróci istniejący obj1

AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory): atomowo dodaje wartość, jeśli klucz nie istnieje, lub aktualizuje istniejącą przez fabrykę.

  • addValue: wartość do dodania, jeśli klucz nie zostanie znaleziony.
  • updateValueFactory: funkcja, obliczająca nową wartość na podstawie klucza i aktualnej wartości.
// Zliczanie odsłon strony
ConcurrentDictionary<string, int> pageViews = new ConcurrentDictionary<string, int>();

void IncrementPageView(string page)
{
    pageViews.AddOrUpdate(page, 1, // Jeśli strona nowa, dodaj 1
                          (key, existingVal) => existingVal + 1); // Inaczej zwiększ o 1
    Console.WriteLine($"Strona '{page}' odwiedzona {pageViews[page]} razy.");
}

// W Main:
IncrementPageView("Home");   // Home: 1
IncrementPageView("About");  // About: 1
IncrementPageView("Home");   // Home: 2
IncrementPageView("Home");   // Home: 3
IncrementPageView("Contact"); // Contact: 1

6. Przykłady użycia dla cache'owania lub zarządzania stanami

Cache'owanie danych: ConcurrentDictionary — świetny wybór dla in-memory cache: GetOrAdd zapobiega wielokrotnemu tworzeniu kosztownych obiektów.

Zarządzanie sesjami użytkowników: bezpieczne przechowywanie i aktualizacja danych sesji z różnych żądań.

Zliczanie statystyk: z pomocą AddOrUpdate wygodnie inkrementować liczniki zdarzeń, odsłon, głosów itp.

Rejestry/Service Locator: przechowywanie zarejestrowanych serwisów lub pluginów dostępnych z różnych wątków.

ConcurrentDictionary<TKey, TValue> — wysokooptymalizowana kolekcja, znacząco upraszczająca wielowątkowy development ze słownikami dzięki zestawowi atomowych operacji bez ręcznej synchronizacji.

Komentarze
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION