CodeGym /课程 /C# SELF /Channel: 生产者–消费者 ( Channel<...

Channel: 生产者–消费者 ( Channel)

C# SELF
第 62 级 , 课程 3
可用

1. 介绍

今天我们要上一个新台阶!该认识一个特别的工具了 — Channel。这个东西是为现代 .NET 异步应用做的:在那些普通锁要么不够用、要么严重拖慢你的场景下,它很有用。

你会接触到“生产者-消费者”模式(Producer-Consumer),这个模式从上世纪60年代开始就一直流行。你会搭一个简单的异步“流水线”:有的任务或线程生产东西(比如下载文件、计算数字、等待事件),有的任务去处理它们(比如保存、写数据库、更新 UI)。

为什么会有 Channel

  1. "生产者-消费者" 模式 长久以来是用队列来解决的:生产者把任务放进队列,消费者取出来。不过!BlockingCollection<T>、基于 ConcurrentQueue<T> 的队列,甚至用 lock 做的手动同步 — 都不是异步的。也就是说线程只能被阻塞去等数据,而不能把控制权交还给调度器(async/await 的世界)。
  2. 异步性 在 .NET 不是周五的花哨词,而是现代架构的基石。为了等待元素而阻塞线程既昂贵又低效。需要在不阻塞线程的情况下等待数据——这正是 Channel 要解决的。
  3. 灵活性:用通道可以搭出复杂的数据处理流水线,分离线程逻辑,添加中间步骤和负载均衡 —— 全部不用干低级别的同步操作,爽。

什么是 Channel?(比喻和架构)

想象你有一根接力棒(或者传送带),可以把对象从 A 地带到 B 地,大家不需要面对面交接。关键是接力棒别丢了。

Channel 是 .NET 里用于在不同的 tasks、线程或程序部分之间进行异步数据传递的内建工具。它实现了带有“等待”支持的异步队列,既可以等待写入,也可以等待读取。

  • 生产者 把元素放进通道(比如处理请求);
  • 消费者 从通道取元素 —— 就这么简单!

2. 类 Channel<T> 和它的结构

一切从命名空间开始:

using System.Threading.Channels;

与常见集合不同,Channel 更像一个工厂,用来创建专门的数据传递对象。

主要类型:

  • ChannelWriter<T> — “写者”(生产者)。只负责插入元素。
  • ChannelReader<T> — “读者”(消费者)。只负责取出元素。
  • 通道(Channel)把职责分离:写者不知道读者,读者也不知道写者。

在 .NET 里有几种通道实现,各有特点:unbounded(无限制)、bounded(有容量限制)、single-producer-single-consumer (SPSC)、multi-producer-multi-consumer (MPMC) 等。我们先从最通用的开始。

简单示例:异步任务队列

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // 创建一个不限制大小的 channel
        var channel = Channel.CreateUnbounded<int>();

        // 生产者任务
        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine($"生产者: 放入 {i} 到 channel");
                await channel.Writer.WriteAsync(i); // 异步写入!
                await Task.Delay(100); // 模拟工作
            }
            channel.Writer.Complete(); // 通知不会再写入了
        });

        // 消费者任务
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"消费者: 从 channel 收到 {item}");
                await Task.Delay(200); // 模拟处理
            }
            Console.WriteLine("消费者: channel 已关闭");
        });

        await Task.WhenAll(producer, consumer);
    }
}

这里发生了什么?

  • Channel.CreateUnbounded<int>() — 创建一个不限制队列大小的通道。
  • 生产者用 WriteAsync09 写进通道。
  • 写完后调用 Complete() —— 表示“不会再有新元素了!”。
  • 消费者通过 ReadAllAsync()(也是异步的)遍历所有元素,直到通道关闭。
  • 那些延迟(Task.Delay)只是用来模拟真实工作:你会看到写入通常比读取快。

3. 为什么这一切是异步的?

普通的阻塞队列(比如 BlockingCollection 或用 lock 保护的集合)只能阻塞线程。这意味着如果有大量任务或你想追求高性能,阻塞线程代价很高。

用通道可以做到:

  • 如果生产者更快,通道会积累元素(受内存或设定的最大容量限制);
  • 如果消费者更快,它会等待直到有新元素(并不会把线程卡死,而是把线程交还给调度器);

这对你不知道谁更快——生产者还是消费者——的场景特别合适。

现实中的应用

  • 异步日志记录:把日志消息写到文件/数据库放到单独任务处理;
  • 处理 web 请求:一个任务抓取一批页面,另一个任务解析内容;
  • 扫描和索引文件夹:一些任务遍历文件系统,另一些统计文件信息;
  • 复杂的数据管线:比如在 ETL(Extract–Transform–Load) 中,一个步骤把原料变成半成品,另一个把半成品变成成品。

4. 有容量限制的通道(Bounded Channel)

“无限”通道虽然方便,但内存毕竟不是无限的(即便你觉得你的电脑很猛)。

有界通道 可以设置同时存在的最大元素数。如果通道满了,生产者会等待,直到消费者取走一些元素。

示例:

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.Wait // (默认) - 等待直到有空位
});

这里通道里最多只能有三个元素。如果生产者尝试写入第四个元素,它会等待。

多个生产者和消费者

var channel = Channel.CreateUnbounded<int>();

// 2 个生产者
for (int producerId = 0; producerId < 2; producerId++)
{
    Task.Run(async () =>
    {
        for (int i = 0; i < 5; i++)
        {
            int value = producerId * 100 + i;
            Console.WriteLine($"生产者 {producerId}: 放入 {value}");
            await channel.Writer.WriteAsync(value);
            await Task.Delay(50);
        }
        // 每个生产者都调用 Complete() — 很危险!
    });
}
// 小技巧:Complete() 只能被调用一次,等所有生产者都结束后再调用。
// 为了示例,我们只留一个消费者任务:
Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"消费者收到 {item}");
        await Task.Delay(100);
    }
});

注意! 通道应该在所有生产者完成工作后才被关闭(通过 Complete())。否则还想写入的生产者会遇到通道已关闭的情况!实际中通常用任务计数或者 Task.WhenAll 来保证这点。

5. 实战:通过通道处理图片

我们来稍微复杂一点的例子。假设有个文件夹存放图片。一个任务查找图片并把路径放到通道,另一个任务读取路径并对文件做有意义的处理(比如计算大小或转换格式)。

说明:为了简单示例我们只处理文件名(不真正操作图片),但思路是一样的。

using System;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var channel = Channel.CreateBounded<string>(5);

        // 生产者:在文件夹中查找 .jpg 文件
        var producer = Task.Run(async () =>
        {
            foreach (var file in Directory.EnumerateFiles(@"images", "*.jpg"))
            {
                await channel.Writer.WriteAsync(file);
                Console.WriteLine($"加入队列: {file}");
                await Task.Delay(50); // 模拟搜索延迟
            }
            channel.Writer.Complete(); // 队列结束
        });

        // 消费者:读取并“处理”文件
        var consumer = Task.Run(async () =>
        {
            await foreach (var file in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"处理文件: {file}");
                await Task.Delay(200); // 模拟处理
            }
            Console.WriteLine("所有图片都处理完了!");
        });

        await Task.WhenAll(producer, consumer);
    }
}

6. 配置 Channel:选项和细节

创建通道时可以用选项进行配置 — 对于有界通道来说,主要参数如下:

选项 说明
Capacity
同时在通道中的最大元素数量
SingleWriter
true:如果只有一个生产者(可以提高性能)
SingleReader
true:如果只有一个消费者(可以提高性能)
FullMode
通道满时的处理策略。可能的值:Wait, DropWrite, DropOldest, DropNewest

带选项的示例:

var options = new BoundedChannelOptions(10)
{
    SingleWriter = false,
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait
};
var channel = Channel.CreateBounded<string>(options);

7. 异步方法:ReadAsync, WriteAsync, ReadAllAsync

为什么 async 很重要?

WriteAsyncReadAsync 不会阻塞线程!如果没有可读的内容,任务会被挂起,把线程释放给调度器去做别的事。这在服务器端和 UI 应用里尤其重要,额外的阻塞会导致界面卡顿或吞吐下降。

ReadAllAsync — 现代 C# 的便利

可以用异步方式迭代:

await foreach (var item in channel.Reader.ReadAllAsync())
{
    // 对 item 做处理
}

Channel<T> 和 线程安全集合:有什么区别?

ConcurrentQueue<T>/BlockingCollection<T> 适用于多线程场景,但不适合纯异步(await)场景。

Channel<T> 是为异步 pipeline 应用设计的。从线程安全角度看,这些集合都能做好工作,但通道在灵活性和与现代 C# 功能(比如 IAsyncEnumerable)的集成上更有优势。

8. 错误和常见坑

别忘了在写者完成后调用 Complete()!否则消费者会永远等待新元素。

如果有多个写者,不要让每个写者都调用 Complete() — 只在所有写者都完成后调用一次

通道关闭后不能再写入元素,但仍然可以读取剩下的元素。

并发写入时的竞态条件:如果通道已关闭且还有人尝试写入,会抛出异常。

评论
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION