CodeGym /Courses /C# SELF /Queues and Stacks Producer-Consumer

Queues and Stacks Producer-Consumer

C# SELF
Level 58 , Lesson 1
Available

1. Introduction

Let's start with the basics. A queue is a fundamental data structure that works on the FIFO (First-In, First-Out) principle — "first in, first out". Imagine a regular checkout line at a store: whoever got in line first is served first.

In multithreaded programming the Producer-Consumer pattern is one of the most common and powerful patterns.

  • Producers are threads or parts of the app that create data or tasks and put them into a shared queue. They "produce" work.
  • Consumers are threads or parts of the app that take data or tasks from the queue and process them. They "consume" work.

This pattern helps control the flow of data, decouples system components (a producer doesn't need to know who or how will process the data), makes the app more responsive, and helps distribute load evenly across threads.

Example: ConcurrentQueue — adding and removing

Let's see how to add and remove items from a ConcurrentQueue<T>.

using System.Collections.Concurrent;

ConcurrentQueue<string> tasks = new ConcurrentQueue<string>();

// Adding items (producer)
tasks.Enqueue("Skachat file");
tasks.Enqueue("Obrabotat izobrazhenie");
Console.WriteLine($"Zadach v ocheredi: {tasks.Count}"); // Output: Zadach v ocheredi: 2

// Removing items (consumer)
if (tasks.TryDequeue(out string task1))
{
    Console.WriteLine($"Vypolnena zadacha: {task1}"); // Output: Vypolnena zadacha: Skachat file
}

if (tasks.TryDequeue(out string task2))
{
    Console.WriteLine($"Vypolnena zadacha: {task2}"); // Output: Vypolnena zadacha: Obrabotat izobrazhenie
}

if (!tasks.TryDequeue(out string emptyTask))
{
    Console.WriteLine("Ochered pusta, novyh zadach net."); // Output: Ochered pusta, novyh zadach net.
}

Basics: Enqueue(), TryDequeue()

Enqueue(T item): Used to add an item to the end of the queue. This operation is thread-safe. You can call Enqueue from 10 different threads at the same time and all items will be added correctly.

TryDequeue(out T item): Used to attempt to remove an item from the front of the queue. This is the key method for consumers. It returns true if an item was successfully dequeued (the value goes into the out parameter item), and false if the queue is empty. Important: TryDequeue does not block the thread when the queue is empty.

2. Importance of TryDequeue() and operation atomicity

The TryDequeue() method is not just convenient; it's critically important for correct thread-safe behavior. It is atomic: the check for emptiness and the actual removal happen as one indivisible operation.

If we had separate methods IsEmpty (check if the queue is empty) and Dequeue (remove an item), another thread could empty the queue between the calls. As a result your Dequeue would throw an exception or return invalid data. TryDequeue fully prevents this situation.

Example: Producer-Consumer with multiple threads

Here we run two producer threads and one consumer thread.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> dataQueue = new ConcurrentQueue<int>();
bool producersDone = false; // Flag to signal the consumer

void Producer(int start, int count)
{
    for (int i = 0; i < count; i++)
    {
        dataQueue.Enqueue(start + i);
        Console.WriteLine($"[P] Dobavil: {start + i}");
        Thread.Sleep(10); 
    }
}

void Consumer()
{
    while (!producersDone || dataQueue.Count > 0) // Keep going while there's data or producers are running
    {
        if (dataQueue.TryDequeue(out int item))
        {
            Console.WriteLine($"[C] Obrabotal: {item}");
        }
        else
        {
            Thread.Sleep(50); // Wait if the queue is empty
        }
    }
    Console.WriteLine("[C] Zavershil rabotu.");
}

// Run the example in Main:
// Task.Run(() => Producer(1, 5));
// Task.Run(() => Producer(100, 5)); // Second producer
// Task.Run(() => Consumer());
// Thread.Sleep(600); // Give threads some time to work
// producersDone = true; // Signal that producers finished
// Thread.Sleep(200); // Give consumer time to take the rest

Note that in this simple example the producersDone flag and Thread.Sleep are used to simulate completion. In real apps you often use CancellationTokenSource or BlockingCollection<T> for more reliable shutdown coordination.

ConcurrentQueue<T> is ideal for scenarios where:

  • The processing order matters (FIFO).
  • Many threads are adding items, and/or many threads are taking them.
  • You need high performance without manual lock management.

3. Stack for producer-consumer (LIFO)

A stack is another fundamental data structure that works on the LIFO (Last-In, First-Out) principle — "last in, first out". Imagine a stack of plates: you always take the top one, and you always put a new plate on top.

ConcurrentStack<T> is just as thread-safe as ConcurrentQueue<T>, and can also be used in a producer-consumer pattern, but with reversed processing order.

Example: ConcurrentStack — pushing and popping

using System.Collections.Concurrent;

ConcurrentStack<string> commandStack = new ConcurrentStack<string>();

// Adding commands (producer)
commandStack.Push("Vydelit tekst");
commandStack.Push("Izmenit shrift");
commandStack.Push("Sohranit dokument");
Console.WriteLine($"Komand v steke: {commandStack.Count}"); // Output: Komand v steke: 3

// Removing commands (consumer)
if (commandStack.TryPop(out string cmd1))
{
    Console.WriteLine($"Otmenena komanda: {cmd1}"); // Output: Otmenena komanda: Sohranit dokument
}

if (commandStack.TryPop(out string cmd2))
{
    Console.WriteLine($"Otmenena komanda: {cmd2}"); // Output: Otmenena komanda: Izmenit shrift
}

if (!commandStack.TryPop(out string emptyCmd))
{
    Console.WriteLine("Stek komand pust."); // Output: Stek komand pust.
}

4. Basics: Push(), TryPop()

Push(T item): Used to add an item to the top of the stack. The operation is thread-safe.

TryPop(out T item): Used to attempt to remove an item from the top of the stack. Returns true if an item was successfully removed, and false if the stack is empty. Like TryDequeue, this is an atomic operation that prevents data races.

Example: using ConcurrentStack for an object pool

A stack is great for implementing object pools: take — use — return.

using System.Collections.Concurrent;

class Connection { /* Prostaya zaglushka */ public Guid Id { get; } = Guid.NewGuid(); }

ConcurrentStack<Connection> connectionPool = new ConcurrentStack<Connection>();

// Fill the pool with initial connections
for (int i = 0; i < 3; i++)
{
    connectionPool.Push(new Connection());
}
Console.WriteLine($"Soedineniy v pule: {connectionPool.Count}"); // Output: Soedineniy v pule: 3

void UseConnection()
{
    if (connectionPool.TryPop(out Connection conn))
    {
        Console.WriteLine($"[Pul] Ispolzovano soedinenie: {conn.Id}");
        // Simulate work with the connection
        Thread.Sleep(50); 
        connectionPool.Push(conn); // Return to pool
        Console.WriteLine($"[Pul] Vozvrascheno soedinenie: {conn.Id}. V pule: {connectionPool.Count}");
    }
    else
    {
        Console.WriteLine("[Pul] Pul soedineniy pust. Sozdaem novoe.");
        // Usually you create a new connection if the pool is empty
        connectionPool.Push(new Connection()); 
    }
}

// Run the example in Main:
Task.Run(() => UseConnection());
Task.Run(() => UseConnection());
Task.Run(() => UseConnection());
Thread.Sleep(500);

In this example multiple threads can safely take and return connections to the shared pool.

5. Use cases and comparison with ConcurrentQueue

ConcurrentStack<T> is used when:

  • The LIFO order is critical (for example, undo history).
  • You need fast access to the most recently added items (they often stay "hot" in CPU cache).
  • You're implementing stack-based algorithms (graph DFS, expression parsing).

Comparison and choosing the right collection

Collection Order Advantages Typical scenarios
ConcurrentQueue
FIFO (First in, first out) Provides fair processing in arrival order Task queues, logging, handling incoming requests, event buses
ConcurrentStack
LIFO (Last in, first out) Fast access to recently added items Action history (Undo/Redo), object pools, traversal algorithms (DFS)

The choice between ConcurrentQueue and ConcurrentStack fully depends on the required processing order in your producer-consumer scenario. Both collections offer high performance and thread-safety out of the box, freeing you from manual synchronization and helping build scalable multithreaded systems.

2
Task
C# SELF, level 58, lesson 1
Locked
Producer-Consumer Implementation Using a Queue
Producer-Consumer Implementation Using a Queue
Comments
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION