CodeGym /课程 /C# SELF /线程安全模式

线程安全模式

C# SELF
第 58 级 , 课程 4
可用

1. “无锁”/无等待 模式(Lock-Free / Wait-Free)

你们应该已经知道,Concurrent 集合是线程安全的。但它们怎么做到在你不在代码里到处加 lock 的情况下仍然安全?答案在于高级算法——lock-free(无锁)和 wait-free(无等待)。

简要解释 lock-free 和 wait-free 算法的概念

Lock-Free(无锁)

要点:保证至少有一个线程总能往前走一步,即使其他线程遇到延迟或被中断。

lock 的区别:lock 时,竞争的线程会等待锁被释放。在 lock-free 算法里,线程不按传统方式互相等待:发生冲突时它们只是重试。

举例:像超市排队:用 lock 你站着等;在 lock-free 情况下你过去看见忙了就稍后再试——不在公共队列里“排长队”。

Wait-Free(无等待)

要点:更强的保证:每个线程在有限的自有步骤内肯定会推进,不管其他线程做什么。没有人会无限重试。

区别:在 lock-free 中,线程可能因为冲突无限次重试;在 wait-free 中不会发生这种情况。

实际:实现 wait-free 要复杂得多,所以更常见的是 lock-free 或混合方法。

2. Concurrent 集合如何实现线程安全

lock-free 算法的基本构件是处理器级别的 原子操作。在 .NET 里我们通过类 System.Threading.Interlocked 使用它们。

Interlocked 操作

对原语(int, long)的快速原子操作:比如 Interlocked.Increment, Interlocked.Decrement, Interlocked.CompareExchange

示例:Interlocked.Increment(ref value) — 原子增加;Interlocked.CompareExchange(ref location, value, comparand) — 原子比较并在匹配时更新。

CAS(Compare-And-Swap)——比较并交换

CAS 在 .NET 里以 Interlocked.CompareExchange 实现。基本逻辑:

  1. 读取变量当前值。
  2. 基于读取值计算新值。
  3. 仅当变量仍等于原先值时尝试写入新值。否则重试。

示例:用 Interlocked 的简单无锁计数器

using System.Threading;
using System.Threading.Tasks;

class CounterExample
{
    static int regularCounter = 0;
    static int interlockedCounter = 0;

    static void IncrementRegular(int iterations)
    {
        for (int i = 0; i < iterations; i++)
        {
            regularCounter++; // 非线程安全!
        }
    }

    static void IncrementInterlocked(int iterations)
    {
        for (int i = 0; i < iterations; i++)
        {
            Interlocked.Increment(ref interlockedCounter); // 原子操作!
        }
    }
}

// 在 Main:
Task t1 = Task.Run(() => IncrementRegular(500_000));
Task t2 = Task.Run(() => IncrementRegular(500_000));
Task.WaitAll(t1, t2);
Console.WriteLine($"普通计数器: {regularCounter}"); // 几乎总是小于 1_000_000

regularCounter = 0; // 重置以便下个测试
t1 = Task.Run(() => IncrementInterlocked(500_000));
t2 = Task.Run(() => IncrementInterlocked(500_000));
Task.WaitAll(t1, t2);
Console.WriteLine($"Interlocked 计数器: {interlockedCounter}"); // 将是正好 1_000_000

方法 Interlocked.Increment 保证了增量的原子性:即使多个线程并发访问也不会丢失数据。

3. 为什么这对扩展性和性能重要

降低开销:传统锁(lock)可能导致上下文切换和内核等待。Lock-free 把这些损耗降到最低。

避免死锁:线程不互相等待——就没法形成 deadlock。

更好的扩展性:在多核系统上线程之间的相互影响更小,没有单一全局锁的“瓶颈”。

更高的响应性:没有人会因为长时间等待而“卡死”。

简单看一下 ConcurrentQueue<T> 的内部

简化来说:队列由链状的段(segments)组成。对 Enqueue 来说,线程通过 CompareExchange 原子地推进“尾部”;对 TryDequeue 来说,只有当“头部”没变时才原子地移动它。真实实现更复杂(要解决 ABA 问题并考虑 GC),但关键是用原子操作替代“重量级”锁。

4. Concurrent 集合的性能

与在普通集合上加 lock 的性能对比

在低并发时差别不大,有时简单的在普通集合上加一个 lock 性能能对得上。但在高并发下,Concurrent 集合通常因为没有等待全局锁而明显更快

示例:对比(思路,不运行)

using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics; // 用于 Stopwatch
using System.Threading.Tasks;

class PerformanceTest
{
    static List<int> regularList = new List<int>();
    static ConcurrentQueue<int> concurrentQueue = new ConcurrentQueue<int>();
    static object lockObject = new object();

    const int Iterations = 1_000_000;
    const int NumTasks = 4; // 并行任务数量

    public static void RunTests()
    {
        Console.WriteLine("性能测试(添加):");

        // 普通 List + lock 测试
        regularList.Clear();
        Stopwatch sw = Stopwatch.StartNew();
        Parallel.For(0, NumTasks, (i) =>
        {
            for (int j = 0; j < Iterations / NumTasks; j++)
            {
                lock (lockObject)
                {
                    regularList.Add(j);
                }
            }
        });
        sw.Stop();
        Console.WriteLine($"List + lock: {sw.ElapsedMilliseconds} ms. Count: {regularList.Count}");

        // ConcurrentQueue 测试
        concurrentQueue.Clear();
        sw = Stopwatch.StartNew();
        Parallel.For(0, NumTasks, (i) =>
        {
            for (int j = 0; j < Iterations / NumTasks; j++)
            {
                concurrentQueue.Enqueue(j);
            }
        });
        sw.Stop();
        Console.WriteLine($"ConcurrentQueue: {sw.ElapsedMilliseconds} ms. Count: {concurrentQueue.Count}");
        // 预计当 NumTasks > 1 时 ConcurrentQueue 会明显更快
    }
}

结论:如果你看到代码里在集合外面套了 lock,这通常是改用 Concurrent 对应类型的信号。

6. 有用的细节

contention 对性能的影响

Contention —— 当很多线程同时访问同一资源。竞争越高,等待越多,性能越差。

Concurrent 集合为减少竞争做了设计:例如 ConcurrentBag<T> 使用线程本地存储,ConcurrentDictionary<TKey, TValue> 使用分段锁(striped locking)。

性能关键在于减少 contention:尽量把数据在线程间拆分,或使用多个集合。

为特定场景选择合适的集合

集合 顺序 何时使用 何时不使用
ConcurrentQueue<T>
FIFO(先进先出) 任务队列、日志、异步事件处理、Producer-Consumer 模型。 如果顺序不重要、需要 LIFO 或需要限制大小并阻塞时不适合。
ConcurrentStack<T>
LIFO(后进先出) 操作历史(撤销/重做)、图的深度优先遍历(DFS)、优先“最后加入”的对象池。 如果需要 FIFO 或顺序稳定性则不适合。
ConcurrentBag<T>
不保证顺序 对象池,当生产者和消费者通常是同一线程;TPL 场景下有局部性优势。 如果顺序重要则不要使用。
ConcurrentDictionary<TKey, TValue>
缓存、用户会话、统计计数、并行聚合。 如果你不需要字典就不用它。
BlockingCollection<T>(基于 ConcurrentQueue FIFO(或基于所用底层集合的顺序) 带阻塞操作和大小限制的 Producer-Consumer,方便的完成/关闭机制。 如果不需要阻塞操作或大小限制则不必使用。

7. 优化建议

避免在“热路径”频繁使用 ToArray()

ToArray() 会创建集合的完整副本——在内存和时间上都很贵。只在需要“快照”时并尽量少用。要获取数量用 Count(记住它是调用时的一个快照)。

迭代 Concurrent 集合时要小心

并发修改时迭代器不保证稳定性:可能跳过元素或得到不一致的视图。若需要稳定表示,先用 ToArray() 做快照。

// 不好:可能跳过元素或在迭代时看到变化
foreach (var item in myConcurrentQueue) { /* ... */ }

// 好:先迭代固定快照
var snapshot = myConcurrentQueue.ToArray();
foreach (var item in snapshot) { /* ... */ }

尽量减少通过集合的“流量”

把任务/数据打包:减少 Add/Take 的调用次数——就能减少潜在的 contention。比如别发 1000 条单独消息,发一个包含 1000 条的“包”。

追踪竞争来源

如果看到性能下降,量化看看哪里竞争最大。可能通过让线程操作本地数据或使用不同的集合就能改进设计。

1
调查/小测验
Concurrent-集合第 58 级,课程 4
不可用
Concurrent-集合
线程安全的集合
评论
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION