1. Introduction
ConcurrentBag<T> is a thread-safe, unordered collection. Its main feature and advantage lie in the word "Bag", which implies that element order isn't guaranteed when taking items out. That means the element you take out might not be the one you expected based on insertion order. But ConcurrentBag has a unique optimization that makes it extremely fast in certain scenarios.
Features of ConcurrentBag
No ordering: Unlike queues (FIFO) and stacks (LIFO), ConcurrentBag doesn't promise that TryTake() will return an item in any specific order relative to how it was added. This is a key difference.
Optimization for thread-local access (Thread-Local Storage): The main reason for ConcurrentBag's existence is its performance in scenarios where the thread that added an item is likely to be the same thread that will take it.
Example: ConcurrentBag — adding and taking
using System.Collections.Concurrent;
ConcurrentBag<string> itemBag = new ConcurrentBag<string>();
// Adding items
itemBag.Add("Point A");
itemBag.Add("Point B");
itemBag.Add("Point V");
Console.WriteLine($"Elements in the bag: {itemBag.Count}"); // Output: Elements in the bag: 3
// Taking items (order not guaranteed!)
if (itemBag.TryTake(out string item1))
{
Console.WriteLine($"Taken: {item1}"); // Could be "Point V", "Point B" or "Point A"
}
if (itemBag.TryTake(out string item2))
{
Console.WriteLine($"Taken: {item2}");
}
Console.WriteLine($"Elements left: {itemBag.Count}"); // Output: Elements left: 1
You can run this code several times and notice that the order of taken items can change.
Methods Add(), TryTake()
Add(T item): used to add an item to ConcurrentBag. The operation is thread-safe.
TryTake(out T item): attempts to take an item from ConcurrentBag. Returns true if an item was successfully taken, and false if the bag is empty. Important: TryTake does not block the thread.
2. Use cases
ConcurrentBag is not a replacement for ConcurrentQueue or ConcurrentStack. It shines in specific cases:
Object/resource pools: when you have a pool of reusable objects and it's desirable that the thread that returned an object often takes it again. This reduces competition for a shared resource.
Dynamic task distribution in TPL: internals of constructs like Parallel.ForEach and Parallel.For use local bags and a "work-stealing" mechanism for efficient work distribution.
Task pool with ConcurrentBag and locality optimization
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
ConcurrentBag<string> taskPool = new ConcurrentBag<string>();
// Fill the pool with initial tasks
for (int i = 0; i < 10; i++)
{
taskPool.Add($"Task {i}");
}
void Worker()
{
// Each thread tries to take a task
while (taskPool.TryTake(out string task))
{
Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: Processing {task}");
Thread.Sleep(50); // Simulate work
}
Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: Finished work.");
}
// Start multiple worker threads
// Task.Run(Worker);
// Task.Run(Worker);
// Task.Run(Worker);
// Thread.Sleep(1000); // Give time to complete
In this example ConcurrentBag lets threads efficiently take tasks while minimizing locks thanks to its internal structure.
Internal mechanics
ConcurrentBag achieves high performance by using thread-local storage (TLS). When a thread adds an item, it's placed into a thread-local structure. On TryTake() the local structure is checked first; if empty, it attempts "work-stealing" from other threads or the global pool. This reduces contention and makes the collection a great choice when access locality matters and order doesn't.
3. Thread-safe dictionary
ConcurrentDictionary<TKey, TValue> is one of the most commonly used thread-safe collections: a high-performance dictionary for safe add/read/update/remove operations from multiple threads.
A regular Dictionary<TKey, TValue> is not thread-safe at all. Any write (add/modify/remove) or even a read during a write can lead to exceptions (InvalidOperationException) or data corruption.
Example: problem with a regular Dictionary (recap)
using System.Collections.Generic;
using System.Threading.Tasks;
Dictionary<int, int> concurrentDictProblem = new Dictionary<int, int>();
void AddToDict(int start, int count)
{
for (int i = 0; i < count; i++)
{
// Attempt to add/modify concurrently
// Will lead to exceptions or incorrect behavior
concurrentDictProblem[start + i] = start + i;
}
}
//Run the example in Main:
try
{
Task t1 = Task.Run(() => AddToDict(0, 10000));
Task t2 = Task.Run(() => AddToDict(5000, 10000)); // Overlapping keys
Task.WaitAll(t1, t2);
Console.WriteLine($"Items in the (problematic) dictionary: {concurrentDictProblem.Count}");
}
catch (Exception ex)
{
Console.WriteLine($"Error in regular dictionary: {ex.Message}");
}
This code will almost certainly throw an exception or hang due to thread-safety issues.
4. Core operations
ConcurrentDictionary provides specialized atomic "check + act" operations.
TryAdd(TKey key, TValue value): atomically adds a key-value pair. Returns true if the key was added, and false if the key already exists.
ConcurrentDictionary<string, int> scores = new ConcurrentDictionary<string, int>();
if (scores.TryAdd("Alice", 100))
Console.WriteLine("Alice added."); // Output: Alice added.
if (!scores.TryAdd("Alice", 150))
Console.WriteLine("Alice already exists."); // Output: Alice already exists.
TryGetValue(TKey key, out TValue value): atomically gets the value by key.
if (scores.TryGetValue("Alice", out int aliceScore))
Console.WriteLine($"Alice's score: {aliceScore}"); // Output: Alice's score: 100
TryUpdate(TKey key, TValue newValue, TValue comparisonValue): atomically updates the value only if the current value equals comparisonValue. Prevents races.
// Current value Alice = 100
if (scores.TryUpdate("Alice", 120, 100)) // Will update 100 to 120
Console.WriteLine("Alice's score updated to 120."); // Output: Alice's score updated to 120.
if (!scores.TryUpdate("Alice", 130, 100)) // Won't update because current is 120, not 100
Console.WriteLine("Alice's score not updated (stale data)."); // Output: ...
TryRemove(TKey key, out TValue value): atomically removes an item by key.
if (scores.TryRemove("Alice", out int removedScore))
Console.WriteLine($"Alice removed, score was: {removedScore}"); // Output: Alice removed, score was: 120
5. Advanced atomic operations
These two methods are the workhorses of ConcurrentDictionary, covering many scenarios.
GetOrAdd(TKey key, TValue valueFactory(TKey key)): atomically returns the existing value for a key or creates and adds a new one via a factory. Ideal for caches and unique entities.
// Suppose we cache heavy objects
ConcurrentDictionary<int, HeavyObject> objectCache = new ConcurrentDictionary<int, HeavyObject>();
HeavyObject GetOrCreateHeavyObject(int id)
{
// If exists — returns it, otherwise creates and adds
return objectCache.GetOrAdd(id, (key) =>
{
Console.WriteLine($"Creating new HeavyObject for ID: {key}");
return new HeavyObject(key); // Simulate creation of a costly object
});
}
// In Main:
HeavyObject obj1 = GetOrCreateHeavyObject(1); // Will create new
HeavyObject obj2 = GetOrCreateHeavyObject(2); // Will create new
HeavyObject obj3 = GetOrCreateHeavyObject(1); // Will return existing obj1
AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory): atomically adds a value if the key is missing, or updates the existing one via a factory.
- addValue: value to add if the key is not found.
- updateValueFactory: function calculating the new value based on the key and current value.
// Counting page visits
ConcurrentDictionary<string, int> pageViews = new ConcurrentDictionary<string, int>();
void IncrementPageView(string page)
{
pageViews.AddOrUpdate(page, 1, // If page is new, add 1
(key, existingVal) => existingVal + 1); // Otherwise increment by 1
Console.WriteLine($"Page '{page}' visited {pageViews[page]} times.");
}
// In Main:
IncrementPageView("Home"); // Home: 1
IncrementPageView("About"); // About: 1
IncrementPageView("Home"); // Home: 2
IncrementPageView("Home"); // Home: 3
IncrementPageView("Contact"); // Contact: 1
6. Examples for caching or state management
Data caching: ConcurrentDictionary is a great choice for an in-memory cache: GetOrAdd prevents recreating expensive objects.
User session management: safely storing and updating session data from different requests.
Statistics counting: using AddOrUpdate is convenient for incrementing event counters, views, votes, etc.
Registries/Service Locator: storing registered services or plugins available from different threads.
ConcurrentDictionary<TKey, TValue> is a highly optimized collection that significantly simplifies multithreaded development with dictionaries by providing a set of atomic operations without manual synchronization.
GO TO FULL VERSION