1. Introducción
ConcurrentBag<T> es una colección thread-safe, no ordenada. Su característica y ventaja principal está en la palabra "Bag" (bolsa), lo que implica que el orden de los elementos no está garantizado al extraerlos. Esto significa que el elemento que saques puede no ser el que esperabas según el orden de inserción. A cambio, ConcurrentBag tiene una optimización única que la hace extremadamente rápida en ciertos escenarios.
Características de ConcurrentBag
Sin orden: A diferencia de colas (FIFO) y pilas (LIFO), ConcurrentBag no promete que TryTake() te devuelva un elemento en un orden específico respecto a cómo fue añadido. Esta es una diferencia clave.
Optimización para acceso local (Thread-Local Storage): La razón principal de existencia de ConcurrentBag es su rendimiento en escenarios donde el hilo que añadió el elemento probablemente será el mismo que lo saque.
Ejemplo: ConcurrentBag — añadir y extraer
using System.Collections.Concurrent;
ConcurrentBag<string> itemBag = new ConcurrentBag<string>();
// Añadir elementos
itemBag.Add("Punto A");
itemBag.Add("Punto B");
itemBag.Add("Punto C");
Console.WriteLine($"Elementos en la bolsa: {itemBag.Count}"); // Salida: Elementos en la bolsa: 3
// Extraer elementos (¡el orden no está garantizado!)
if (itemBag.TryTake(out string item1))
{
Console.WriteLine($"Extraído: {item1}"); // Puede ser "Punto C", "Punto B" o "Punto A"
}
if (itemBag.TryTake(out string item2))
{
Console.WriteLine($"Extraído: {item2}");
}
Console.WriteLine($"Elementos restantes: {itemBag.Count}"); // Salida: Elementos restantes: 1
Puedes ejecutar este código varias veces y notar que el orden de los elementos extraídos puede cambiar.
Métodos Add(), TryTake()
Add(T item): se usa para añadir un elemento al ConcurrentBag. La operación es thread-safe.
TryTake(out T item): intenta extraer un elemento del ConcurrentBag. Devuelve true si el elemento fue extraído con éxito, y false si la bolsa está vacía. Es importante que TryTake no bloquea el hilo.
2. Escenarios de uso
ConcurrentBag no es un reemplazo de ConcurrentQueue o ConcurrentStack. Brilla en casos específicos:
Pools de objetos/recursos: cuando tienes un pool de objetos reutilizables, y es deseable que el hilo que devolvió el objeto lo tome de nuevo la mayoría de las veces. Esto reduce la competencia por el recurso compartido.
Distribución dinámica de tareas en TPL: la implementación interna de constructos como Parallel.ForEach y Parallel.For usa bolsas locales y el mecanismo de "work-stealing" para distribuir trabajo eficientemente.
Pool de tareas con ConcurrentBag y optimización de localidad
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
ConcurrentBag<string> taskPool = new ConcurrentBag<string>();
// Rellenamos el pool con tareas iniciales
for (int i = 0; i < 10; i++)
{
taskPool.Add($"Tarea {i}");
}
void Worker()
{
// Cada hilo intenta tomar una tarea
while (taskPool.TryTake(out string task))
{
Console.WriteLine($"Hilo {Thread.CurrentThread.ManagedThreadId}: Procesando {task}");
Thread.Sleep(50); // Simulación de trabajo
}
Console.WriteLine($"Hilo {Thread.CurrentThread.ManagedThreadId}: Terminó.");
}
// Lanzamos varios workers
// Task.Run(Worker);
// Task.Run(Worker);
// Task.Run(Worker);
// Thread.Sleep(1000); // Damos tiempo para ejecutar
En este ejemplo ConcurrentBag permite a los hilos tomar tareas de forma eficiente, minimizando bloqueos gracias a su estructura interna.
Mecánica interna
ConcurrentBag alcanza alto rendimiento usando almacenamiento thread-local (TLS). Cuando un hilo añade un elemento, se coloca en una estructura local al hilo. En TryTake() primero se lee la estructura local; si está vacía — se realiza "work-stealing" desde otros hilos o desde un pool global. Esto reduce la contención y hace a la colección una excelente opción cuando la localidad de acceso es importante y el orden no lo es.
3. Diccionario thread-safe
ConcurrentDictionary<TKey, TValue> es una de las colecciones thread-safe más usadas: un diccionario de alto rendimiento para operaciones seguras de añadir, leer, actualizar y eliminar desde varios hilos.
Un Dictionary<TKey, TValue> normal no es thread-safe en absoluto. Cualquier escritura (añadir/modificar/eliminar) o incluso lectura durante una escritura puede llevar a excepciones (InvalidOperationException) o corrupción de datos.
Ejemplo: problema con un Dictionary normal (repetición)
using System.Collections.Generic;
using System.Threading.Tasks;
Dictionary<int, int> concurrentDictProblem = new Dictionary<int, int>();
void AddToDict(int start, int count)
{
for (int i = 0; i < count; i++)
{
// Intento de añadir/modificar simultáneamente
// Provocará excepciones o comportamiento incorrecto
concurrentDictProblem[start + i] = start + i;
}
}
// Ejecución del ejemplo en Main:
try
{
Task t1 = Task.Run(() => AddToDict(0, 10000));
Task t2 = Task.Run(() => AddToDict(5000, 10000)); // Colisión de claves
Task.WaitAll(t1, t2);
Console.WriteLine($"Elementos en el diccionario (problemático): {concurrentDictProblem.Count}");
}
catch (Exception ex)
{
Console.WriteLine($"Error en un diccionario normal: {ex.Message}");
}
Este código casi con seguridad lanzará una excepción o se bloqueará debido a problemas de thread-safety.
4. Operaciones principales
ConcurrentDictionary proporciona operaciones atómicas de tipo "comprobar + actuar".
TryAdd(TKey key, TValue value): añade atómicamente un par clave-valor. Devuelve true si la clave fue añadida, y false si la clave ya existe.
ConcurrentDictionary<string, int> scores = new ConcurrentDictionary<string, int>();
if (scores.TryAdd("Alice", 100))
Console.WriteLine("Alice añadida."); // Salida: Alice añadida.
if (!scores.TryAdd("Alice", 150))
Console.WriteLine("Alice ya existe."); // Salida: Alice ya existe.
TryGetValue(TKey key, out TValue value): obtiene atómicamente el valor por clave.
if (scores.TryGetValue("Alice", out int aliceScore))
Console.WriteLine($"Puntuación de Alice: {aliceScore}"); // Salida: Puntuación de Alice: 100
TryUpdate(TKey key, TValue newValue, TValue comparisonValue): actualiza atómicamente el valor solo si el actual es igual a comparisonValue. Previene condiciones de carrera.
// Valor actual de Alice = 100
if (scores.TryUpdate("Alice", 120, 100)) // Cambiará 100 por 120
Console.WriteLine("Puntuación de Alice actualizada a 120."); // Salida: Puntuación de Alice actualizada a 120.
if (!scores.TryUpdate("Alice", 130, 100)) // No actualizará, porque el actual es 120, no 100
Console.WriteLine("Puntuación de Alice no actualizada (datos obsoletos)."); // Salida: ...
TryRemove(TKey key, out TValue value): elimina atómicamente el elemento por clave.
if (scores.TryRemove("Alice", out int removedScore))
Console.WriteLine($"Alice eliminada, la puntuación era: {removedScore}"); // Salida: Alice eliminada, la puntuación era: 120
5. Operaciones atómicas avanzadas
Estos dos métodos son el caballo de batalla de ConcurrentDictionary, cubriendo muchos escenarios.
GetOrAdd(TKey key, TValue valueFactory(TKey key)): devuelve atómicamente el valor existente por clave o crea y añade uno nuevo con la fábrica. Ideal para caches y entidades únicas.
// Supongamos que cacheamos objetos pesados
ConcurrentDictionary<int, HeavyObject> objectCache = new ConcurrentDictionary<int, HeavyObject>();
HeavyObject GetOrCreateHeavyObject(int id)
{
// Si ya existe — lo devuelve, sino lo crea y añade
return objectCache.GetOrAdd(id, (key) =>
{
Console.WriteLine($"Creando nuevo HeavyObject para ID: {key}");
return new HeavyObject(key); // Simulación de creación costosa
});
}
// En Main:
HeavyObject obj1 = GetOrCreateHeavyObject(1); // Creará uno nuevo
HeavyObject obj2 = GetOrCreateHeavyObject(2); // Creará uno nuevo
HeavyObject obj3 = GetOrCreateHeavyObject(1); // Devolverá el existente obj1
AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory): añade atómicamente el valor si la clave no existe, o actualiza el existente mediante una fábrica.
- addValue: valor para añadir si la clave no se encuentra.
- updateValueFactory: función que calcula el nuevo valor en base a la clave y el valor actual.
// Conteo de visitas a páginas
ConcurrentDictionary<string, int> pageViews = new ConcurrentDictionary<string, int>();
void IncrementPageView(string page)
{
pageViews.AddOrUpdate(page, 1, // Si la página es nueva, añadir 1
(key, existingVal) => existingVal + 1); // Si no, incrementar en 1
Console.WriteLine($"La página '{page}' fue visitada {pageViews[page]} veces.");
}
// En Main:
IncrementPageView("Home"); // Home: 1
IncrementPageView("About"); // About: 1
IncrementPageView("Home"); // Home: 2
IncrementPageView("Home"); // Home: 3
IncrementPageView("Contact"); // Contact: 1
6. Ejemplos de uso para caching o gestión de estados
Cache de datos: ConcurrentDictionary es una excelente opción para caches en memoria: GetOrAdd evita recrear objetos costosos.
Gestión de sesiones de usuarios: almacenamiento y actualización segura de datos de sesión desde distintas peticiones.
Conteo de estadísticas: con AddOrUpdate es sencillo incrementar contadores de eventos, vistas, votos, etc.
Registros/Service Locator: almacenar servicios o plugins registrados, accesibles desde varios hilos.
ConcurrentDictionary<TKey, TValue> es una colección altamente optimizada que facilita mucho el desarrollo multihilo con diccionarios gracias a su conjunto de operaciones atómicas sin necesidad de sincronización manual.
GO TO FULL VERSION