CodeGym /Cursos /C# SELF /Patrones avanzados y particularidades de las colecciones ...

Patrones avanzados y particularidades de las colecciones Concurrent

C# SELF
Nivel 58 , Lección 3
Disponible

1. «Productor‑Consumidor» + Concurrent

Ya tocamos el patrón «Productor‑Consumidor» al hablar de ConcurrentQueue. Vamos a verlo con más detalle para el caso de varios productores y consumidores, y también la señalización correcta de finalización.

La principal ventaja de ConcurrentQueue (y de otras colecciones Concurrent) es que se ocupa de la seguridad entre hilos por sí misma. No hace falta envolver Enqueue o TryDequeue en un lock — varios hilos pueden interactuar con seguridad a través de la cola compartida.

Ejemplo: Varios productores y varios consumidores

Varios hilos trabajadores generan tareas, y varios hilos distintos las procesan.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
CancellationTokenSource cts = new CancellationTokenSource(); // Para cancelar el trabajo de los consumidores

// Método para el productor
void Producer(string name, int count)
{
    for (int i = 0; i < count; i++)
    {
        string task = $"Zadacha_{name}_{i}";
        taskQueue.Enqueue(task);
        Console.WriteLine($"[P:{name}] Dobavil: {task}");
        Thread.Sleep(10); 
    }
}

// Método para el consumidor
void Consumer(string name)
{
    while (!cts.Token.IsCancellationRequested || taskQueue.Count > 0)
    {
        if (taskQueue.TryDequeue(out string task))
        {
            Console.WriteLine($"[C:{name}] Obrabotal: {task}");
            Thread.Sleep(20); 
        }
        else
        {
            Thread.Sleep(50); // Esperamos si la cola está vacía
        }
    }
    Console.WriteLine($"[C:{name}] Zavershil rabotu.");
}

// Inicio del ejemplo en Main:
Task.Run(() => Producer("A", 10)); // Productor A
Task.Run(() => Producer("B", 10)); // Productor B
Task.Run(() => Consumer("1"));    // Consumidor 1
Task.Run(() => Consumer("2"));    // Consumidor 2

Thread.Sleep(1000); // Damos tiempo para que trabajen
cts.Cancel();       // Señal a los consumidores para que terminen
Thread.Sleep(500); // Damos tiempo a los consumidores para procesar lo restante y terminar

Aquí varios productores y consumidores trabajan simultáneamente con una sola ConcurrentQueue sin condiciones de carrera: los métodos Enqueue y TryDequeue son atómicos.

Importancia de las señales de finalización (CancellationTokenSource)

Usamos CancellationTokenSource (cts) para señalar a los consumidores la necesidad de terminar su trabajo. Esto es crítico para el patrón Producer‑Consumer:

  • Los productores han terminado. Cuando la inserción de elementos finaliza, los consumidores no deben esperar indefinidamente una cola vacía.
  • La aplicación se cierra. Hay que detener correctamente a los consumidores.

CancellationTokenSource y CancellationToken dan un mecanismo estándar: el consumidor comprueba periódicamente IsCancellationRequested y si hace falta llama a ThrowIfCancellationRequested().

2. BlockingCollection<T>

Aunque ConcurrentQueue<T> es adecuada para «productor‑consumidor», requiere esperar manualmente cuando la cola está vacía y señalizar la finalización por separado. Para una implementación más cómoda en .NET existe BlockingCollection<T> — no es una colección independiente, sino un envoltorio sobre cualquier IProducerConsumerCollection<T> (por ejemplo, sobre ConcurrentQueue).

Ventajas de BlockingCollection:

  • Operaciones bloqueantes. Add()/Take() bloquean el hilo si la colección está llena/vacía. No hace falta comprobar manualmente IsEmpty.
  • Límite de tamaño. Se puede fijar la capacidad (Capacity). Add() se bloqueará cuando se alcance el límite, — útil para controlar la memoria.
  • Finalización cómoda. CompleteAdding() señala el fin de las adiciones, y GetConsumingEnumerable() permite al consumidor procesar elementos hasta la finalización completa.

Ejemplo: Producer‑Consumer con BlockingCollection<T>

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// BlockingCollection por defecto usa ConcurrentQueue
BlockingCollection<int> numbers = new BlockingCollection<int>(capacity: 10); // Cola con límite 10

void ProducerBC(int count)
{
    for (int i = 0; i < count; i++)
    {
        numbers.Add(i); // Bloquea si la cola está llena
        Console.WriteLine($"[P] Dobavil: {i}");
        Thread.Sleep(50);
    }
    numbers.CompleteAdding(); // Señalamos que el productor terminó
    Console.WriteLine("[P] Proizvoditel zavershil dobavlenie.");
}

void ConsumerBC()
{
    // GetConsumingEnumerable bloquea mientras haya elementos o hasta que CompleteAdding sea llamado
    foreach (var item in numbers.GetConsumingEnumerable())
    {
        Console.WriteLine($"[C] Obrabotal: {item}");
        Thread.Sleep(100);
    }
    Console.WriteLine("[C] Potrebitel zavershil rabotu.");
}

// Inicio del ejemplo en Main:
Task producerTask = Task.Run(() => ProducerBC(15)); // 15 elementos, límite 10
Task consumerTask = Task.Run(ConsumerBC);
Task.WaitAll(producerTask, consumerTask); // Esperamos a que terminen

Fíjate en lo mucho más limpio que queda el código del consumidor gracias a GetConsumingEnumerable(). Si necesitas operaciones bloqueantes o límite de tamaño — BlockingCollection es tu herramienta.

3. Métodos y propiedades adicionales de las colecciones Concurrent

IsEmpty, Count

  • IsEmpty (bool): si la colección está vacía.
  • Count (int): número actual de elementos.

Ejemplo: Uso de IsEmpty y Count

using System.Collections.Concurrent;

ConcurrentQueue<string> q = new ConcurrentQueue<string>();
Console.WriteLine($"Ochered pustaya? {q.IsEmpty}"); // True

q.Enqueue("A");
q.Enqueue("B");
Console.WriteLine($"Elementov v ocheredi: {q.Count}"); // 2
Console.WriteLine($"Ochered pustaya? {q.IsEmpty}"); // False

q.TryDequeue(out var itemA);
Console.WriteLine($"Elementov v ocheredi: {q.Count}"); // 1

Conversión a arrays (ToArray())

Todas las colecciones Concurrent proporcionan el método ToArray(), que devuelve una instantánea de los elementos.

Ejemplo: Uso de ToArray()

using System.Collections.Concurrent;

ConcurrentStack<int> s = new ConcurrentStack<int>();
s.Push(10);
s.Push(20);
s.Push(30);

int[] items = s.ToArray(); // Creará un nuevo array: [30, 20, 10] (para stack LIFO)
Console.WriteLine($"Elementy v masive: {string.Join(", ", items)}");

// La colección permanece sin cambios
Console.WriteLine($"Elementov v steke posle ToArray: {s.Count}"); // 3

Limpieza de colecciones

En .NET 6+ muchas colecciones Concurrent obtuvieron el método Clear() para eliminar todos los elementos.

Ejemplo: Limpiar una colección

using System.Collections.Concurrent;

ConcurrentBag<string> bag = new ConcurrentBag<string>();
bag.Add("Alpha");
bag.Add("Beta");
Console.WriteLine($"Elementov v Bag: {bag.Count}"); // 2

bag.Clear(); // Limpiamos la colección
Console.WriteLine($"Elementov v Bag posle ochistki: {bag.Count}"); // 0
Console.WriteLine($"Bag pust? {bag.IsEmpty}"); // True

4. Particularidades del comportamiento de las colecciones Concurrent

Es importante recordar la instantaneidad de los datos. Propiedades como Count y los resultados de ToArray() reflejan el estado de la colección en un momento concreto. En condiciones de cambios paralelos ese valor puede quedar obsoleto justo después de obtenerlo.

Ejemplo: Count y ToArray() — "instantánea"

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> snapshotQueue = new ConcurrentQueue<int>();

void AddItemsContinuously()
{
    for (int i = 0; i < 1000; i++)
    {
        snapshotQueue.Enqueue(i);
        Thread.Sleep(1); 
    }
}

// Inicio del ejemplo en Main:
Task.Run(AddItemsContinuously); // Hilo que añade elementos continuamente

Thread.Sleep(100); // Damos un poco de tiempo para añadir
Console.WriteLine($"Count actual: {snapshotQueue.Count}"); // Puede ser 50, 80, 120...
Thread.Sleep(100);
Console.WriteLine($"Count actual otra vez: {snapshotQueue.Count}"); // Será otro valor
int[] currentItems = snapshotQueue.ToArray();
Console.WriteLine($"Número de elementos en ToArray(): {currentItems.Length}"); // Puede diferir del último Count

No confíes en Count como una garantía estricta del número actual de elementos durante cambios activos.

5. Matices al iterar colecciones Concurrent

Operaciones individuales (Add, TryTake, TryPop, GetOrAdd etc.) son seguras entre hilos. Pero iterar con foreach sobre una colección que se modifica en paralelo por otros hilos no garantiza que verás todos los elementos ni solo ellos — pueden ocurrir omissions y comportamientos inesperados.

Ejemplo: Iteración durante la modificación

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> iterQueue = new ConcurrentQueue<int>();

// Añadimos elementos iniciales
for (int i = 0; i < 10; i++) iterQueue.Enqueue(i);

// Hilo modificador
void Modifier()
{
    for (int i = 10; i < 20; i++)
    {
        iterQueue.Enqueue(i); // Añadimos nuevos elementos
        Thread.Sleep(50);
    }
}

// Hilo iterador
void Iterator()
{
    Console.WriteLine("Nachinaem iteraciyu...");
    int count = 0;
    foreach (var item in iterQueue) // Iteramos
    {
        Console.Write($"{item} ");
        count++;
        Thread.Sleep(30); // Simulación de trabajo, dejamos que el modificador cambie la colección
    }
    Console.WriteLine($"\nIteraciya zavershena. Prochitano {count} elementov.");
    Console.WriteLine($"Tekuschee kolichestvo v ocheredi: {iterQueue.Count}");
}

// Inicio del ejemplo en Main:
Task.Run(Modifier);
Task.Run(Iterator);
Thread.Sleep(1500); // Damos tiempo para que trabajen

Regla: si necesitas un conjunto fijo de elementos (por ejemplo, para un informe), primero toma una "instantánea" con ToArray(), y luego itera sobre ella:

// Manera correcta de iterar si la colección puede cambiar
int[] snapshot = iterQueue.ToArray();
foreach (var item in snapshot)
{
    // Ahora iteras sobre un array-instantánea inmutable
}

Con esto cerramos nuestra inmersión en patrones avanzados y particularidades de las colecciones Concurrent: vimos Producer‑Consumer con múltiples participantes y BlockingCollection, y cubrimos matices importantes sobre Count, ToArray() y la iteración.

Comentarios
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION