CodeGym /Kurslar /C# SELF /Concurrent-kolleksiyaların inkişaf etmiş patternləri və x...

Concurrent-kolleksiyaların inkişaf etmiş patternləri və xüsusiyyətləri Concurrent-kolleksiyaları

C# SELF
Səviyyə , Dərs
Mövcuddur

1. «Producer‑Consumer» + Concurrent

Biz artıq ConcurrentQueue barədə danışanda «Producer‑Consumer» patterninə toxunmuşduq. Gəlin bunu bir neçə producer və consumer vəziyyəti üçün və düzgün bitirmə siqnalizasiyası ilə daha ətraflı nəzərdən keçirək.

ConcurrentQueue-un (və digər Concurrent kolleksiyaların) əsas üstünlüyü odur ki, onlar özləri thread-safe davranışı təmin edir. Enqueue və ya TryDequeue-nu lock-a bükməyə ehtiyac yoxdur — bir neçə thread ortaq queue ilə təhlükəsiz işləyə bilər.

Nümunə: Bir neçə producer və bir neçə consumer

Bəzi worker threadlər tasklar yaradır, digər threadlər isə onları emal edir.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
CancellationTokenSource cts = new CancellationTokenSource(); // Consumer-ları dayandırmaq üçün

// Producer üçün metod
void Producer(string name, int count)
{
    for (int i = 0; i < count; i++)
    {
        string task = $"Zadacha_{name}_{i}";
        taskQueue.Enqueue(task);
        Console.WriteLine($"[P:{name}] Əlavə etdi: {task}");
        Thread.Sleep(10); 
    }
}

// Consumer üçün metod
void Consumer(string name)
{
    while (!cts.Token.IsCancellationRequested || taskQueue.Count > 0)
    {
        if (taskQueue.TryDequeue(out string task))
        {
            Console.WriteLine($"[C:{name}] Emal etdi: {task}");
            Thread.Sleep(20); 
        }
        else
        {
            Thread.Sleep(50); // Queue boşdursa gözləyirik
        }
    }
    Console.WriteLine($"[C:{name}] İşini bitirdi.");
}

// Main-də nümunənin işə salınması:
Task.Run(() => Producer("A", 10)); // Producer A
Task.Run(() => Producer("B", 10)); // Producer B
Task.Run(() => Consumer("1"));    // Consumer 1
Task.Run(() => Consumer("2"));    // Consumer 2

Thread.Sleep(1000); // İş üçün vaxt veririk
cts.Cancel();       // Consumer-lara bitirmə siqnalı
Thread.Sleep(500); // Qalanları götürüb bitirmələri üçün vaxt veririk

Burada bir neçə producer və consumer eyni ConcurrentQueue-la eyni vaxtda konflikt olmadan işləyir: EnqueueTryDequeue atomikdir.

İşin bitməsi siqnallarının önəmi (CancellationTokenSource)

Biz consumer-lara bitirmə ehtiyacını bildirmək üçün CancellationTokenSource (cts) istifadə edirik. Bu Producer‑Consumer patterni üçün kritikdir:

  • Producer-lər işini bitirdi. Element əlavə etmə bitəndə consumer-lər sonsuz boş queue-dan gözləməməlidir.
  • Tətbiq dayandırılır. Consumer-ləri düzgün şəkildə söndürmək lazımdır.

CancellationTokenSourceCancellationToken standart mexanizmdir: consumer dövri olaraq IsCancellationRequested-ə baxır və lazım olarsa ThrowIfCancellationRequested() çağırır.

2. BlockingCollection<T>

Hərçənd ConcurrentQueue<T> Producer‑Consumer üçün yaxşı uyğundur, boş queue halında əl ilə gözləmə və bitirmə siqnalizasiyası tələb edir. Daha rahat implementasiya üçün .NET-də BlockingCollection<T> var — bu müstəqil kolleksiya deyil, istənilən IProducerConsumerCollection<T>-nin (məsələn, ConcurrentQueue) əhatəedicisidir.

BlockingCollection-un üstünlükləri:

  • Blocking əməliyyatlar. Add()/Take() collection dolu/boş olduqda thread-i bloklayır. IsEmpty-i əl ilə yoxlamağa ehtiyac yoxdur.
  • Ölçü məhdudiyyəti. Capacitiy təyin etmək olar. Add() limitə çatanda bloklanır — yaddaş üçün nəzarət üçün faydalıdır.
  • Asan finalizasiya. CompleteAdding() əlavə etmənin bitdiyini bildirir, GetConsumingEnumerable() isə consumer-ə elementləri tam bitənə qədər işləməyə imkan verir.

Nümunə: BlockingCollection<T> ilə Producer‑Consumer

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// BlockingCollection default olaraq ConcurrentQueue istifadə edir
BlockingCollection<int> numbers = new BlockingCollection<int>(capacity: 10); // Limit 10 olan queue

void ProducerBC(int count)
{
    for (int i = 0; i < count; i++)
    {
        numbers.Add(i); // Queue doludursa bloklanır
        Console.WriteLine($"[P] Əlavə etdi: {i}");
        Thread.Sleep(50);
    }
    numbers.CompleteAdding(); // Producer-in əlavə etməyi bitirdiyini bildiririk
    Console.WriteLine("[P] Producer əlavə etməni bitirdi.");
}

void ConsumerBC()
{
    // GetConsumingEnumerable elementlər olduğu müddətcə və ya CompleteAdding çağırılana qədər bloklayır
    foreach (var item in numbers.GetConsumingEnumerable())
    {
        Console.WriteLine($"[C] Emal etdi: {item}");
        Thread.Sleep(100);
    }
    Console.WriteLine("[C] Consumer işi bitirdi.");
}

// Main-də nümunənin işə salınması:
Task producerTask = Task.Run(() => ProducerBC(15)); // 15 element, limit 10
Task consumerTask = Task.Run(ConsumerBC);
Task.WaitAll(producerTask, consumerTask); // Bitməsini gözləyirik

Consumer kodunun GetConsumingEnumerable() sayəsində nə qədər daha təmiz olduğunu görürsünüz. Blocking əməliyyatlar və ölçü məhdudiyyəti lazım olarsa — BlockingCollection yaxşı seçimdir.

3. Concurrent-kolleksiyaların əlavə metodları və xüsusiyyətləri

IsEmpty, Count

  • IsEmpty (bool): kolleksiya boşdurmu.
  • Count (int): cari elementlərin sayı.

Nümunə: IsEmptyCount-dan istifadə

using System.Collections.Concurrent;

ConcurrentQueue<string> q = new ConcurrentQueue<string>();
Console.WriteLine($"Queue boşdur? {q.IsEmpty}"); // True

q.Enqueue("A");
q.Enqueue("B");
Console.WriteLine($"Queue-dakı elementlər sayı: {q.Count}"); // 2
Console.WriteLine($"Queue boşdur? {q.IsEmpty}"); // False

q.TryDequeue(out var itemA);
Console.WriteLine($"Queue-dakı elementlər sayı: {q.Count}"); // 1

Massivə çevirmə (ToArray())

Bütün Concurrent-kolleksiyalar ToArray() metodunu təqdim edir, bu metod elementlərin anlıq snapshot-unu qaytarır.

Nümunə: ToArray()-dan istifadə

using System.Collections.Concurrent;

ConcurrentStack<int> s = new ConcurrentStack<int>();
s.Push(10);
s.Push(20);
s.Push(30);

int[] items = s.ToArray(); // Yeni massiv yaradacaq: [30, 20, 10] (stack üçün LIFO)
Console.WriteLine($"Massivdəki elementlər: {string.Join(", ", items)}");

// Kolleksiya dəyişmir
Console.WriteLine($"ToArray-dan sonra stack-də elementlərin sayı: {s.Count}"); // 3

Kolleksiyaların təmizlənməsi

.NET 6+ da bir çox Concurrent-kolleksiyalara bütün elementləri silən Clear() metodu əlavə olunub.

Nümunə: Kolleksiyanın təmizlənməsi

using System.Collections.Concurrent;

ConcurrentBag<string> bag = new ConcurrentBag<string>();
bag.Add("Alpha");
bag.Add("Beta");
Console.WriteLine($"Bag-də elementlərin sayı: {bag.Count}"); // 2

bag.Clear(); // Kolleksiyanı təmizləyirik
Console.WriteLine($"Təmizlədikdən sonra Bag-də elementlərin sayı: {bag.Count}"); // 0
Console.WriteLine($"Bag boşdur? {bag.IsEmpty}"); // True

4. Concurrent-kolleksiyaların davranış xüsusiyyətləri

Anlıq snapshot prinsiplərini unutmayın. Count kimi xassələr və ToArray()-un nəticələri kolleksiyanın konkret bir anındakı vəziyyəti əks etdirir. Paralel dəyişikliklər şəraitində bu dəyər dərhal köhnəlmiş ola bilər.

Nümunə: CountToArray() — "anlıq snapshot"

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> snapshotQueue = new ConcurrentQueue<int>();

void AddItemsContinuously()
{
    for (int i = 0; i < 1000; i++)
    {
        snapshotQueue.Enqueue(i);
        Thread.Sleep(1); 
    }
}

// Main-də nümunənin işə salınması:
Task.Run(AddItemsContinuously); // Davamlı element əlavə edən thread

Thread.Sleep(100); // Biraz vaxt veririk
Console.WriteLine($"Cari Count: {snapshotQueue.Count}"); // 50, 80, 120... ola bilər
Thread.Sleep(100);
Console.WriteLine($"Cari Count yenə: {snapshotQueue.Count}"); // Başqa dəyər olacaq
int[] currentItems = snapshotQueue.ToArray();
Console.WriteLine($"ToArray()-dakı element sayı: {currentItems.Length}"); // Son Count-dan fərqli ola bilər

Aktiv dəyişikliklər zamanı Count-a sərt zəmanət kimi etibar etməyin.

5. Concurrent-kolleksiyalara iterasiya nüansları

Fərdi əməliyyatlar (Add, TryTake, TryPop, GetOrAdd və s.) thread-safe-dir. Amma kolleksiya paralel olaraq digər threadlər tərəfindən dəyişdirilirsə, foreach ilə iterasiya edərkən bütün elementləri görəcəyinizə və ya yalnız sabit dəst görəcəyinizə zəmanət verilmir — elementlər ötürülə, itə və ya gözlənilməz ola bilər.

Nümunə: Dəyişmə zamanı iterasiya

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> iterQueue = new ConcurrentQueue<int>();

// Başlanğıc elementlər əlavə edirik
for (int i = 0; i < 10; i++) iterQueue.Enqueue(i);

// Modifier thread
void Modifier()
{
    for (int i = 10; i < 20; i++)
    {
        iterQueue.Enqueue(i); // Yeni elementlər əlavə edilir
        Thread.Sleep(50);
    }
}

// Iterator thread
void Iterator()
{
    Console.WriteLine("Iterasiyaya başlayırıq...");
    int count = 0;
    foreach (var item in iterQueue) // Iterasiya edirik
    {
        Console.Write($"{item} ");
        count++;
        Thread.Sleep(30); // İşin simulyasiyası, modifier-in kolleksiyanı dəyişməsi üçün vaxt verir
    }
    Console.WriteLine($"\nIterasiya bitdi. Oxunan elementlər: {count}.");
    Console.WriteLine($"Queue-də hazırkı element sayı: {iterQueue.Count}");
}

// Main-də nümunənin işə salınması:
Task.Run(Modifier);
Task.Run(Iterator);
Thread.Sleep(1500); // İş üçün vaxt veririk

Qayda: əgər fix edilmiş element dəsti lazımdırsa (məsələn, hesabat üçün), əvvəlcə ToArray() ilə "snapshot" götürün, sonra onun üzərində iterasiya edin:

// Əgər kolleksiya dəyişə bilərsə, iterasiya üçün doğru yol
int[] snapshot = iterQueue.ToArray();
foreach (var item in snapshot)
{
    // İndi dəyişməz massiv-snapshot üzərində iterasiya edirsiniz
}

Bu, Concurrent-kolleksiyaların inkişaf etmiş patternləri və xüsusiyyətlərinə girişimizi yekunlaşdırır: biz bir neçə iştirakçılı Producer‑Consumer və BlockingCollection-u, həmçinin Count, ToArray() və iterasiya ilə bağlı vacib nüansları nəzərdən keçirdik.

Şərhlər
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION