1. Introduction
Let's start with the basics. A queue is a fundamental data structure that works on the FIFO (first in, first out) principle. Imagine a regular checkout line at a store: whoever got in line first is served first.
In multithreaded programming the Producer-Consumer pattern is one of the most common and powerful patterns.
- Producers are threads or parts of the app that create data or tasks and put them into a shared queue. They "produce" work.
- Consumers are threads or parts of the app that take data or tasks from the queue and process them. They "consume" work.
This pattern helps control the flow of data, decouples system components (a producer doesn't need to know who will process the data or how), makes the app more responsive, and helps distribute load evenly across threads.
Example: ConcurrentQueue — adding and removing
Let's see how to add and remove items from a ConcurrentQueue<T>.
using System;
using System.Collections.Concurrent;
ConcurrentQueue<string> tasks = new ConcurrentQueue<string>();
// Adding items (producer)
tasks.Enqueue("Download file");
tasks.Enqueue("Process image");
Console.WriteLine($"Tasks in queue: {tasks.Count}"); // Output: Tasks in queue: 2
// Removing items (consumer)
if (tasks.TryDequeue(out string task1))
{
    Console.WriteLine($"Task completed: {task1}"); // Output: Task completed: Download file
}
if (tasks.TryDequeue(out string task2))
{
    Console.WriteLine($"Task completed: {task2}"); // Output: Task completed: Process image
}
if (!tasks.TryDequeue(out string emptyTask))
{
    Console.WriteLine("Queue is empty, no new tasks."); // Output: Queue is empty, no new tasks.
}
Basics: Enqueue(), TryDequeue()
Enqueue(T item): Used to add an item to the end of the queue. This operation is thread-safe. You can call Enqueue from 10 different threads at the same time and all items will be added correctly.
TryDequeue(out T item): Used to attempt to remove an item from the front of the queue. This is the key method for consumers. It returns true if an item was successfully dequeued (the value goes into the out parameter item), and false if the queue is empty. Important: TryDequeue does not block the thread when the queue is empty.
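The claim about calling Enqueue from 10 threads at once can be checked with a small sketch. The task count, the item count, and the names queue and producers are chosen here for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var queue = new ConcurrentQueue<int>();

// 10 producer tasks, each adding 1,000 distinct values to the same queue
Task[] producers = Enumerable.Range(0, 10)
    .Select(p => Task.Run(() =>
    {
        for (int i = 0; i < 1000; i++)
        {
            queue.Enqueue(p * 1000 + i);
        }
    }))
    .ToArray();

Task.WaitAll(producers);

// Nothing is lost or duplicated, although no lock was taken
Console.WriteLine($"Items in queue: {queue.Count}");              // Output: Items in queue: 10000
Console.WriteLine($"Distinct items: {queue.Distinct().Count()}"); // Output: Distinct items: 10000
```

The order of items coming from different producers is not predictable, but each producer's own items keep their relative order.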
2. Importance of TryDequeue() and operation atomicity
The TryDequeue() method is not just convenient; it's critically important for correct thread-safe behavior. It is atomic: the check for emptiness and the actual removal happen as one indivisible operation.
If the check and the removal were separate steps (for example, reading IsEmpty and then calling a hypothetical Dequeue method), another thread could empty the queue between the two calls. As a result your Dequeue would throw an exception or return invalid data. TryDequeue rules this situation out.
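A minimal sketch of what this atomicity buys you: several consumers drain one queue, and every item is handed to exactly one of them. The names jobs, processed, and sum, and the choice of four consumers, are assumptions made for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Pre-fill the queue with the numbers 1..1000
var jobs = new ConcurrentQueue<int>(Enumerable.Range(1, 1000));
int processed = 0;
long sum = 0;

// Four consumers compete for the same items. Each TryDequeue call either
// hands out one item or reports that the queue is empty.
Task[] consumers = Enumerable.Range(0, 4)
    .Select(_ => Task.Run(() =>
    {
        while (jobs.TryDequeue(out int job))
        {
            Interlocked.Increment(ref processed);
            Interlocked.Add(ref sum, job);
        }
    }))
    .ToArray();

Task.WaitAll(consumers);

Console.WriteLine($"Processed: {processed}"); // Output: Processed: 1000
Console.WriteLine($"Sum: {sum}");             // Output: Sum: 500500
```

If any item had been taken twice or skipped, the count or the sum would be off.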
Example: Producer-Consumer with multiple threads
Here we run two producer threads and one consumer thread.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<int> dataQueue = new ConcurrentQueue<int>();
bool producersDone = false; // Flag to signal the consumer; accessed via Volatile so the update is visible across threads
void Producer(int start, int count)
{
    for (int i = 0; i < count; i++)
    {
        dataQueue.Enqueue(start + i);
        Console.WriteLine($"[P] Added: {start + i}");
        Thread.Sleep(10);
    }
}
void Consumer()
{
    // Keep going while producers are running or there's still data
    while (!Volatile.Read(ref producersDone) || !dataQueue.IsEmpty)
    {
        if (dataQueue.TryDequeue(out int item))
        {
            Console.WriteLine($"[C] Processed: {item}");
        }
        else
        {
            Thread.Sleep(50); // Wait if the queue is empty
        }
    }
    Console.WriteLine("[C] Finished.");
}
// Run the example:
Task.Run(() => Producer(1, 5));
Task.Run(() => Producer(100, 5)); // Second producer
Task consumer = Task.Run(() => Consumer());
Thread.Sleep(600); // Give threads some time to work
Volatile.Write(ref producersDone, true); // Signal that producers finished
consumer.Wait(); // Let the consumer take the rest
Note that in this simple example the producersDone flag and Thread.Sleep are used to simulate completion. In real apps you often use CancellationTokenSource or BlockingCollection<T> for more reliable shutdown coordination.
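As a rough sketch of the BlockingCollection<T> alternative mentioned above: the consumer blocks while the collection is empty, and CompleteAdding() replaces the hand-rolled flag. The names channel, consumed, consumer, and producers are illustrative, not taken from the original example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// BlockingCollection<T> wraps a ConcurrentQueue<T> by default
using var channel = new BlockingCollection<int>();
int consumed = 0;

Task consumer = Task.Run(() =>
{
    // Blocks while the collection is empty; the loop ends once
    // CompleteAdding() has been called and every item has been taken
    foreach (int item in channel.GetConsumingEnumerable())
    {
        Console.WriteLine($"[C] Processed: {item}");
        consumed++;
    }
});

Task[] producers =
{
    Task.Run(() => { for (int i = 0; i < 5; i++) channel.Add(1 + i); }),
    Task.Run(() => { for (int i = 0; i < 5; i++) channel.Add(100 + i); }),
};

Task.WaitAll(producers);  // Both producers are done...
channel.CompleteAdding(); // ...so signal that no more items will arrive
consumer.Wait();

Console.WriteLine($"Consumed: {consumed}"); // Output: Consumed: 10
```

Here nothing depends on timing: the consumer neither polls with Thread.Sleep nor guesses when the producers have finished.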
ConcurrentQueue<T> is ideal for scenarios where:
- The processing order matters (FIFO).
- Many threads are adding items, and/or many threads are taking them.
- You need high performance without manual lock management.
3. Stack for producer-consumer (LIFO)
A stack is another fundamental data structure that works on the LIFO (last in, first out) principle. Imagine a stack of plates: you always take the top one, and you always put a new plate on top.
ConcurrentStack<T> is just as thread-safe as ConcurrentQueue<T>, and can also be used in a producer-consumer pattern, but with reversed processing order.
Example: ConcurrentStack — pushing and popping
using System;
using System.Collections.Concurrent;
ConcurrentStack<string> commandStack = new ConcurrentStack<string>();
// Adding commands (producer)
commandStack.Push("Select text");
commandStack.Push("Change font");
commandStack.Push("Save document");
Console.WriteLine($"Commands in stack: {commandStack.Count}"); // Output: Commands in stack: 3
// Removing commands (consumer)
if (commandStack.TryPop(out string cmd1))
{
    Console.WriteLine($"Command undone: {cmd1}"); // Output: Command undone: Save document
}
if (commandStack.TryPop(out string cmd2))
{
    Console.WriteLine($"Command undone: {cmd2}"); // Output: Command undone: Change font
}
if (!commandStack.TryPop(out string emptyCmd))
{
    Console.WriteLine("Command stack is empty."); // Output: Command stack is empty.
}
4. Basics: Push(), TryPop()
Push(T item): Used to add an item to the top of the stack. The operation is thread-safe.
TryPop(out T item): Used to attempt to remove an item from the top of the stack. Returns true if an item was successfully removed, and false if the stack is empty. Like TryDequeue, this is an atomic operation: the emptiness check and the removal happen as one step, so another thread cannot slip in between them.
Example: using ConcurrentStack for an object pool
A stack is great for implementing object pools: take — use — return.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentStack<Connection> connectionPool = new ConcurrentStack<Connection>();
// Fill the pool with initial connections
for (int i = 0; i < 3; i++)
{
    connectionPool.Push(new Connection());
}
Console.WriteLine($"Connections in pool: {connectionPool.Count}"); // Output: Connections in pool: 3
void UseConnection()
{
    // Take a pooled connection, or create a new one if the pool is empty
    if (!connectionPool.TryPop(out Connection conn))
    {
        Console.WriteLine("[Pool] Connection pool is empty. Creating a new one.");
        conn = new Connection();
    }
    Console.WriteLine($"[Pool] Using connection: {conn.Id}");
    // Simulate work with the connection
    Thread.Sleep(50);
    connectionPool.Push(conn); // Return to pool
    // Count is only a snapshot: other threads may change it at any moment
    Console.WriteLine($"[Pool] Returned connection: {conn.Id}. In pool: {connectionPool.Count}");
}
// Run the example and wait for all three users to finish:
Task.WaitAll(
    Task.Run(() => UseConnection()),
    Task.Run(() => UseConnection()),
    Task.Run(() => UseConnection()));
// Simple stub. With top-level statements, type declarations must come after the statements.
class Connection { public Guid Id { get; } = Guid.NewGuid(); }
In this example multiple threads can safely take and return connections to the shared pool.
5. Use cases and comparison with ConcurrentQueue
ConcurrentStack<T> is used when:
- The LIFO order is critical (for example, undo history).
- You need fast access to the most recently added items (they often stay "hot" in CPU cache).
- You're implementing stack-based algorithms (graph DFS, expression parsing).
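To illustrate the last use case, here is a small single-threaded sketch of an iterative depth-first traversal driven by a ConcurrentStack<T>. The graph, the node names, and the variables pending, visited, and order are made up for this example. Note that HashSet<T> and List<T> are not thread-safe, so a parallel version would need different bookkeeping.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical graph as an adjacency list
var graph = new Dictionary<string, string[]>
{
    ["A"] = new[] { "B", "C" },
    ["B"] = new[] { "D" },
    ["C"] = new[] { "D" },
    ["D"] = Array.Empty<string>(),
};

var pending = new ConcurrentStack<string>();
var visited = new HashSet<string>();
var order = new List<string>();

pending.Push("A");
while (pending.TryPop(out string node))
{
    if (!visited.Add(node)) continue; // Skip nodes we have already seen

    order.Add(node);
    foreach (string next in graph[node])
    {
        pending.Push(next); // The most recently pushed neighbor is explored first
    }
}

Console.WriteLine(string.Join(" -> ", order)); // Output: A -> C -> D -> B
```

Because the stack is LIFO, the traversal goes deep along the last-pushed neighbor (C, then D) before coming back to B.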
Comparison and choosing the right collection
| Collection | Order | Advantages | Typical scenarios |
|---|---|---|---|
| ConcurrentQueue<T> | FIFO (first in, first out) | Provides fair processing in arrival order | Task queues, logging, handling incoming requests, event buses |
| ConcurrentStack<T> | LIFO (last in, first out) | Fast access to recently added items | Action history (Undo/Redo), object pools, traversal algorithms (DFS) |
The choice between ConcurrentQueue and ConcurrentStack depends entirely on the processing order your producer-consumer scenario requires. Both collections offer high performance and thread-safety out of the box, freeing you from manual synchronization and helping build scalable multithreaded systems.
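The difference in order is easy to see side by side. In this small sketch the same three values go into both collections (the variable names are illustrative):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

var queue = new ConcurrentQueue<int>();
var stack = new ConcurrentStack<int>();

// Add the same items, in the same order, to both collections
foreach (int n in new[] { 1, 2, 3 })
{
    queue.Enqueue(n);
    stack.Push(n);
}

var fromQueue = new List<int>();
while (queue.TryDequeue(out int q)) fromQueue.Add(q);

var fromStack = new List<int>();
while (stack.TryPop(out int s)) fromStack.Add(s);

Console.WriteLine($"Queue (FIFO): {string.Join(", ", fromQueue)}"); // Output: Queue (FIFO): 1, 2, 3
Console.WriteLine($"Stack (LIFO): {string.Join(", ", fromStack)}"); // Output: Stack (LIFO): 3, 2, 1
```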