1. 介紹
ConcurrentBag<T> 是一個 thread-safe、無序的集合。它的主要特性與優勢就在於「Bag(袋子)」這個詞,表示取出時不保證元素順序。這代表你取到的元素可能不是依照加入順序所預期的那一個。不過 ConcurrentBag 在某些場景下有獨特的優化,讓它非常快。
ConcurrentBag 的特性
無順序性: 與佇列(FIFO)或堆疊(LIFO)不同,ConcurrentBag 不保證 TryTake() 會依照加入的某種特定順序回傳元素。這是它的 關鍵差異。
針對本地存取的優化(Thread-Local Storage): ConcurrentBag 存在的主要原因是,它在那種加入元素的執行緒很可能也是取出該元素的執行緒時,能夠提供非常好的效能。
範例:ConcurrentBag — 新增與取出
using System.Collections.Concurrent;
ConcurrentBag<string> itemBag = new ConcurrentBag<string>();
// 新增元素
itemBag.Add("項目 A");
itemBag.Add("項目 B");
itemBag.Add("項目 C");
Console.WriteLine($"袋子裡的元素數量: {itemBag.Count}"); // 輸出: 袋子裡的元素數量: 3
// 取出元素(順序不保證!)
if (itemBag.TryTake(out string item1))
{
Console.WriteLine($"取出: {item1}"); // 可能是 "項目 C", "項目 B" 或 "項目 A"
}
if (itemBag.TryTake(out string item2))
{
Console.WriteLine($"取出: {item2}");
}
Console.WriteLine($"剩下的元素數量: {itemBag.Count}"); // 輸出: 剩下的元素數量: 1
你可以多次執行這段程式,會發現取出元素的順序可能會變動。
方法 Add(), TryTake()
Add(T item): 用來把元素加入到 ConcurrentBag。此操作是 thread-safe 的。
TryTake(out T item): 嘗試從 ConcurrentBag 中取出元素。若成功取出則回傳 true,若袋子為空則回傳 false。注意,TryTake 不會阻塞執行緒。
2. 使用情境
ConcurrentBag 並不是要取代 ConcurrentQueue 或 ConcurrentStack。它在某些特定場合特別有用:
物件/資源池: 當你有一個可重用物件的池,而且希望通常會是把物件還回去的那個執行緒再次取得該物件。這可以減少對共用資源的競爭。
在 TPL 中動態分配工作: 類似 Parallel.ForEach 和 Parallel.For 的實作會使用本地 bag 與「work-stealing」機制來有效分配工作。
使用 ConcurrentBag 的工作池並優化本地性
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;
ConcurrentBag<string> taskPool = new ConcurrentBag<string>();
// 用初始任務填滿池子
for (int i = 0; i < 10; i++)
{
taskPool.Add($"任務 {i}");
}
void Worker()
{
// 每個執行緒嘗試拿任務
while (taskPool.TryTake(out string task))
{
Console.WriteLine($"執行緒 {Thread.CurrentThread.ManagedThreadId}: 正在處理 {task}");
Thread.Sleep(50); // 模擬工作
}
Console.WriteLine($"執行緒 {Thread.CurrentThread.ManagedThreadId}: 工作完成。");
}
// 啟動幾個 worker 執行緒
// Task.Run(Worker);
// Task.Run(Worker);
// Task.Run(Worker);
// Thread.Sleep(1000); // 給時間讓它們執行
在這個範例中,ConcurrentBag 允許執行緒有效率地取得任務,由於內部結構的緣故能最小化鎖定。
內部機制
ConcurrentBag 透過使用執行緒本地儲存(TLS)來達到高效能。當執行緒加入元素時,元素會放到該執行緒的本地結構。當呼叫 TryTake() 時,會先從本地結構讀取;若為空,則會從其他執行緒或全域池執行「work-stealing」。這樣能減少競爭,當存取本地性重要且順序不重要時,這個集合是很好的選擇。
3. thread-safe 字典
ConcurrentDictionary<TKey, TValue> 是最常用的 thread-safe 集合之一:一個高效能的字典,用於在多執行緒環境下安全地新增、讀取、更新和刪除。
一般的 Dictionary<TKey, TValue> 完全不是 thread-safe 的。任何寫入(新增/修改/刪除)或甚至在寫入期間的讀取,都可能導致例外(InvalidOperationException)或資料破壞。
範例:一般 Dictionary 的問題(重複示範)
using System.Collections.Generic;
using System.Threading.Tasks;
Dictionary<int, int> concurrentDictProblem = new Dictionary<int, int>();
void AddToDict(int start, int count)
{
for (int i = 0; i < count; i++)
{
// 嘗試同時新增/修改
// 會導致例外或不正確的行為
concurrentDictProblem[start + i] = start + i;
}
}
// 在 Main 啟動範例:
try
{
Task t1 = Task.Run(() => AddToDict(0, 10000));
Task t2 = Task.Run(() => AddToDict(5000, 10000)); // 鍵會重疊
Task.WaitAll(t1, t2);
Console.WriteLine($"問題字典裡的元素數量: {concurrentDictProblem.Count}");
}
catch (Exception ex)
{
Console.WriteLine($"一般字典發生錯誤: {ex.Message}");
}
這段程式幾乎可以保證會拋出例外或因 thread-safety 問題而當掉。
4. 主要操作
ConcurrentDictionary 提供了專門的原子性「檢查 + 動作」操作。
TryAdd(TKey key, TValue value): 以原子方式新增鍵值對。如果鍵被加入則回傳 true,若鍵已存在則回傳 false。
ConcurrentDictionary<string, int> scores = new ConcurrentDictionary<string, int>();
if (scores.TryAdd("Alice", 100))
Console.WriteLine("Alice 已加入。"); // 輸出: Alice 已加入。
if (!scores.TryAdd("Alice", 150))
Console.WriteLine("Alice 已經存在。"); // 輸出: Alice 已經存在。
TryGetValue(TKey key, out TValue value): 以原子方式取得某鍵的值。
if (scores.TryGetValue("Alice", out int aliceScore))
Console.WriteLine($"Alice 的分數: {aliceScore}"); // 輸出: Alice 的分數: 100
TryUpdate(TKey key, TValue newValue, TValue comparisonValue): 以原子方式更新值,只有當目前值等於 comparisonValue 時才會更新。能防止競態條件。
// 目前 Alice 的值 = 100
if (scores.TryUpdate("Alice", 120, 100)) // 將 100 更新為 120
Console.WriteLine("Alice 的分數已更新為 120。"); // 輸出: Alice 的分數已更新為 120。
if (!scores.TryUpdate("Alice", 130, 100)) // 不會更新,因為目前為 120,不是 100
Console.WriteLine("Alice 的分數未更新(資料已過期)。"); // 輸出: ...
TryRemove(TKey key, out TValue value): 以原子方式根據鍵移除元素。
if (scores.TryRemove("Alice", out int removedScore))
Console.WriteLine($"Alice 已被移除,當時分數為: {removedScore}"); // 輸出: Alice 已被移除,當時分數為: 120
5. 進階原子操作
下面這兩個方法是 ConcurrentDictionary 的工作馬,涵蓋了很多場景。
GetOrAdd(TKey key, TValue valueFactory(TKey key)): 以原子方式回傳已存在的鍵值,或透過 factory 建立並新增新的值。很適合用在快取或需要唯一實體的場景。
// 假設我們在快取昂貴的物件
ConcurrentDictionary<int, HeavyObject> objectCache = new ConcurrentDictionary<int, HeavyObject>();
HeavyObject GetOrCreateHeavyObject(int id)
{
// 如果已存在就回傳,否則建立並加入
return objectCache.GetOrAdd(id, (key) =>
{
Console.WriteLine($"為 ID 建立新的 HeavyObject: {key}");
return new HeavyObject(key); // 模擬建立昂貴物件
});
}
// 在 Main:
HeavyObject obj1 = GetOrCreateHeavyObject(1); // 會建立新的
HeavyObject obj2 = GetOrCreateHeavyObject(2); // 會建立新的
HeavyObject obj3 = GetOrCreateHeavyObject(1); // 會回傳已存在的 obj1
AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory): 以原子方式在鍵不存在時新增值,或在存在時用 factory 更新現有值。
- addValue: 若找不到鍵時要加入的值。
- updateValueFactory: 用來根據鍵與目前值計算新值的函式。
// 計算網頁的訪問次數
ConcurrentDictionary<string, int> pageViews = new ConcurrentDictionary<string, int>();
void IncrementPageView(string page)
{
pageViews.AddOrUpdate(page, 1, // 如果是新頁面,加入 1
(key, existingVal) => existingVal + 1); // 否則加 1
Console.WriteLine($"頁面 '{page}' 已被瀏覽 {pageViews[page]} 次。");
}
// 在 Main:
IncrementPageView("Home"); // Home: 1
IncrementPageView("About"); // About: 1
IncrementPageView("Home"); // Home: 2
IncrementPageView("Home"); // Home: 3
IncrementPageView("Contact"); // Contact: 1
6. 作為快取或狀態管理的範例使用
資料快取: ConcurrentDictionary 是記憶體中快取的好選擇:GetOrAdd 可以避免重複建立昂貴物件。
使用者 session 管理: 安全地從不同請求中儲存與更新 session 資料。
統計計數: 用 AddOrUpdate 很方便地遞增事件計數、瀏覽次數、投票數等。
註冊表 / Service Locator: 儲存已註冊的 services 或 plugins,並能從多個執行緒存取。
ConcurrentDictionary<TKey, TValue> 是高度優化的集合,透過一組原子操作顯著簡化了在多執行緒環境中使用字典的開發工作,而不需要手動同步。
GO TO FULL VERSION