1. «Producer‑Consumer» + Concurrent
Biz artıq ConcurrentQueue barədə danışanda «Producer‑Consumer» patterninə toxunmuşduq. Gəlin bunu bir neçə producer və consumer vəziyyəti üçün və düzgün bitirmə siqnalizasiyası ilə daha ətraflı nəzərdən keçirək.
ConcurrentQueue-un (və digər Concurrent kolleksiyaların) əsas üstünlüyü odur ki, onlar özləri thread-safe davranışı təmin edir. Enqueue və ya TryDequeue-nu lock-a bükməyə ehtiyac yoxdur — bir neçə thread ortaq queue ilə təhlükəsiz işləyə bilər.
Nümunə: Bir neçə producer və bir neçə consumer
Bəzi worker threadlər tasklar yaradır, digər threadlər isə onları emal edir.
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
CancellationTokenSource cts = new CancellationTokenSource(); // Consumer-ları dayandırmaq üçün
// Producer üçün metod
void Producer(string name, int count)
{
for (int i = 0; i < count; i++)
{
string task = $"Zadacha_{name}_{i}";
taskQueue.Enqueue(task);
Console.WriteLine($"[P:{name}] Əlavə etdi: {task}");
Thread.Sleep(10);
}
}
// Consumer üçün metod
void Consumer(string name)
{
while (!cts.Token.IsCancellationRequested || taskQueue.Count > 0)
{
if (taskQueue.TryDequeue(out string task))
{
Console.WriteLine($"[C:{name}] Emal etdi: {task}");
Thread.Sleep(20);
}
else
{
Thread.Sleep(50); // Queue boşdursa gözləyirik
}
}
Console.WriteLine($"[C:{name}] İşini bitirdi.");
}
// Main-də nümunənin işə salınması:
Task.Run(() => Producer("A", 10)); // Producer A
Task.Run(() => Producer("B", 10)); // Producer B
Task.Run(() => Consumer("1")); // Consumer 1
Task.Run(() => Consumer("2")); // Consumer 2
Thread.Sleep(1000); // İş üçün vaxt veririk
cts.Cancel(); // Consumer-lara bitirmə siqnalı
Thread.Sleep(500); // Qalanları götürüb bitirmələri üçün vaxt veririk
Burada bir neçə producer və consumer eyni ConcurrentQueue-la eyni vaxtda konflikt olmadan işləyir: Enqueue və TryDequeue atomikdir.
İşin bitməsi siqnallarının önəmi (CancellationTokenSource)
Biz consumer-lara bitirmə ehtiyacını bildirmək üçün CancellationTokenSource (cts) istifadə edirik. Bu Producer‑Consumer patterni üçün kritikdir:
- Producer-lər işini bitirdi. Element əlavə etmə bitəndə consumer-lər sonsuz boş queue-dan gözləməməlidir.
- Tətbiq dayandırılır. Consumer-ləri düzgün şəkildə söndürmək lazımdır.
CancellationTokenSource və CancellationToken standart mexanizmdir: consumer dövri olaraq IsCancellationRequested-ə baxır və lazım olarsa ThrowIfCancellationRequested() çağırır.
2. BlockingCollection<T>
Hərçənd ConcurrentQueue<T> Producer‑Consumer üçün yaxşı uyğundur, boş queue halında əl ilə gözləmə və bitirmə siqnalizasiyası tələb edir. Daha rahat implementasiya üçün .NET-də BlockingCollection<T> var — bu müstəqil kolleksiya deyil, istənilən IProducerConsumerCollection<T>-nin (məsələn, ConcurrentQueue) əhatəedicisidir.
BlockingCollection-un üstünlükləri:
- Blocking əməliyyatlar. Add()/Take() collection dolu/boş olduqda thread-i bloklayır. IsEmpty-i əl ilə yoxlamağa ehtiyac yoxdur.
- Ölçü məhdudiyyəti. Capacitiy təyin etmək olar. Add() limitə çatanda bloklanır — yaddaş üçün nəzarət üçün faydalıdır.
- Asan finalizasiya. CompleteAdding() əlavə etmənin bitdiyini bildirir, GetConsumingEnumerable() isə consumer-ə elementləri tam bitənə qədər işləməyə imkan verir.
Nümunə: BlockingCollection<T> ilə Producer‑Consumer
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
// BlockingCollection default olaraq ConcurrentQueue istifadə edir
BlockingCollection<int> numbers = new BlockingCollection<int>(capacity: 10); // Limit 10 olan queue
void ProducerBC(int count)
{
for (int i = 0; i < count; i++)
{
numbers.Add(i); // Queue doludursa bloklanır
Console.WriteLine($"[P] Əlavə etdi: {i}");
Thread.Sleep(50);
}
numbers.CompleteAdding(); // Producer-in əlavə etməyi bitirdiyini bildiririk
Console.WriteLine("[P] Producer əlavə etməni bitirdi.");
}
void ConsumerBC()
{
// GetConsumingEnumerable elementlər olduğu müddətcə və ya CompleteAdding çağırılana qədər bloklayır
foreach (var item in numbers.GetConsumingEnumerable())
{
Console.WriteLine($"[C] Emal etdi: {item}");
Thread.Sleep(100);
}
Console.WriteLine("[C] Consumer işi bitirdi.");
}
// Main-də nümunənin işə salınması:
Task producerTask = Task.Run(() => ProducerBC(15)); // 15 element, limit 10
Task consumerTask = Task.Run(ConsumerBC);
Task.WaitAll(producerTask, consumerTask); // Bitməsini gözləyirik
Consumer kodunun GetConsumingEnumerable() sayəsində nə qədər daha təmiz olduğunu görürsünüz. Blocking əməliyyatlar və ölçü məhdudiyyəti lazım olarsa — BlockingCollection yaxşı seçimdir.
3. Concurrent-kolleksiyaların əlavə metodları və xüsusiyyətləri
IsEmpty, Count
- IsEmpty (bool): kolleksiya boşdurmu.
- Count (int): cari elementlərin sayı.
Nümunə: IsEmpty və Count-dan istifadə
using System.Collections.Concurrent;
ConcurrentQueue<string> q = new ConcurrentQueue<string>();
Console.WriteLine($"Queue boşdur? {q.IsEmpty}"); // True
q.Enqueue("A");
q.Enqueue("B");
Console.WriteLine($"Queue-dakı elementlər sayı: {q.Count}"); // 2
Console.WriteLine($"Queue boşdur? {q.IsEmpty}"); // False
q.TryDequeue(out var itemA);
Console.WriteLine($"Queue-dakı elementlər sayı: {q.Count}"); // 1
Massivə çevirmə (ToArray())
Bütün Concurrent-kolleksiyalar ToArray() metodunu təqdim edir, bu metod elementlərin anlıq snapshot-unu qaytarır.
Nümunə: ToArray()-dan istifadə
using System.Collections.Concurrent;
ConcurrentStack<int> s = new ConcurrentStack<int>();
s.Push(10);
s.Push(20);
s.Push(30);
int[] items = s.ToArray(); // Yeni massiv yaradacaq: [30, 20, 10] (stack üçün LIFO)
Console.WriteLine($"Massivdəki elementlər: {string.Join(", ", items)}");
// Kolleksiya dəyişmir
Console.WriteLine($"ToArray-dan sonra stack-də elementlərin sayı: {s.Count}"); // 3
Kolleksiyaların təmizlənməsi
.NET 6+ da bir çox Concurrent-kolleksiyalara bütün elementləri silən Clear() metodu əlavə olunub.
Nümunə: Kolleksiyanın təmizlənməsi
using System.Collections.Concurrent;
ConcurrentBag<string> bag = new ConcurrentBag<string>();
bag.Add("Alpha");
bag.Add("Beta");
Console.WriteLine($"Bag-də elementlərin sayı: {bag.Count}"); // 2
bag.Clear(); // Kolleksiyanı təmizləyirik
Console.WriteLine($"Təmizlədikdən sonra Bag-də elementlərin sayı: {bag.Count}"); // 0
Console.WriteLine($"Bag boşdur? {bag.IsEmpty}"); // True
4. Concurrent-kolleksiyaların davranış xüsusiyyətləri
Anlıq snapshot prinsiplərini unutmayın. Count kimi xassələr və ToArray()-un nəticələri kolleksiyanın konkret bir anındakı vəziyyəti əks etdirir. Paralel dəyişikliklər şəraitində bu dəyər dərhal köhnəlmiş ola bilər.
Nümunə: Count və ToArray() — "anlıq snapshot"
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<int> snapshotQueue = new ConcurrentQueue<int>();
void AddItemsContinuously()
{
for (int i = 0; i < 1000; i++)
{
snapshotQueue.Enqueue(i);
Thread.Sleep(1);
}
}
// Main-də nümunənin işə salınması:
Task.Run(AddItemsContinuously); // Davamlı element əlavə edən thread
Thread.Sleep(100); // Biraz vaxt veririk
Console.WriteLine($"Cari Count: {snapshotQueue.Count}"); // 50, 80, 120... ola bilər
Thread.Sleep(100);
Console.WriteLine($"Cari Count yenə: {snapshotQueue.Count}"); // Başqa dəyər olacaq
int[] currentItems = snapshotQueue.ToArray();
Console.WriteLine($"ToArray()-dakı element sayı: {currentItems.Length}"); // Son Count-dan fərqli ola bilər
Aktiv dəyişikliklər zamanı Count-a sərt zəmanət kimi etibar etməyin.
5. Concurrent-kolleksiyalara iterasiya nüansları
Fərdi əməliyyatlar (Add, TryTake, TryPop, GetOrAdd və s.) thread-safe-dir. Amma kolleksiya paralel olaraq digər threadlər tərəfindən dəyişdirilirsə, foreach ilə iterasiya edərkən bütün elementləri görəcəyinizə və ya yalnız sabit dəst görəcəyinizə zəmanət verilmir — elementlər ötürülə, itə və ya gözlənilməz ola bilər.
Nümunə: Dəyişmə zamanı iterasiya
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<int> iterQueue = new ConcurrentQueue<int>();
// Başlanğıc elementlər əlavə edirik
for (int i = 0; i < 10; i++) iterQueue.Enqueue(i);
// Modifier thread
void Modifier()
{
for (int i = 10; i < 20; i++)
{
iterQueue.Enqueue(i); // Yeni elementlər əlavə edilir
Thread.Sleep(50);
}
}
// Iterator thread
void Iterator()
{
Console.WriteLine("Iterasiyaya başlayırıq...");
int count = 0;
foreach (var item in iterQueue) // Iterasiya edirik
{
Console.Write($"{item} ");
count++;
Thread.Sleep(30); // İşin simulyasiyası, modifier-in kolleksiyanı dəyişməsi üçün vaxt verir
}
Console.WriteLine($"\nIterasiya bitdi. Oxunan elementlər: {count}.");
Console.WriteLine($"Queue-də hazırkı element sayı: {iterQueue.Count}");
}
// Main-də nümunənin işə salınması:
Task.Run(Modifier);
Task.Run(Iterator);
Thread.Sleep(1500); // İş üçün vaxt veririk
Qayda: əgər fix edilmiş element dəsti lazımdırsa (məsələn, hesabat üçün), əvvəlcə ToArray() ilə "snapshot" götürün, sonra onun üzərində iterasiya edin:
// Əgər kolleksiya dəyişə bilərsə, iterasiya üçün doğru yol
int[] snapshot = iterQueue.ToArray();
foreach (var item in snapshot)
{
// İndi dəyişməz massiv-snapshot üzərində iterasiya edirsiniz
}
Bu, Concurrent-kolleksiyaların inkişaf etmiş patternləri və xüsusiyyətlərinə girişimizi yekunlaşdırır: biz bir neçə iştirakçılı Producer‑Consumer və BlockingCollection-u, həmçinin Count, ToArray() və iterasiya ilə bağlı vacib nüansları nəzərdən keçirdik.
GO TO FULL VERSION