1. 介绍
今天我们要上一个新台阶!该认识一个特别的工具了 — Channel。这个东西是为现代 .NET 异步应用做的:在那些普通锁要么不够用、要么严重拖慢你的场景下,它很有用。
你会接触到“生产者-消费者”模式(Producer-Consumer),这个模式从上世纪60年代开始就一直流行。你会搭一个简单的异步“流水线”:有的任务或线程生产东西(比如下载文件、计算数字、等待事件),有的任务去处理它们(比如保存、写数据库、更新 UI)。
为什么会有 Channel?
- "生产者-消费者" 模式 长久以来是用队列来解决的:生产者把任务放进队列,消费者取出来。不过!BlockingCollection<T>、基于 ConcurrentQueue<T> 的队列,甚至用 lock 做的手动同步 — 都不是异步的。也就是说线程只能被阻塞去等数据,而不能把控制权交还给调度器(async/await 的世界)。
- 异步性 在 .NET 不是周五的花哨词,而是现代架构的基石。为了等待元素而阻塞线程既昂贵又低效。需要在不阻塞线程的情况下等待数据——这正是 Channel 要解决的。
- 灵活性:用通道可以搭出复杂的数据处理流水线,分离线程逻辑,添加中间步骤和负载均衡 —— 全部不用干低级别的同步操作,爽。
什么是 Channel?(比喻和架构)
想象你有一根接力棒(或者传送带),可以把对象从 A 地带到 B 地,大家不需要面对面交接。关键是接力棒别丢了。
Channel 是 .NET 里用于在不同的 tasks、线程或程序部分之间进行异步数据传递的内建工具。它实现了带有“等待”支持的异步队列,既可以等待写入,也可以等待读取。
- 生产者 把元素放进通道(比如处理请求);
- 消费者 从通道取元素 —— 就这么简单!
2. 类 Channel<T> 和它的结构
一切从命名空间开始:
using System.Threading.Channels;
与常见集合不同,Channel 更像一个工厂,用来创建专门的数据传递对象。
主要类型:
- ChannelWriter<T> — “写者”(生产者)。只负责插入元素。
- ChannelReader<T> — “读者”(消费者)。只负责取出元素。
- 通道(Channel)把职责分离:写者不知道读者,读者也不知道写者。
在 .NET 里有几种通道实现,各有特点:unbounded(无限制)、bounded(有容量限制)、single-producer-single-consumer (SPSC)、multi-producer-multi-consumer (MPMC) 等。我们先从最通用的开始。
简单示例:异步任务队列
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
// 创建一个不限制大小的 channel
var channel = Channel.CreateUnbounded<int>();
// 生产者任务
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"生产者: 放入 {i} 到 channel");
await channel.Writer.WriteAsync(i); // 异步写入!
await Task.Delay(100); // 模拟工作
}
channel.Writer.Complete(); // 通知不会再写入了
});
// 消费者任务
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消费者: 从 channel 收到 {item}");
await Task.Delay(200); // 模拟处理
}
Console.WriteLine("消费者: channel 已关闭");
});
await Task.WhenAll(producer, consumer);
}
}
这里发生了什么?
- Channel.CreateUnbounded<int>() — 创建一个不限制队列大小的通道。
- 生产者用 WriteAsync 把 0 到 9 写进通道。
- 写完后调用 Complete() —— 表示“不会再有新元素了!”。
- 消费者通过 ReadAllAsync()(也是异步的)遍历所有元素,直到通道关闭。
- 那些延迟(Task.Delay)只是用来模拟真实工作:你会看到写入通常比读取快。
3. 为什么这一切是异步的?
普通的阻塞队列(比如 BlockingCollection 或用 lock 保护的集合)只能阻塞线程。这意味着如果有大量任务或你想追求高性能,阻塞线程代价很高。
用通道可以做到:
- 如果生产者更快,通道会积累元素(受内存或设定的最大容量限制);
- 如果消费者更快,它会等待直到有新元素(并不会把线程卡死,而是把线程交还给调度器);
这对你不知道谁更快——生产者还是消费者——的场景特别合适。
现实中的应用
- 异步日志记录:把日志消息写到文件/数据库放到单独任务处理;
- 处理 web 请求:一个任务抓取一批页面,另一个任务解析内容;
- 扫描和索引文件夹:一些任务遍历文件系统,另一些统计文件信息;
- 复杂的数据管线:比如在 ETL(Extract–Transform–Load) 中,一个步骤把原料变成半成品,另一个把半成品变成成品。
4. 有容量限制的通道(Bounded Channel)
“无限”通道虽然方便,但内存毕竟不是无限的(即便你觉得你的电脑很猛)。
有界通道 可以设置同时存在的最大元素数。如果通道满了,生产者会等待,直到消费者取走一些元素。
示例:
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
FullMode = BoundedChannelFullMode.Wait // (默认) - 等待直到有空位
});
这里通道里最多只能有三个元素。如果生产者尝试写入第四个元素,它会等待。
多个生产者和消费者
var channel = Channel.CreateUnbounded<int>();
// 2 个生产者
for (int producerId = 0; producerId < 2; producerId++)
{
Task.Run(async () =>
{
for (int i = 0; i < 5; i++)
{
int value = producerId * 100 + i;
Console.WriteLine($"生产者 {producerId}: 放入 {value}");
await channel.Writer.WriteAsync(value);
await Task.Delay(50);
}
// 每个生产者都调用 Complete() — 很危险!
});
}
// 小技巧:Complete() 只能被调用一次,等所有生产者都结束后再调用。
// 为了示例,我们只留一个消费者任务:
Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消费者收到 {item}");
await Task.Delay(100);
}
});
注意! 通道应该在所有生产者完成工作后才被关闭(通过 Complete())。否则还想写入的生产者会遇到通道已关闭的情况!实际中通常用任务计数或者 Task.WhenAll 来保证这点。
5. 实战:通过通道处理图片
我们来稍微复杂一点的例子。假设有个文件夹存放图片。一个任务查找图片并把路径放到通道,另一个任务读取路径并对文件做有意义的处理(比如计算大小或转换格式)。
说明:为了简单示例我们只处理文件名(不真正操作图片),但思路是一样的。
using System;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
var channel = Channel.CreateBounded<string>(5);
// 生产者:在文件夹中查找 .jpg 文件
var producer = Task.Run(async () =>
{
foreach (var file in Directory.EnumerateFiles(@"images", "*.jpg"))
{
await channel.Writer.WriteAsync(file);
Console.WriteLine($"加入队列: {file}");
await Task.Delay(50); // 模拟搜索延迟
}
channel.Writer.Complete(); // 队列结束
});
// 消费者:读取并“处理”文件
var consumer = Task.Run(async () =>
{
await foreach (var file in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"处理文件: {file}");
await Task.Delay(200); // 模拟处理
}
Console.WriteLine("所有图片都处理完了!");
});
await Task.WhenAll(producer, consumer);
}
}
6. 配置 Channel:选项和细节
创建通道时可以用选项进行配置 — 对于有界通道来说,主要参数如下:
| 选项 | 说明 |
|---|---|
|
同时在通道中的最大元素数量 |
|
true:如果只有一个生产者(可以提高性能) |
|
true:如果只有一个消费者(可以提高性能) |
|
通道满时的处理策略。可能的值:Wait, DropWrite, DropOldest, DropNewest |
带选项的示例:
var options = new BoundedChannelOptions(10)
{
SingleWriter = false,
SingleReader = true,
FullMode = BoundedChannelFullMode.Wait
};
var channel = Channel.CreateBounded<string>(options);
7. 异步方法:ReadAsync, WriteAsync, ReadAllAsync
为什么 async 很重要?
WriteAsync 和 ReadAsync 不会阻塞线程!如果没有可读的内容,任务会被挂起,把线程释放给调度器去做别的事。这在服务器端和 UI 应用里尤其重要,额外的阻塞会导致界面卡顿或吞吐下降。
ReadAllAsync — 现代 C# 的便利
可以用异步方式迭代:
await foreach (var item in channel.Reader.ReadAllAsync())
{
// 对 item 做处理
}
Channel<T> 和 线程安全集合:有什么区别?
ConcurrentQueue<T>/BlockingCollection<T> 适用于多线程场景,但不适合纯异步(await)场景。
Channel<T> 是为异步 pipeline 应用设计的。从线程安全角度看,这些集合都能做好工作,但通道在灵活性和与现代 C# 功能(比如 IAsyncEnumerable)的集成上更有优势。
8. 错误和常见坑
别忘了在写者完成后调用 Complete()!否则消费者会永远等待新元素。
如果有多个写者,不要让每个写者都调用 Complete() — 只在所有写者都完成后调用一次。
通道关闭后不能再写入元素,但仍然可以读取剩下的元素。
并发写入时的竞态条件:如果通道已关闭且还有人尝试写入,会抛出异常。
GO TO FULL VERSION