CodeGym /Corsi /C# SELF /ConcurrentBag e

ConcurrentBag e ConcurrentDictionary

C# SELF
Livello 58 , Lezione 2
Disponibile

1. Introduzione

ConcurrentBag<T> è una collezione thread-safe, non ordinata. La sua caratteristica principale e il vantaggio stanno nella parola "Bag" (sacco), che implica che l'ordine degli elementi non è garantito al momento dell'estrazione. Questo significa che l'elemento che estrai potrebbe non essere quello che ti aspettavi in base all'ordine di inserimento. Però ConcurrentBag ha un'ottimizzazione unica che la rende estremamente veloce in certi scenari.

Caratteristiche di ConcurrentBag

Assenza di ordine: A differenza di code (FIFO) e stack (LIFO), ConcurrentBag non garantisce che TryTake() ti restituisca un elemento in un ordine specifico relativo a come è stato aggiunto. Questa è una differenza chiave.

Ottimizzazione per accesso locale (Thread-Local Storage): La ragione principale dell'esistenza di ConcurrentBag è le sue prestazioni in scenari dove il thread che ha aggiunto un elemento molto probabilmente sarà lo stesso thread che lo estrarrà.

Esempio: ConcurrentBag — aggiunta ed estrazione

using System.Collections.Concurrent;

ConcurrentBag<string> itemBag = new ConcurrentBag<string>();

// Aggiunta di elementi
itemBag.Add("Punto A");
itemBag.Add("Punto B");
itemBag.Add("Punto C");

Console.WriteLine($"Elementi nel sacco: {itemBag.Count}"); // Output: Elementi nel sacco: 3

// Estrazione degli elementi (l'ordine non è garantito!)
if (itemBag.TryTake(out string item1))
{
    Console.WriteLine($"Estratto: {item1}"); // Può essere "Punto C", "Punto B" o "Punto A"
}

if (itemBag.TryTake(out string item2))
{
    Console.WriteLine($"Estratto: {item2}");
}

Console.WriteLine($"Elementi rimasti: {itemBag.Count}"); // Output: Elementi rimasti: 1

Puoi eseguire questo codice più volte e notare che l'ordine degli elementi estratti può cambiare.

Metodi Add(), TryTake()

Add(T item): usato per aggiungere un elemento a ConcurrentBag. L'operazione è thread-safe.

TryTake(out T item): tenta di estrarre un elemento da ConcurrentBag. Ritorna true se l'elemento è stato estratto con successo, e false se il sacco è vuoto. Importante: TryTake non blocca il thread.

2. Scenari d'uso

ConcurrentBag non è un sostituto di ConcurrentQueue o ConcurrentStack. Brilla in casi specifici:

Pool di oggetti/risorse: quando hai un pool di oggetti riutilizzabili, ed è preferibile che il thread che ha restituito l'oggetto lo riprenda più spesso. Questo riduce la contesa su una risorsa condivisa.

Distribuzione dinamica dei task in TPL: il funzionamento interno di costrutti come Parallel.ForEach e Parallel.For usa sacchi locali e meccanismi di "work-stealing" per distribuire efficacemente il lavoro.

Pool di task con ConcurrentBag e ottimizzazione della località

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;

ConcurrentBag<string> taskPool = new ConcurrentBag<string>();

// Riempiamo il pool con task iniziali
for (int i = 0; i < 10; i++)
{
    taskPool.Add($"Task {i}");
}

void Worker()
{
    // Ogni thread prova a prendere un task
    while (taskPool.TryTake(out string task))
    {
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: Elabora {task}");
        Thread.Sleep(50); // Simulazione di lavoro
    }
    Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: Ha finito il lavoro.");
}

// Avviamo alcuni worker
// Task.Run(Worker);
// Task.Run(Worker);
// Task.Run(Worker);
// Thread.Sleep(1000); // Diamo tempo all'esecuzione

In questo esempio ConcurrentBag permette ai thread di prendere task in modo efficiente, minimizzando i lock grazie alla struttura interna.

Meccanica interna

ConcurrentBag ottiene alte prestazioni usando lo storage thread-local (TLS). Quando un thread aggiunge un elemento, questo viene messo in una struttura locale al thread. Alla chiamata di TryTake() si legge prima la struttura locale; se è vuota — avviene il "work-stealing" dagli altri thread o dal pool globale. Questo riduce la contesa e rende la collezione una scelta eccellente quando la località d'accesso è importante e l'ordine non lo è.

3. Dizionario thread-safe

ConcurrentDictionary<TKey, TValue> è una delle collezioni thread-safe più usate: un dizionario ad alte prestazioni per operazioni sicure di aggiunta, lettura, aggiornamento e rimozione da più thread.

Un normale Dictionary<TKey, TValue> non è assolutamente thread-safe. Qualsiasi scrittura (aggiunta/modifica/rimozione) o anche una lettura durante una scrittura può portare a eccezioni (InvalidOperationException) o corruzione dei dati.

Esempio: problema del normale Dictionary (ripetizione)

using System.Collections.Generic;
using System.Threading.Tasks;

Dictionary<int, int> concurrentDictProblem = new Dictionary<int, int>();

void AddToDict(int start, int count)
{
    for (int i = 0; i < count; i++)
    {
        // Tentativo di aggiunta/modifica simultanea
        // Porterà a eccezioni o comportamento incorretto
        concurrentDictProblem[start + i] = start + i;
    }
}

// Esecuzione dell'esempio in Main:
try
{
    Task t1 = Task.Run(() => AddToDict(0, 10000));
    Task t2 = Task.Run(() => AddToDict(5000, 10000)); // Sovrapposizione di chiavi
    Task.WaitAll(t1, t2);
    Console.WriteLine($"Elementi nel dizionario (problematico): {concurrentDictProblem.Count}");
}
catch (Exception ex)
{
    Console.WriteLine($"Errore nel dizionario normale: {ex.Message}");
}

Questo codice quasi sicuramente lancerà un'eccezione o si bloccherà a causa di problemi di thread-safety.

4. Operazioni principali

ConcurrentDictionary fornisce operazioni atomiche "check + act" specializzate.

TryAdd(TKey key, TValue value): aggiunge atomica-mente una coppia chiave-valore. Ritorna true se la chiave è stata aggiunta, e false se la chiave esiste già.

ConcurrentDictionary<string, int> scores = new ConcurrentDictionary<string, int>();
if (scores.TryAdd("Alice", 100))
    Console.WriteLine("Alice aggiunta."); // Output: Alice aggiunta.
if (!scores.TryAdd("Alice", 150))
    Console.WriteLine("Alice è già presente."); // Output: Alice è già presente.

TryGetValue(TKey key, out TValue value): ottiene atomica-mente il valore per una chiave.

if (scores.TryGetValue("Alice", out int aliceScore))
    Console.WriteLine($"Punteggio di Alice: {aliceScore}"); // Output: Punteggio di Alice: 100

TryUpdate(TKey key, TValue newValue, TValue comparisonValue): aggiorna atomica-mente il valore solo se l'attuale è uguale a comparisonValue. Previene race condition.

// Valore corrente di Alice = 100
if (scores.TryUpdate("Alice", 120, 100)) // Aggiorna 100 a 120
    Console.WriteLine("Punteggio di Alice aggiornato a 120."); // Output: Punteggio di Alice aggiornato a 120.
if (!scores.TryUpdate("Alice", 130, 100)) // Non aggiorna perché l'attuale è 120, non 100
    Console.WriteLine("Punteggio di Alice non aggiornato (dati obsoleti)."); // Output: ...

TryRemove(TKey key, out TValue value): rimuove atomica-mente l'elemento per chiave.

if (scores.TryRemove("Alice", out int removedScore))
    Console.WriteLine($"Alice rimossa, il punteggio era: {removedScore}"); // Output: Alice rimossa, il punteggio era: 120

5. Operazioni atomiche avanzate

Questi due metodi sono i cavalli da lavoro di ConcurrentDictionary, coprendo molti scenari.

GetOrAdd(TKey key, TValue valueFactory(TKey key)): ritorna atomica-mente il valore esistente per la chiave oppure crea e aggiunge un nuovo valore tramite la factory. Ideale per cache e entità uniche.

// Supponiamo di fare caching di oggetti pesanti
ConcurrentDictionary<int, HeavyObject> objectCache = new ConcurrentDictionary<int, HeavyObject>();

HeavyObject GetOrCreateHeavyObject(int id)
{
    // Se esiste già — lo ritorna, altrimenti lo crea e aggiunge
    return objectCache.GetOrAdd(id, (key) =>
    {
        Console.WriteLine($"Creazione di un nuovo HeavyObject per ID: {key}");
        return new HeavyObject(key); // Simulazione di creazione di un oggetto costoso
    });
}

// In Main:
HeavyObject obj1 = GetOrCreateHeavyObject(1); // Crea nuovo
HeavyObject obj2 = GetOrCreateHeavyObject(2); // Crea nuovo
HeavyObject obj3 = GetOrCreateHeavyObject(1); // Restituisce l'esistente obj1

AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory): aggiunge atomica-mente il valore se la chiave non esiste, oppure aggiorna l'esistente tramite una factory.

  • addValue: valore da aggiungere se la chiave non è trovata.
  • updateValueFactory: funzione che calcola il nuovo valore basandosi sulla chiave e sul valore attuale.
// Conteggio delle visite a pagine
ConcurrentDictionary<string, int> pageViews = new ConcurrentDictionary<string, int>();

void IncrementPageView(string page)
{
    pageViews.AddOrUpdate(page, 1, // Se la pagina è nuova, aggiungi 1
                          (key, existingVal) => existingVal + 1); // Altrimenti incrementa di 1
    Console.WriteLine($"La pagina '{page}' è stata visitata {pageViews[page]} volte.");
}

// In Main:
IncrementPageView("Home");   // Home: 1
IncrementPageView("About");  // About: 1
IncrementPageView("Home");   // Home: 2
IncrementPageView("Home");   // Home: 3
IncrementPageView("Contact"); // Contact: 1

6. Esempi d'uso per caching o gestione degli stati

Caching dei dati: ConcurrentDictionary è un'ottima scelta per cache in-memory: GetOrAdd evita la ricreazione di oggetti costosi.

Gestione delle sessioni utente: memorizzazione sicura e aggiornamento dei dati di sessione provenienti da richieste diverse.

Conteggio delle statistiche: con AddOrUpdate è comodo incrementare contatori di eventi, visualizzazioni, voti ecc.

Registri/Service Locator: conservare servizi o plugin registrati, accessibili da diversi thread.

ConcurrentDictionary<TKey, TValue> è una collezione altamente ottimizzata che semplifica notevolmente lo sviluppo multithread con dizionari grazie a un set di operazioni atomiche senza bisogno di sincronizzazione manuale.

Commenti
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION