CodeGym /Các khóa học /C# SELF /ConcurrentBag

ConcurrentBagConcurrentDictionary

C# SELF
Mức độ , Bài học
Có sẵn

1. Giới thiệu

ConcurrentBag<T> là một collection thread-safe, không có thứ tự. Điểm đặc trưng và lợi thế nằm ở từ "Bag" (túi), nghĩa là thứ tự phần tử khi lấy ra không được đảm bảo. Tức là phần tử bạn lấy ra có thể không phải cái bạn mong đợi dựa trên thứ tự thêm vào. Bù lại, ConcurrentBag có tối ưu đặc biệt khiến nó rất nhanh trong một số kịch bản.

Đặc điểm của ConcurrentBag

Không có thứ tự: Khác với hàng đợi (FIFO) và ngăn xếp (LIFO), ConcurrentBag không hứa rằng TryTake() sẽ trả về phần tử theo bất kỳ thứ tự cụ thể nào liên quan đến cách nó được thêm. Đây là một khác biệt quan trọng.

Tối ưu cho truy cập cục bộ (Thread-Local Storage): Lý do chính để dùng ConcurrentBag là hiệu năng trong các kịch bản mà luồng thêm phần tử rất có khả năng là cùng luồng lấy phần tử đó.

Ví dụ: ConcurrentBag — thêm và lấy

using System.Collections.Concurrent;

ConcurrentBag<string> itemBag = new ConcurrentBag<string>();

// Thêm phần tử
itemBag.Add("Mục A");
itemBag.Add("Mục B");
itemBag.Add("Mục V");

Console.WriteLine($"Số phần tử trong túi: {itemBag.Count}"); // Kết quả: Số phần tử trong túi: 3

// Lấy phần tử (không đảm bảo thứ tự!)
if (itemBag.TryTake(out string item1))
{
    Console.WriteLine($"Đã lấy: {item1}"); // Có thể là "Mục V", "Mục B" hoặc "Mục A"
}

if (itemBag.TryTake(out string item2))
{
    Console.WriteLine($"Đã lấy: {item2}");
}

Console.WriteLine($"Còn lại: {itemBag.Count}"); // Kết quả: Còn lại: 1

Bạn có thể chạy đoạn mã này vài lần và sẽ thấy thứ tự các phần tử lấy ra có thể thay đổi.

Phương thức Add(), TryTake()

Add(T item): được dùng để thêm phần tử vào ConcurrentBag. Thao tác an toàn với nhiều luồng.

TryTake(out T item): cố gắng lấy phần tử từ ConcurrentBag. Trả về true nếu lấy thành công, và false nếu túi rỗng. Quan trọng: TryTake không chặn luồng.

2. Tình huống sử dụng

ConcurrentBag — không phải để thay thế ConcurrentQueue hoặc ConcurrentStack. Nó phát huy mạnh trong các trường hợp đặc thù:

Pools đối tượng/tài nguyên: khi bạn có một pool các object tái sử dụng, và mong muốn luồng trả về object thường sẽ chính luồng lấy lại nó. Điều này giảm cạnh tranh trên tài nguyên chung.

Phân phối động tác vụ trong TPL: các cấu trúc như Parallel.ForEachParallel.For dùng các túi cục bộ và cơ chế "work-stealing" để phân phối công việc hiệu quả.

Bể công việc với ConcurrentBag và tối ưu cho cục bộ

using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading;

ConcurrentBag<string> taskPool = new ConcurrentBag<string>();

// Điền vào pool các task ban đầu
for (int i = 0; i < 10; i++)
{
    taskPool.Add($"Tác vụ {i}");
}

void Worker()
{
    // Mỗi luồng cố gắng lấy một tác vụ
    while (taskPool.TryTake(out string task))
    {
        Console.WriteLine($"Luồng {Thread.CurrentThread.ManagedThreadId}: Đang xử lý {task}");
        Thread.Sleep(50); // Giả lập công việc
    }
    Console.WriteLine($"Luồng {Thread.CurrentThread.ManagedThreadId}: Hoàn thành công việc.");
}

// Khởi chạy vài worker
// Task.Run(Worker);
// Task.Run(Worker);
// Task.Run(Worker);
// Thread.Sleep(1000); // Cho thời gian thực thi

Trong ví dụ này ConcurrentBag cho phép các luồng lấy tác vụ hiệu quả, giảm thiểu khóa nhờ cấu trúc nội bộ.

Cơ chế bên trong

ConcurrentBag đạt hiệu năng cao bằng cách dùng lưu trữ cục bộ theo luồng (Thread-Local Storage). Khi một luồng thêm phần tử, nó được đặt vào cấu trúc cục bộ của luồng đó. Khi gọi TryTake(), đầu tiên sẽ kiểm tra cấu trúc cục bộ; nếu rỗng — thực hiện "work-stealing" từ các luồng khác hoặc từ pool toàn cục. Điều này giảm cạnh tranh và làm cho collection này phù hợp khi cục bộ truy cập quan trọng và thứ tự không quan trọng.

3. Từ điển an toàn với nhiều luồng

ConcurrentDictionary<TKey, TValue> — một trong những collection thread-safe được dùng rất nhiều: dictionary hiệu năng cao để thêm, đọc, cập nhật và xóa an toàn từ nhiều luồng.

Lớp Dictionary<TKey, TValue> thông thường hoàn toàn không an toàn với nhiều luồng. Bất kỳ ghi nào (thêm/thay đổi/xóa) hoặc thậm chí đọc khi có ghi có thể dẫn đến ngoại lệ (InvalidOperationException) hoặc hỏng dữ liệu.

Ví dụ: vấn đề với Dictionary thường (lặp lại)

using System.Collections.Generic;
using System.Threading.Tasks;

Dictionary<int, int> concurrentDictProblem = new Dictionary<int, int>();

void AddToDict(int start, int count)
{
    for (int i = 0; i < count; i++)
    {
        // Cố gắng thêm/thay đổi đồng thời
        // Sẽ dẫn đến exception hoặc hành vi không đúng
        concurrentDictProblem[start + i] = start + i;
    }
}

// Chạy ví dụ trong Main:
try
{
    Task t1 = Task.Run(() => AddToDict(0, 10000));
    Task t2 = Task.Run(() => AddToDict(5000, 10000)); // Khóa bị trùng nhau
    Task.WaitAll(t1, t2);
    Console.WriteLine($"Số phần tử trong dictionary (gặp vấn đề): {concurrentDictProblem.Count}");
}
catch (Exception ex)
{
    Console.WriteLine($"Lỗi trong Dictionary thường: {ex.Message}");
}

Đoạn mã này gần như chắc chắn sẽ ném ngoại lệ hoặc treo do vấn đề về an toàn luồng.

4. Các thao tác chính

ConcurrentDictionary cung cấp các thao tác nguyên tử kiểu "kiểm tra + hành động".

TryAdd(TKey key, TValue value): thêm cặp key-value một cách nguyên tử. Trả về true nếu key được thêm, và false nếu key đã tồn tại.

ConcurrentDictionary<string, int> scores = new ConcurrentDictionary<string, int>();
if (scores.TryAdd("Alice", 100))
    Console.WriteLine("Alice đã được thêm."); // Kết quả: Alice đã được thêm.
if (!scores.TryAdd("Alice", 150))
    Console.WriteLine("Alice đã tồn tại."); // Kết quả: Alice đã tồn tại.

TryGetValue(TKey key, out TValue value): lấy giá trị theo key một cách nguyên tử.

if (scores.TryGetValue("Alice", out int aliceScore))
    Console.WriteLine($"Điểm của Alice: {aliceScore}"); // Kết quả: Điểm của Alice: 100

TryUpdate(TKey key, TValue newValue, TValue comparisonValue): cập nhật giá trị nguyên tử chỉ khi giá trị hiện tại bằng comparisonValue. Ngăn ngừa race condition.

// Giá trị hiện tại của Alice = 100
if (scores.TryUpdate("Alice", 120, 100)) // Sẽ cập nhật 100 thành 120
    Console.WriteLine("Điểm của Alice được cập nhật thành 120."); // Kết quả: Điểm của Alice được cập nhật thành 120.
if (!scores.TryUpdate("Alice", 130, 100)) // Sẽ không cập nhật vì hiện tại là 120 chứ không phải 100
    Console.WriteLine("Điểm của Alice không được cập nhật (dữ liệu cũ)."); // Kết quả: ...

TryRemove(TKey key, out TValue value): xóa phần tử theo key một cách nguyên tử.

if (scores.TryRemove("Alice", out int removedScore))
    Console.WriteLine($"Alice đã bị xóa, điểm trước đó là: {removedScore}"); // Kết quả: Alice đã bị xóa, điểm trước đó là: 120

5. Các thao tác nguyên tử nâng cao

Hai phương thức dưới đây là "con ngựa thồ" của ConcurrentDictionary, bao phủ nhiều kịch bản.

GetOrAdd(TKey key, TValue valueFactory(TKey key)): trả về giá trị tồn tại theo key hoặc tạo và thêm mới bằng factory. Rất phù hợp cho cache và thực thể duy nhất.

// Giả sử chúng ta cache các object nặng
ConcurrentDictionary<int, HeavyObject> objectCache = new ConcurrentDictionary<int, HeavyObject>();

HeavyObject GetOrCreateHeavyObject(int id)
{
    // Nếu đã có — trả về nó, nếu không thì tạo và thêm
    return objectCache.GetOrAdd(id, (key) =>
    {
        Console.WriteLine($"Tạo một HeavyObject mới cho ID: {key}");
        return new HeavyObject(key); // Giả lập tạo một object tốn kém
    });
}

// Trong Main:
HeavyObject obj1 = GetOrCreateHeavyObject(1); // Tạo mới
HeavyObject obj2 = GetOrCreateHeavyObject(2); // Tạo mới
HeavyObject obj3 = GetOrCreateHeavyObject(1); // Trả về obj1 đã tồn tại

AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory): thêm giá trị nếu key chưa tồn tại, hoặc cập nhật giá trị hiện có bằng factory.

  • addValue: giá trị để thêm nếu key không tìm thấy.
  • updateValueFactory: hàm tính giá trị mới dựa trên key và giá trị hiện tại.
// Đếm số lượt truy cập trang
ConcurrentDictionary<string, int> pageViews = new ConcurrentDictionary<string, int>();

void IncrementPageView(string page)
{
    pageViews.AddOrUpdate(page, 1, // Nếu trang mới, thêm 1
                          (key, existingVal) => existingVal + 1); // Nếu không, tăng thêm 1
    Console.WriteLine($"Trang '{page}' đã được truy cập {pageViews[page]} lần.");
}

// Trong Main:
IncrementPageView("Home");   // Home: 1
IncrementPageView("About");  // About: 1
IncrementPageView("Home");   // Home: 2
IncrementPageView("Home");   // Home: 3
IncrementPageView("Contact"); // Contact: 1

6. Ví dụ sử dụng cho caching hoặc quản lý trạng thái

Cache dữ liệu: ConcurrentDictionary là lựa chọn tốt cho in-memory cache: GetOrAdd ngăn chặn việc tạo lại các object tốn kém.

Quản lý session người dùng: lưu trữ và cập nhật an toàn dữ liệu session từ nhiều request.

Đếm thống kê: dùng AddOrUpdate để tăng các bộ đếm sự kiện, lượt xem, vote, v.v.

Registry/Service Locator: lưu các service hoặc plugin đã đăng ký, truy cập từ nhiều luồng.

ConcurrentDictionary<TKey, TValue> là một collection rất tối ưu, giúp đơn giản hóa rất nhiều việc lập trình đa luồng với dictionary nhờ tập các thao tác nguyên tử mà không cần đồng bộ thủ công.

Bình luận
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION