1. Giới thiệu
Hôm nay chúng ta lên một cấp mới! Đã đến lúc làm quen với một công cụ khá đặc biệt — Channel. Cái này được tạo ra dành riêng cho các ứng dụng bất đồng bộ hiện đại trong .NET: nơi mà các khóa thông thường hoặc không cứu được, hoặc làm bạn chậm lại trên con đường đến hạnh phúc.
Bạn sẽ gặp mẫu "Producer-Consumer" vốn vẫn phổ biến từ thập niên 60 của thế kỉ trước. Bạn sẽ tạo một "băng chuyền" bất đồng bộ đơn giản, nơi một số luồng hoặc task sản xuất thứ gì đó (ví dụ: tải file, tính toán số, chờ sự kiện), còn những luồng khác xử lý chúng (ví dụ: lưu, ghi DB, cập nhật UI).
Tại sao xuất hiện Channel?
- Mẫu "Producer-Consumer" từ lâu được giải bằng hàng đợi: producer bỏ nhiệm vụ vào queue, consumer lấy ra. Nhưng! BlockingCollection<T>, các queue từ ConcurrentQueue<T>, thậm chí sync thủ công bằng lock — tất cả đều không bất đồng bộ. Tức là thread chỉ có thể bị block chờ dữ liệu, chứ không trả control lại cho scheduler của async/await.
- Bất đồng bộ trong .NET — không phải là từ thời thượng hôm thứ Sáu, mà là nền tảng kiến trúc hiện đại. Block thread để chờ phần tử là tốn kém và không hiệu quả. Cần chờ dữ liệu mà không block — đó là nhiệm vụ mà Channel giải quyết.
- Linh hoạt: với channel bạn có thể dựng pipeline phức tạp, tách logic giữa các luồng, thêm bước trung gian và cân bằng tải — tất cả mà không đau đầu với sync thấp cấp.
Channel là gì? (Tương tự và kiến trúc)
Hãy tưởng tượng bạn có một cây gậy tiếp sức (hoặc băng chuyền) để truyền đối tượng từ chỗ này sang chỗ khác, và không ai cần gặp trực tiếp đồng nghiệp bên cạnh. Quan trọng là gậy không bị thất lạc trên đường.
Channel là công cụ tích hợp trong .NET để truyền dữ liệu bất đồng bộ giữa các task, thread hoặc phần khác của chương trình. Nó triển khai một hàng đợi bất đồng bộ hỗ trợ "chờ" cả khi thêm lẫn khi lấy phần tử.
- Producer bỏ phần tử vào channel (ví dụ: request cần xử lý);
- Consumer lấy phần tử — và xong chuyện!
2. Lớp Channel<T> và cấu trúc của nó
Mọi thứ bắt đầu từ namespace:
using System.Threading.Channels;
Khác với các collection thông thường, Channel là một factory tạo ra các đối tượng đặc biệt để truyền dữ liệu.
Các loại chính:
- ChannelWriter<T> — "writer" (producer). Chỉ ghi phần tử.
- ChannelReader<T> — "reader" (consumer). Chỉ lấy phần tử.
- Channel (Channel) tách trách nhiệm: writer không biết gì về reader và ngược lại.
Trong .NET có vài implementation của channel, mỗi loại có đặc tính riêng: unbounded (không giới hạn), bounded (giới hạn số phần tử), single-producer-single-consumer (SPSC), multi-producer-multi-consumer (MPMC), v.v. Chúng ta bắt đầu từ cái phổ dụng nhất.
Ví dụ đơn giản: hàng đợi task bất đồng bộ
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
// Tạo channel không giới hạn
var channel = Channel.CreateUnbounded<int>();
// Task producer
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"Producer: Đưa {i} vào channel");
await channel.Writer.WriteAsync(i); // Ghi bất đồng bộ!
await Task.Delay(100); // Giả lập công việc
}
channel.Writer.Complete(); // Báo rằng không còn ghi nữa
});
// Task consumer
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer: Nhận {item} từ channel");
await Task.Delay(200); // Giả lập xử lý
}
Console.WriteLine("Consumer: Channel đã đóng");
});
await Task.WhenAll(producer, consumer);
}
}
Chuyện gì đang xảy ra ở đây?
- Channel.CreateUnbounded<int>() — tạo channel không giới hạn kích thước queue.
- Producer ghi số từ 0 đến 9 vào channel bằng WriteAsync.
- Sau khi ghi xong gọi Complete() — tín hiệu "Không còn phần tử nữa!".
- Consumer duyệt tất cả phần tử bằng ReadAllAsync() (cũng bất đồng bộ!), cho tới khi channel đóng.
- Delay (Task.Delay) giả lập công việc: bạn thấy số được ghi nhanh hơn đọc.
3. Tại sao mọi thứ hoạt động bất đồng bộ?
Các hàng đợi block thông thường (ví dụ BlockingCollection hoặc các cấu trúc bảo vệ bằng lock) chỉ có thể block thread. Điều đó có nghĩa là ta mất tài nguyên quý giá nếu có nhiều task hoặc muốn đạt hiệu năng cao nhất.
Với channel:
- Nếu producer nhanh hơn, channel sẽ tích trữ phần tử (bị giới hạn bởi memory hoặc capacity nếu được đặt).
- Nếu consumer nhanh hơn, nó sẽ chờ khi có thứ gì đó (và không block chết thread, trả thread cho scheduler).
Điều này lý tưởng cho các scenario mà bạn không biết trước ai nhanh hơn — producer hay consumer.
Ứng dụng thực tế
- Logging bất đồng bộ: ghi message vào file/DB thực hiện ở thread riêng;
- Xử lý request web: một task tải nhiều trang, task khác phân tích nội dung;
- Quét và index thư mục: một số task duyệt file system, các task khác tính thống kê;
- Pipeline xử lý dữ liệu phức tạp: ví dụ trong ETL (Extract–Transform–Load) một bước biến nguyên liệu thành bán phẩm, bước sau biến thành sản phẩm.
4. Channel giới hạn (Bounded Channel)
Channel "không giới hạn" thì vui, nhưng bộ nhớ không phải vô hạn (dù máy bạn có vẻ to ra sao đi nữa).
Bounded channel cho phép đặt số phần tử tối đa có thể tồn tại trong channel cùng lúc. Nếu channel đầy — producer sẽ chờ cho tới khi consumer lấy bớt.
Ví dụ:
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
FullMode = BoundedChannelFullMode.Wait // (mặc định) - chờ cho tới khi có chỗ
});
Ở đây chỉ có ba phần tử trong channel cùng lúc. Nếu producer cố ghi phần tử thứ tư — nó sẽ chờ.
Nhiều producer và nhiều consumer
var channel = Channel.CreateUnbounded<int>();
// 2 producer
for (int producerId = 0; producerId < 2; producerId++)
{
Task.Run(async () =>
{
for (int i = 0; i < 5; i++)
{
int value = producerId * 100 + i;
Console.WriteLine($"Producer {producerId}: Đưa {value}");
await channel.Writer.WriteAsync(value);
await Task.Delay(50);
}
// Mỗi producer gọi Complete() — nguy hiểm!
});
}
// Mẹo: Complete() chỉ nên gọi một lần duy nhất, khi TẤT CẢ producer kết thúc.
// Ở ví dụ để nguyên một task consumer:
Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer nhận {item}");
await Task.Delay(100);
}
});
Chú ý! Channel chỉ nên được đóng (bằng Complete()) sau khi mọi producer đã kết thúc. Nếu không, một producer khác có thể cố ghi khi channel đã đóng — sẽ gây exception. Trong thực tế người ta thường dùng bộ đếm task hoặc Task.WhenAll.
5. Thực hành: Xử lý ảnh qua channel
Hơi tăng độ khó một chút! Giả sử ta có một folder chứa ảnh. Một task tìm ảnh và đặt đường dẫn vào channel, task khác lấy đường dẫn và xử lý file (ví dụ tính kích thước hoặc convert).
Ghi chú: Để đơn giản ví dụ sẽ làm việc với tên file (không xử lý ảnh thật), nhưng ý tưởng hoàn toàn giống nhau.
using System;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
var channel = Channel.CreateBounded<string>(5);
// Producer: tìm file .jpg trong thư mục
var producer = Task.Run(async () =>
{
foreach (var file in Directory.EnumerateFiles(@"images", "*.jpg"))
{
await channel.Writer.WriteAsync(file);
Console.WriteLine($"Đã thêm vào queue: {file}");
await Task.Delay(50); // giả lập độ trễ tìm kiếm
}
channel.Writer.Complete(); // kết thúc queue
});
// Consumer: đọc và "xử lý" file
var consumer = Task.Run(async () =>
{
await foreach (var file in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Xử lý file: {file}");
await Task.Delay(200); // giả lập xử lý
}
Console.WriteLine("Tất cả ảnh đã được xử lý!");
});
await Task.WhenAll(producer, consumer);
}
}
6. Cấu hình Channel: options và chú ý
Channel có thể cấu hình qua options khi tạo — đây là các tham số chính cho bounded channel:
| Option | Mô tả |
|---|---|
|
Số phần tử tối đa có thể có trong channel cùng lúc |
|
true nếu chỉ có một producer (tăng tốc) |
|
true nếu chỉ có một consumer (tăng tốc) |
|
Nếu channel đầy thì làm gì? Giá trị có thể: Wait, DropWrite, DropOldest, DropNewest |
Ví dụ với options:
var options = new BoundedChannelOptions(10)
{
SingleWriter = false,
SingleReader = true,
FullMode = BoundedChannelFullMode.Wait
};
var channel = Channel.CreateBounded<string>(options);
7. Các phương thức bất đồng bộ: ReadAsync, WriteAsync, ReadAllAsync
Tại sao async quan trọng?
Các phương thức WriteAsync và ReadAsync không block thread! Nếu không có gì để đọc — task được tạm dừng, giải phóng thread cho công việc khác. Điều này rất quan trọng với server và UI app, nơi block có thể gây "freeze".
ReadAllAsync — tiện lợi của C# hiện đại
Có thể iter bất đồng bộ:
await foreach (var item in channel.Reader.ReadAllAsync())
{
// Làm việc với item
}
Channel<T> và các collection thread-safe: khác nhau chỗ nào?
ConcurrentQueue<T>/BlockingCollection<T> phù hợp cho scenario với threads, nhưng không phù hợp cho pure async (await-based) scenarios.
Channel<T> được thiết kế cho các pipeline bất đồng bộ. Về mặt thread-safety cả hai đều ổn, nhưng channel mang lại linh hoạt và tích hợp với các tính năng hiện đại của C# (IAsyncEnumerable v.v.).
8. Lỗi và bẫy thường gặp
Đừng quên gọi Complete() ở writer khi đã thêm hết phần tử! Nếu không consumer sẽ chờ mãi các phần tử mới.
Đừng gọi Complete() nhiều lần nếu có nhiều writer — chỉ làm điều đó sau khi tất cả producer thực sự kết thúc.
Sau khi channel đóng không thể ghi thêm phần tử, nhưng vẫn có thể đọc phần tử còn lại.
Race condition khi ghi đồng thời: nếu channel đã đóng mà vẫn có người cố ghi — sẽ nhận exception.
GO TO FULL VERSION