1. “无锁”/无等待 模式(Lock-Free / Wait-Free)
你们应该已经知道,Concurrent 集合是线程安全的。但它们怎么做到在你不在代码里到处加 lock 的情况下仍然安全?答案在于高级算法——lock-free(无锁)和 wait-free(无等待)。
简要解释 lock-free 和 wait-free 算法的概念
Lock-Free(无锁)
要点:保证至少有一个线程总能往前走一步,即使其他线程遇到延迟或被中断。
和 lock 的区别:用 lock 时,竞争的线程会等待锁被释放。在 lock-free 算法里,线程不按传统方式互相等待:发生冲突时它们只是重试。
举例:像超市排队:用 lock 你站着等;在 lock-free 情况下你过去看见忙了就稍后再试——不在公共队列里“排长队”。
Wait-Free(无等待)
要点:更强的保证:每个线程在有限的自有步骤内肯定会推进,不管其他线程做什么。没有人会无限重试。
区别:在 lock-free 中,线程可能因为冲突无限次重试;在 wait-free 中不会发生这种情况。
实际:实现 wait-free 要复杂得多,所以更常见的是 lock-free 或混合方法。
2. Concurrent 集合如何实现线程安全
lock-free 算法的基本构件是处理器级别的 原子操作。在 .NET 里我们通过类 System.Threading.Interlocked 使用它们。
Interlocked 操作
对原语(int, long)的快速原子操作:比如 Interlocked.Increment, Interlocked.Decrement, Interlocked.CompareExchange。
示例:Interlocked.Increment(ref value) — 原子增加;Interlocked.CompareExchange(ref location, value, comparand) — 原子比较并在匹配时更新。
CAS(Compare-And-Swap)——比较并交换
CAS 在 .NET 里以 Interlocked.CompareExchange 实现。基本逻辑:
- 读取变量当前值。
- 基于读取值计算新值。
- 仅当变量仍等于原先值时尝试写入新值。否则重试。
示例:用 Interlocked 的简单无锁计数器
using System.Threading;
using System.Threading.Tasks;
class CounterExample
{
static int regularCounter = 0;
static int interlockedCounter = 0;
static void IncrementRegular(int iterations)
{
for (int i = 0; i < iterations; i++)
{
regularCounter++; // 非线程安全!
}
}
static void IncrementInterlocked(int iterations)
{
for (int i = 0; i < iterations; i++)
{
Interlocked.Increment(ref interlockedCounter); // 原子操作!
}
}
}
// 在 Main:
Task t1 = Task.Run(() => IncrementRegular(500_000));
Task t2 = Task.Run(() => IncrementRegular(500_000));
Task.WaitAll(t1, t2);
Console.WriteLine($"普通计数器: {regularCounter}"); // 几乎总是小于 1_000_000
regularCounter = 0; // 重置以便下个测试
t1 = Task.Run(() => IncrementInterlocked(500_000));
t2 = Task.Run(() => IncrementInterlocked(500_000));
Task.WaitAll(t1, t2);
Console.WriteLine($"Interlocked 计数器: {interlockedCounter}"); // 将是正好 1_000_000
方法 Interlocked.Increment 保证了增量的原子性:即使多个线程并发访问也不会丢失数据。
3. 为什么这对扩展性和性能重要
降低开销:传统锁(lock)可能导致上下文切换和内核等待。Lock-free 把这些损耗降到最低。
避免死锁:线程不互相等待——就没法形成 deadlock。
更好的扩展性:在多核系统上线程之间的相互影响更小,没有单一全局锁的“瓶颈”。
更高的响应性:没有人会因为长时间等待而“卡死”。
简单看一下 ConcurrentQueue<T> 的内部
简化来说:队列由链状的段(segments)组成。对 Enqueue 来说,线程通过 CompareExchange 原子地推进“尾部”;对 TryDequeue 来说,只有当“头部”没变时才原子地移动它。真实实现更复杂(要解决 ABA 问题并考虑 GC),但关键是用原子操作替代“重量级”锁。
4. Concurrent 集合的性能
与在普通集合上加 lock 的性能对比
在低并发时差别不大,有时简单的在普通集合上加一个 lock 性能能对得上。但在高并发下,Concurrent 集合通常因为没有等待全局锁而明显更快。
示例:对比(思路,不运行)
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics; // 用于 Stopwatch
using System.Threading.Tasks;
class PerformanceTest
{
static List<int> regularList = new List<int>();
static ConcurrentQueue<int> concurrentQueue = new ConcurrentQueue<int>();
static object lockObject = new object();
const int Iterations = 1_000_000;
const int NumTasks = 4; // 并行任务数量
public static void RunTests()
{
Console.WriteLine("性能测试(添加):");
// 普通 List + lock 测试
regularList.Clear();
Stopwatch sw = Stopwatch.StartNew();
Parallel.For(0, NumTasks, (i) =>
{
for (int j = 0; j < Iterations / NumTasks; j++)
{
lock (lockObject)
{
regularList.Add(j);
}
}
});
sw.Stop();
Console.WriteLine($"List + lock: {sw.ElapsedMilliseconds} ms. Count: {regularList.Count}");
// ConcurrentQueue 测试
concurrentQueue.Clear();
sw = Stopwatch.StartNew();
Parallel.For(0, NumTasks, (i) =>
{
for (int j = 0; j < Iterations / NumTasks; j++)
{
concurrentQueue.Enqueue(j);
}
});
sw.Stop();
Console.WriteLine($"ConcurrentQueue: {sw.ElapsedMilliseconds} ms. Count: {concurrentQueue.Count}");
// 预计当 NumTasks > 1 时 ConcurrentQueue 会明显更快
}
}
结论:如果你看到代码里在集合外面套了 lock,这通常是改用 Concurrent 对应类型的信号。
6. 有用的细节
contention 对性能的影响
Contention —— 当很多线程同时访问同一资源。竞争越高,等待越多,性能越差。
Concurrent 集合为减少竞争做了设计:例如 ConcurrentBag<T> 使用线程本地存储,ConcurrentDictionary<TKey, TValue> 使用分段锁(striped locking)。
性能关键在于减少 contention:尽量把数据在线程间拆分,或使用多个集合。
为特定场景选择合适的集合
| 集合 | 顺序 | 何时使用 | 何时不使用 |
|---|---|---|---|
|
FIFO(先进先出) | 任务队列、日志、异步事件处理、Producer-Consumer 模型。 | 如果顺序不重要、需要 LIFO 或需要限制大小并阻塞时不适合。 |
|
LIFO(后进先出) | 操作历史(撤销/重做)、图的深度优先遍历(DFS)、优先“最后加入”的对象池。 | 如果需要 FIFO 或顺序稳定性则不适合。 |
|
不保证顺序 | 对象池,当生产者和消费者通常是同一线程;TPL 场景下有局部性优势。 | 如果顺序重要则不要使用。 |
|
无 | 缓存、用户会话、统计计数、并行聚合。 | 如果你不需要字典就不用它。 |
| BlockingCollection<T>(基于 ConcurrentQueue) | FIFO(或基于所用底层集合的顺序) | 带阻塞操作和大小限制的 Producer-Consumer,方便的完成/关闭机制。 | 如果不需要阻塞操作或大小限制则不必使用。 |
7. 优化建议
避免在“热路径”频繁使用 ToArray()
ToArray() 会创建集合的完整副本——在内存和时间上都很贵。只在需要“快照”时并尽量少用。要获取数量用 Count(记住它是调用时的一个快照)。
迭代 Concurrent 集合时要小心
并发修改时迭代器不保证稳定性:可能跳过元素或得到不一致的视图。若需要稳定表示,先用 ToArray() 做快照。
// 不好:可能跳过元素或在迭代时看到变化
foreach (var item in myConcurrentQueue) { /* ... */ }
// 好:先迭代固定快照
var snapshot = myConcurrentQueue.ToArray();
foreach (var item in snapshot) { /* ... */ }
尽量减少通过集合的“流量”
把任务/数据打包:减少 Add/Take 的调用次数——就能减少潜在的 contention。比如别发 1000 条单独消息,发一个包含 1000 条的“包”。
追踪竞争来源
如果看到性能下降,量化看看哪里竞争最大。可能通过让线程操作本地数据或使用不同的集合就能改进设计。
GO TO FULL VERSION