CodeGym /コース /C# SELF /Concurrent コレクション入門

Concurrent コレクション入門

C# SELF
レベル 58 , レッスン 0
使用可能

1. 問題の背景

シングルスレッドのアプリケーションでは、List<T>Dictionary<T> のようなコレクションは予測どおりに動く。でも同じコレクションに複数のスレッドが同時にアクセスし始めると、よくある問題――データ競合(race conditions)が発生するんだ。

複数のスレッドが適切な同期なしに同じコレクションを読み書きすると、次のようなことが起きる可能性があるよ:

  • 不正なデータ: あるスレッドが要素を削除している間に別のスレッドが更新しようとして、整合性が壊れる。
  • データの喪失: あるスレッドが要素を追加したのに、別のスレッドが前の書き込みを知らずに上書きしてしまう。
  • 例外: コレクションが不正な状態になって InvalidOperationException(例えば "Collection was modified; enumeration operation may not execute.") や場合によっては NullReferenceException が出ることがある。

例 1: List<T> のデータ競合(単純なインクリメント)

2つのスレッドが同時に同じリスト要素をインクリメントする例。

using System.Collections.Generic;
using System.Threading.Tasks; // Task.Run のため

class RaceConditionExample
{
    static List<int> numbers = new List<int> { 0 }; // 要素1つのリスト

    static void Main(string[] args)
    {
        Console.WriteLine("初期値: " + numbers[0]); // 0

        // numbers[0] をそれぞれインクリメントする2つのスレッドを起動
        Task task1 = Task.Run(() => IncrementNumbers(500_000));
        Task task2 = Task.Run(() => IncrementNumbers(500_000));

        Task.WaitAll(task1, task2); // 両方のスレッドの終了を待つ

        Console.WriteLine("最終値: " + numbers[0]); // 期待は 1_000_000 だが...
        // 結果はほとんどの場合 1_000_000 未満になる!
    }

    static void IncrementNumbers(int count)
    {
        for (int i = 0; i < count; i++)
        {
            // この操作 "numbers[0]++" は実際には3つのステップから成る:
            // 1. numbers[0] を読む
            // 2. 値を1増やす
            // 3. 新しい値を書き戻す
            numbers[0]++; 
        }
    }
}

なぜこれがレースなのか? もしスレッドAが numbers[0] を読み(値が 0)、その後スレッドBが numbers[0] を読み(これも 0)で、A が 1 を書き戻す前に起きると、両方のスレッドは 01 にして 1 を書き込む。結果として一つ分のインクリメントが失われる。numbers[0]++ は原子操作ではないんだ。

例 2: Dictionary を変更して InvalidOperationException

一方のスレッドが辞書を列挙している間に、別のスレッドがそれを変更すると起こる例。

using System.Collections.Generic;
using System.Threading; // Thread.Sleep のため

class DictionaryRaceExample
{
    static Dictionary<int, string> users = new Dictionary<int, string>();

    static void Main(string[] args)
    {
        // 辞書の初期化
        for (int i = 0; i < 5; i++) users.Add(i, $"ユーザー {i}");

        // 読み取りスレッド
        Thread readerThread = new Thread(() =>
        {
            try
            {
                foreach (var user in users) // 辞書を列挙
                {
                    Console.WriteLine($"リーダー: {user.Key} - {user.Value}");
                    Thread.Sleep(10); // 動作をシミュレート
                }
            }
            catch (InvalidOperationException ex)
            {
                Console.WriteLine($"リーダー: エラー! {ex.Message}");
            }
        });

        // 書き込みスレッド
        Thread writerThread = new Thread(() =>
        {
            Thread.Sleep(5); // リーダーが始める時間を少し与える
            for (int i = 5; i < 10; i++)
            {
                users.Add(i, $"新しいユーザー {i}"); // 要素を追加
                Console.WriteLine($"ライター: ユーザー {i} を追加した");
                Thread.Sleep(15);
            }
        });

        readerThread.Start();
        writerThread.Start();

        readerThread.Join(); // スレッドの終了を待つ
        writerThread.Join();
        Console.WriteLine("例は完了した。");
    }
}

なぜ例外が出るのか? Dictionary<TKey, TValue>(や List<T>)は、外部での同期なしに別スレッドから同時に読み書きされることを想定していない。書き込みスレッドが内部構造を変えると、読み取り中のスレッドは既に変更されたデータに対して foreach を続けようとして、InvalidOperationException が発生するんだ。

2. なぜ単純なブロック(lock)が常に最適ではないの?

「全部を lock で囲えばいい」という考えはシンプルだけど、欠点もある。

// 悪い例: ロックが多すぎる
// (デモ用。実際はこうしないほうがいい!)
static object _lock = new object();
static List<int> _sharedList = new List<int>();

void AddItem(int item)
{
    lock (_lock)
    {
        _sharedList.Add(item);
    }
}

int GetItemCount()
{
    lock (_lock)
    {
        return _sharedList.Count;
    }
}
  • パフォーマンス (bottleneck): lock はコレクション全体へのアクセスをブロックする。100 スレッドがあるとき、99 は 1 つが終わるのを待つことになり、たとえ操作が直接競合していなくても遅くなる。
  • 複雑さ: コレクションを使う全ての箇所で lock を忘れずに使う必要がある。1 箇所でも忘れるとデータ競合が再発する。
  • デッドロック: 異なるオブジェクトに対して複数の lock を使うと簡単に deadlock が発生する。
  • イテレータ: foreach は、別スレッドがコレクションを変更すると助けにならない。

だから .NET には専用のスレッドセーフなコレクションが用意されているんだ。

アトミックな操作

スレッドセーフなコレクション は、ユーザー側で外部ロックを取らなくても複数スレッドから同時に使って正しく動くことを保証する。ポイントは アトミック操作:操作が全体として実行されるか全く実行されないかで、他のスレッドからは「半端な状態」が見えないようにすること。

  • 追加、削除、読み取りは、まるで1つずつ順に実行されているかのように振る舞う。
  • 内部では低レベルな技術を使っている:インターリーブされた操作(Interlocked)、Compare-And-Swap (CAS)、細かいロックなどで、コレクション全体をグローバルにロックする代わりに効率的に処理している。

3. System.Collections.Concurrent の概要

名前空間 System.Collections.Concurrent は、マルチスレッド向けに最初から設計されたコレクションのセットを提供する。フィロソフィーは 最大限の並列化と最小限のロック

  • パフォーマンス: コア数が増えてもスケールするように作られている。
  • 簡単さ: 各操作の周りで手動で lock をやる必要が少ない。
  • ミスが減る: 手動同期に伴う典型的なバグを減らせる。
  • 競合に最適化: 同時追加・削除が頻発するシナリオで効率的に動く。

4. 主要なクラス

ConcurrentQueue<T>(スレッドセーフなキュー)

原則:FIFO ― 先入れ先出し。使いどころ:producer–consumer、ログ、タスクキューなど。

using System.Collections.Concurrent;

ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();

void Producer() => messageQueue.Enqueue("メッセージ 1");

void Consumer()
{
    if (messageQueue.TryDequeue(out string message))
    {
        Console.WriteLine($"処理済み: {message}");
    }
    else
    {
        Console.WriteLine("キューは空です。");
    }
}

ConcurrentStack<T>(スレッドセーフなスタック)

原則:LIFO ― 後入れ先出し。使いどころ:操作履歴、DFS 探索、オブジェクトプールなど。

using System.Collections.Concurrent;

ConcurrentStack<int> historyStack = new ConcurrentStack<int>();

void PushAction(int value) => historyStack.Push(value);

void PopAction()
{
    if (historyStack.TryPop(out int action))
    {
        Console.WriteLine($"アクションを取り消した: {action}");
    }
    else
    {
        Console.WriteLine("スタックは空です。");
    }
}

ConcurrentBag<T>(スレッドセーフな「バッグ」)

順序は保証されない非順序コレクション。自分が入れたものを自分が取ることが多いパターンに最適化されている。プールに向いている。

using System.Collections.Concurrent;

ConcurrentBag<System.Guid> objectPool = new ConcurrentBag<System.Guid>();

void AddObject() => objectPool.Add(System.Guid.NewGuid());

void TakeObject()
{
    if (objectPool.TryTake(out System.Guid obj))
    {
        Console.WriteLine($"オブジェクトを取得した: {obj}");
    }
    else
    {
        Console.WriteLine("プールは空です。");
    }
}

ConcurrentDictionary<TKey, TValue>(スレッドセーフな辞書)

キーに対する追加、更新、取得のアトミック操作をサポートする。キャッシュ、セッション、カウンタに最適。

using System.Collections.Concurrent;

ConcurrentDictionary<string, int> userScores = new ConcurrentDictionary<string, int>();

void UpdateScore(string user, int score)
{
    // 存在しなければアトミックに追加、あれば更新する
    userScores.AddOrUpdate(user, score, (key, existingVal) => existingVal + score);
    Console.WriteLine($"スコア {user}: {userScores[user]}");
}

void GetScore(string user)
{
    if (userScores.TryGetValue(user, out int score))
    {
        Console.WriteLine($"現在のスコア {user}: {score}");
    }
    else
    {
        Console.WriteLine($"ユーザー {user} が見つかりません。");
    }
}

5. いつこれらのコレクションを普通のコレクションの代わりに使うべきか?

  • アプリがマルチスレッドである: シングルスレッドなら普通のコレクションの方が速い(オーバーヘッドがない)。
  • 複数スレッドで共有する1つのコレクションがある: これは System.Collections.Concurrent を使う主なサインだよ。
  • 高いパフォーマンスとスケーラビリティが必要: これらは待ち時間を最小化するよう設計されている。
  • コードを簡潔にしたい: 各操作の周りで手動の lock を入れる必要がなくなる。
  • アトミックな操作が必要: 追加/削除/取得がコレクションを不整合にしないよう保証したいとき。

以下のときは Concurrent-コレクションを使わないほうがいい

  • アプリが完全に単一スレッドで動く場合。
  • 複数の関連操作を1つのトランザクションのように扱いたい場合(その場合は外部で同期するか、別の仕組みが必要)。
  • 取り出しの厳密な順序が必要で、コレクションがそれを保証しない場合(例えば ConcurrentBag<T> では順序保証がない)。
コメント
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION