CodeGym /課程 /C# SELF /非同步資料流

非同步資料流

C# SELF
等級 62 , 課堂 0
開放

1. 介紹

你應該已經熟悉 asyncawait。它們對單一的非同步動作很合適,例如下載一個檔案。但如果資料是以串流方式到來,或資源需要非同步釋放,該怎麼辦?

非同步「集合」的問題:想像需要從資料庫拿到數百萬筆紀錄。如果方法回傳 Task<List<T>>,你就得等到所有資料都載入記憶體。這既沒效率,又會造成延遲。如果每個元素都要以非同步方式取得,傳統同步的 IEnumerable<T> 也不合適。

同步釋放資源的問題IDisposableusing 對同步清理沒問題。但如果關閉網路連線或把緩衝寫入檔案本身是非同步的呢?你無法在同步的 Dispose() 裡用 await,這會導致線程阻塞或清理不正確。

為了解決這些問題,引入了 IAsyncEnumerable<T>IAsyncDisposable

2. 非同步資料流

IAsyncEnumerable<T>IEnumerable<T> 的非同步版本。它允許非同步地逐一產生序列元素,不需等到所有資料都準備好。

什麼時候需要?

  • 非同步逐行讀取大型檔案:例如,GB 等級的日誌檔案。
  • 從網路或資料庫串流資料:API 的結果分批到達。
  • 實作伺服器端的串流 API:例如 gRPC Streaming。
  • 任何資料以非同步方式產生或到達且需要被漸進處理的情況。

運作原理?

  1. IAsyncEnumerable<T>:有個方法 GetAsyncEnumerator(CancellationToken cancellationToken)。取消用的 token 很重要!
  2. IAsyncEnumerator<T>:有 ValueTask<bool> MoveNextAsync()(移到下一個)和 Current(目前元素)。它也繼承自 IAsyncDisposable
  3. await foreach:可以方便地迭代 IAsyncEnumerable<T>。編譯器會自動呼叫 MoveNextAsync() 和 使用 Current。重點是,await foreach 保證在迭代結束後呼叫迭代器的 DisposeAsync(),即使發生例外也會如此。

asyncyield return 建立 IAsyncEnumerable<T>

你可以在回傳 IAsyncEnumerable<T>async 方法中使用 yield return。這能建立非同步生成器。方法可以用 await 暫停產生,等候非同步操作後再繼續。

範例:簡單的非同步生成器


async IAsyncEnumerable<int> GenerateNumbersAsync()
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine($"產生中: {i}");
        await Task.Delay(100); // 模擬非同步工作
        yield return i; 
    }
}

// 使用:
async Task ConsumeAsyncNumbers()
{
    await foreach (var number in GenerateNumbersAsync())
    {
        Console.WriteLine($"取得: {number}");
    }
}
// 呼叫: await ConsumeAsyncNumbers();

範例:非同步逐行讀檔


async IAsyncEnumerable<string> ReadFileLinesAsync(string filePath)
{
    using var reader = new StreamReader(filePath); // 'using' 在這裡 (StreamReader 實作 IAsyncDisposable)
    string? line;
    while ((line = await reader.ReadLineAsync()) != null) 
    {
        yield return line;
    }
}

// 使用:
async Task ProcessFileAsync()
{
    await File.WriteAllLinesAsync("data.txt", new[] { "行 1", "行 2", "行 3" });
    await foreach (var line in ReadFileLinesAsync("data.txt")) 
    {
        Console.WriteLine($"已處理行: {line}");
    }
}
// 呼叫: await ProcessFileAsync();

範例:可取消的非同步生成器(CancellationToken


async IAsyncEnumerable<int> GetCancelableSequence(
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i < 10; i++)
    {
        token.ThrowIfCancellationRequested(); // 檢查是否已取消
        await Task.Delay(200, token); // Task.Delay 也支援透過 token 取消
        yield return i;
    }
}

// 使用:
async Task ConsumeAndCancel()
{
    var cts = new CancellationTokenSource(500); // 500 毫秒後取消
    try
    {
        await foreach (var num in GetCancelableSequence(cts.Token))
        {
            Console.WriteLine($"取得: {num}");
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("產生已取消!");
    }
}
// 呼叫: await ConsumeAndCancel();

屬性 [EnumeratorCancellation] 允許把 CancellationToken 傳給非同步生成器。這樣當呼叫端透過 CancellationTokenSource 要求取消時,就能取消迭代。沒有這個屬性時,token 不會自動傳到 GetAsyncEnumerator

3. 非同步資源管理

同步 IDisposable 的問題

方法 Dispose()IDisposable 裡是同步的(void Dispose())。你不能在裡面使用 await。如果關閉資料庫連線或把緩衝寫入檔案是耗時且非同步的操作,同步的 Dispose() 會阻塞執行緒,對非同步應用很不友好。

解法: IAsyncDisposable

IAsyncDisposable 解決了這個問題。它只有一個方法:ValueTask DisposeAsync() — 用於非同步清理。

await using

這是 using 的非同步版本。用於實作了 IAsyncDisposable 的物件。

  • await using 保證在宣告資源的程式區塊結束時(或因例外離開時)呼叫 DisposeAsync()
  • 可以正確地以非同步方式釋放資源,避免阻塞。

範例:基本的 IAsyncDisposableawait using


class MyAsyncResource : IAsyncDisposable
{
    public MyAsyncResource() => Console.WriteLine("資源已開啟。");
    
    public async ValueTask DisposeAsync()
    {
        Console.WriteLine("開始非同步清理...");
        await Task.Delay(200); // 模擬非同步清理
        Console.WriteLine("非同步清理完成。");
    }
}

// 使用 await using
async Task UseAndDisposeResource()
{
    await using var resource = new MyAsyncResource(); 
    Console.WriteLine("資源在使用中...");
} // 這裡會自動呼叫 resource.DisposeAsync()

// 呼叫: await UseAndDisposeResource();

範例:多個 await using 區塊


async Task UseMultipleResources()
{
    await using var res1 = new MyAsyncResource();
    await using var res2 = new MyAsyncResource();
    Console.WriteLine("同時使用兩個資源...");
} // 資源會以 LIFO (Last In, First Out) 的順序被釋放:res2.DisposeAsync() 會先被呼叫,接著是 res1.DisposeAsync()。
// 呼叫: await UseMultipleResources();

IAsyncEnumerable<T>IAsyncDisposable 的相容性

重點:IAsyncEnumerator<T>(被 await foreach 使用)本身就繼承自 IAsyncDisposable。這表示如果你的非同步生成器使用了需要非同步釋放的資源(像前面範例裡的 StreamReader),await foreach 會幫你處理:當迭代結束時,它會呼叫迭代器的 DisposeAsync()

留言
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION