CodeGym /Kurslar /C# SELF /Channel: istehsalçı–istehlakçı (

Channel: istehsalçı–istehlakçı ( Channel)

C# SELF
Səviyyə , Dərs
Mövcuddur

1. Giriş

Bugün biz yeni səviyyəyə keçirik! İndi xüsusi bir alətlə — Channel ilə tanış olmağın vaxtıdır. Bu vasitə .NET-də müasir asinxron tətbiqlər üçün hazırlanıb: adi bloklamalar ya kömək etmir, ya da performansı ciddi şəkildə aşağı salır.

Sizi Producer-Consumer patterni gözləyir — bu pattern 60-cı illərdən bəri populyarlığını itirməyib. Siz sadə asinxron "konveyer" yaradacaqsınız: burada bəzi tapşırıqlar və ya threadlər nəsə istehsal edir (məsələn, faylları yükləyir, ədədləri hesablayır, hadisələri gözləyir), başqa threadlər isə onları işləyir (məsələn, saxlayır, verilənlər bazasına yazır, UI-da göstərir).

Niyə Channel yarandı?

  1. Producer-Consumer patterni uzun müddətdir ki, növbələrlə həll edilir: istehsalçı tapşırıqları növbəyə qoyur, istehlakçı onları götürür. Amma! BlockingCollection<T>, ConcurrentQueue<T>-dən olan növbələr və ya əl ilə lock ilə sinxronizasiya — bunların hamısı asin xron deyil. Yəni thread məlumat gözləyərkən bloklanır, async/await planlayıcısına idarə qaytarılmır.
  2. Asinxronluq .NET-də modadan daha çox şeydir — bu müasir arxitekturanın əsasıdır. Elementləri gözləmək üçün threadləri bloklamaq baha başa gəlir və səmərəli deyil. Məlumatın gəlməsini bloklamadan gözləmək lazımdır — bunu Channel həll edir.
  3. Çevikliy: kanallarla mürəkkəb emal konveyerləri qurmaq, thread məntiqini ayırmaq, aralıq addımlar və yük balanslaşdırması əlavə etmək olar — və bütün bunlar aşağı səviyyəli sinxronizasiya ağrısı olmadan.

Channel nədir? (Analogiya və arxitektura)

Təsəvvür edin ki, əlinizdə estafet dərəsi (və ya konveyer lentası) var, üzərindən obyektləri bir yerdən başqa yerə ötürə bilərsiniz, və heç kim qonşu şöbədəki həmkarı ilə şəxsi görüşə ehtiyac duymur. Əsas odur ki, dərə yolunu itirməsin.

Channel — .NET-ə daxil edilmiş vasitədir ki, müxtəlif tasklar, threadlər və ya proqram hissələri arasında asinxron məlumat ötürmə təmin edir. Bu, həm yazma, həm də oxuma əməliyyatları üçün "gözləmə" dəstəyi olan asinxron növ növbəsidir.

  • İstehsalçı kanala elementlər qoyur (məsələn, emal üçün sorğular);
  • İstehlakçı elementləri götürür — və iş bitdi!

2. Channel<T> sinfi və onun strukturu

Hər şey namespaceden başlayır:

using System.Threading.Channels;

Adi kolleksiyalardan fərqli olaraq, Channelfabrikadandır, hansı ki məlumat ötürmək üçün xüsusi obyektlər yaradır.

Əsas növlər:

  • ChannelWriter<T> — "yazıcı" (istehsalçı). Yalnız element əlavə edir.
  • ChannelReader<T> — "oxuyucu" (istehlakçı). Yalnız elementləri çıxarır.
  • Channel (Channel) məsuliyyəti bölür: yazıcı oxuyucu haqqında heç nə bilmir və əksinə.

.NET-də müxtəlif kanal implementasiyaları var, hərəsinin öz xüsusiyyətləri mövcuddur: unbounded (ölçüsüz), bounded (maksimum element sayı ilə), single-producer-single-consumer (SPSC), multi-producer-multi-consumer (MPMC) və s. Biz ən ümumi ilə başlayacağıq.

Sadə nümunə: asinxron task növbəsi

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // Ölçüsüz kanal yaradırıq
        var channel = Channel.CreateUnbounded<int>();

        // İstehsalçı task
        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine($"İstehsalçı: Kanalə qoyur {i}");
                await channel.Writer.WriteAsync(i); // Asinxron yazı!
                await Task.Delay(100); // İşin imitasiya edilməsi
            }
            channel.Writer.Complete(); // Daha yazılmayacağını bildiririk
        });

        // İstehlakçı task
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"İstehlakçı: Kanaldən aldı {item}");
                await Task.Delay(200); // Emalın imitasiya edilməsi
            }
            Console.WriteLine("İstehlakçı: Kanal bağlandı");
        });

        await Task.WhenAll(producer, consumer);
    }
}

Burada nə baş verir?

  • Channel.CreateUnbounded<int>() — ölçüsüz kanal yaradırıq.
  • İstehsalçı 0-dan 9-a qədər ədədləri WriteAsync ilə kanala yazır.
  • Yazı bitdikdən sonra Complete() çağırılır — "Daha element olmayacaq!" siqnalı.
  • İstehlakçı ReadAllAsync() ilə bütün elementləri asinxron olaraq götürür, kanal bağlananadək davam edir.
  • Task.Delay gecikmələri real işin olduğunu imitasiya edir: görürsən ki, ədədlər yazmaq oxumaqdan tez ola bilər.

3. Niyə bütün bunlar asinxrondur?

Adi bloklayan növbələr (məsələn, BlockingCollection və ya lock ilə qorunanlar) yalnız thread-i bloklayır. Bu o deməkdir ki, əgər çoxlu tasklar varsa və ya maksimum performans istəyirsənsə, resursları itirirsən.

Kanallarla:

  • Əgər istehsalçı daha sürətlidirsə, kanal elementləri toplayacaq (sərbəst yaddaş və ya göstərilmiş maksimum ölçü ilə məhdudlaşır).
  • Əgər istehlakçı daha sürətlidirsə, o zaman o, nəsə görünənə qədər gözləyəcək (və thread-i bloklamayacaq, planlayıcıya qaytaracaq).

Bu, istehsalçıların və istehlakçıların hansının daha sürətli olacağını əvvəlcədən bilmədiyiniz ssenarilər üçün idealidir.

Real həyat tətbiqləri

  • Asinxron logging: mesajların fayla/DB-yə yazılması ayrılmış taskda edilir;
  • Web sorğularının işlənməsi: bir task bir neçə səhifəni yükləyir, başqa task onların məzmununu analiz edir;
  • Qovluqların skan edilməsi və indeksləşdirilməsi: bir sıra tasklar fayl sistemini gəzir, digərləri fayllar üzrə statistika hesablayır;
  • Mürəkkəb data pipeline-lar: məsələn, ETL işlərində bir mərhələ xammalı yarımfabrikatlara çevirir, başqa mərhələ isə onları son məhsula çevirir.

4. Məhdudlaşdırılmış kanal (Bounded Channel)

"Limitsiz" kanallar əyləncəlidir, amma yaddaşımız sonsuz deyil (həm də kompüteriniz böyük görsə də).

Məhdudlaşdırılmış kanal (bounded) eyni anda kanalda ola biləcək maksimum element sayını təyin etməyə imkan verir. Əgər kanal doludursa — istehsalçı bir şey çıxarılana qədər gözləyəcək.

Nümunə:

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.Wait // (default) - boş yer yaranana qədər gözlə
});

Burada yalnız üç element eyni anda kanalda ola bilər. İstehsalçı dördüncünü yazmağa çalışsa — o gözləyəcək.

Bir neçə istehsalçı və istehlakçı

var channel = Channel.CreateUnbounded<int>();

// 2 istehsalçı
for (int producerId = 0; producerId < 2; producerId++)
{
    Task.Run(async () =>
    {
        for (int i = 0; i < 5; i++)
        {
            int value = producerId * 100 + i;
            Console.WriteLine($"İstehsalçı {producerId}: qoyur {value}");
            await channel.Writer.WriteAsync(value);
            await Task.Delay(50);
        }
        // Hər istehsalçı Complete() çağırır — təhlükəlidir!
    });
}
// Fənd: Complete() yalnız BİR dəfə, bütün istehsalçılar bitəndən sonra çağırılmalıdır.
// Nümunə üçün bir istehlakçı taskı qoyaq:
Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"İstehlakçı aldı {item}");
        await Task.Delay(100);
    }
});

DİQQƏT! Kanal yalnız bütün istehsalçılar işini bitirdikdən sonra (məsələn, sayğac və ya Task.WhenAll ilə) bağlanmalıdır (Complete()). Əks halda, kimsə yazmağa çalışdıqda kanal artıq bağlı ola bilər və istisna alınacaq.

5. Praktika: Kanal vasitəsilə şəkillərin emalı

Gəlin işi bir az çətinləşdirək! Tutaq ki, bizdə şəkillərdən ibarət qovluq var. Bir task şəkilləri tapıb onların yollarını kanala qoyur, başqa task isə həmin fayl yollarını götürüb onlarla faydalı iş görür (məsələn, ölçünü hesablayır və ya konvert edir).

Qeyd: Sadəlik üçün nümunə fayl adları ilə işləyəcək (şəkillərlə birbaşa iş olmayacaq), amma məntiq eynidir.

using System;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var channel = Channel.CreateBounded<string>(5);

        // İstehsalçı: qovluqda .jpg fayllarını axtarır
        var producer = Task.Run(async () =>
        {
            foreach (var file in Directory.EnumerateFiles(@"images", "*.jpg"))
            {
                await channel.Writer.WriteAsync(file);
                Console.WriteLine($"Növbəyə əlavə olundu: {file}");
                await Task.Delay(50); // axtarış gecikməsini imitasiya edirik
            }
            channel.Writer.Complete(); // növbənin sonu
        });

        // İstehlakçı: faylları oxuyur və "emal" edir
        var consumer = Task.Run(async () =>
        {
            await foreach (var file in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"Fayl emalı: {file}");
                await Task.Delay(200); // emalın imitasiya edilməsi
            }
            Console.WriteLine("Bütün şəkillər emal olundu!");
        });

        await Task.WhenAll(producer, consumer);
    }
}

6. Channel-un konfiqurasiyası: seçimlər və incəliklər

Kanallar yaradılarkən müxtəlif seçimlərlə tənzimlənə bilər — bounded kanallar üçün əsas parametrlər:

Seçim Təsvir
Capacity
Kanala eyni anda daxil ola biləcək maksimum elementlərin sayı
SingleWriter
true — yalnız bir istehsalçı varsa (performansı sürətləndirir)
SingleReader
true — yalnız bir istehlakçı varsa (performansı sürətləndirir)
FullMode
Əgər kanal doludursa nə etmək lazımdır? Mümkün dəyərlər: Wait, DropWrite, DropOldest, DropNewest

Opsiyalı nümunə:

var options = new BoundedChannelOptions(10)
{
    SingleWriter = false,
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait
};
var channel = Channel.CreateBounded<string>(options);

7. Asinxron metodlar: ReadAsync, WriteAsync, ReadAllAsync

Niyə async vacibdir?

WriteAsyncReadAsync metodları thread-i bloklamır! Əgər oxumaq üçün heç nə yoxdursa — task pauza qoyulur və thread digər işlər üçün azad edilir. Bu server və UI tətbiqlərində xüsusilə vacibdir, çünki əlavə bloklama "freez"ə səbəb ola bilər.

ReadAllAsync — müasir C# rahatlığı

Asinxron iterasiya etmək olar:

await foreach (var item in channel.Reader.ReadAllAsync())
{
    // item ilə işləyirik
}

Channel<T> və multi-thread-safe kolleksiyalar: fərq nədir?

ConcurrentQueue<T>/BlockingCollection<T> thread-lər üçün yaxşıdır, amma tam asinxron (await) ssenarilər üçün ideal deyil.

Channel<T> xüsusi olaraq asinxron pipeline tətbiqləri üçün dizayn edilib. Thread-safety baxımından hər iki yanaşma öz işini görür, amma kanallar C#-ın müasir imkanları ilə — məsələn, IAsyncEnumerable və s. — daha yaxşı inteqrasiya olunur və çevikdir.

8. Səhvlər və tipik tələlər

Yazıcıda Complete() çağırmağı unutmayın — əks halda istehlakçı yeni elementləri gözləyərək əbədi asılı qala bilər!

Bir neçə yazıcı varsa, Complete()-i bir neçə dəfə çağırmayın — bunu yalnız bütün istehsalçılar tamamlandıqdan sonra edin.

Kanal bağlandıqdan sonra artıq yazmaq olmaz, amma qalan elementləri oxumaq olar.

Eyni vaxtda yazma zamanı race condition: əgər kanal bağlanıbsa və kiminsə hələ yazmağa cəhdi varsa — istisna alacaqsınız.

Şərhlər
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION