1. 介紹
你應該已經熟悉 async 和 await。它們對單一的非同步動作很合適,例如下載一個檔案。但如果資料是以串流方式到來,或資源需要非同步釋放,該怎麼辦?
非同步「集合」的問題:想像需要從資料庫拿到數百萬筆紀錄。如果方法回傳 Task<List<T>>,你就得等到所有資料都載入記憶體。這既沒效率,又會造成延遲。如果每個元素都要以非同步方式取得,傳統同步的 IEnumerable<T> 也不合適。
同步釋放資源的問題:IDisposable 和 using 對同步清理沒問題。但如果關閉網路連線或把緩衝寫入檔案本身是非同步的呢?你無法在同步的 Dispose() 裡用 await,這會導致線程阻塞或清理不正確。
為了解決這些問題,引入了 IAsyncEnumerable<T> 和 IAsyncDisposable。
2. 非同步資料流
IAsyncEnumerable<T> 是 IEnumerable<T> 的非同步版本。它允許非同步地逐一產生序列元素,不需等到所有資料都準備好。
什麼時候需要?
- 非同步逐行讀取大型檔案:例如,GB 等級的日誌檔案。
- 從網路或資料庫串流資料:API 的結果分批到達。
- 實作伺服器端的串流 API:例如 gRPC Streaming。
- 任何資料以非同步方式產生或到達且需要被漸進處理的情況。
運作原理?
- IAsyncEnumerable<T>:有個方法 GetAsyncEnumerator(CancellationToken cancellationToken)。取消用的 token 很重要!
- IAsyncEnumerator<T>:有 ValueTask<bool> MoveNextAsync()(移到下一個)和 Current(目前元素)。它也繼承自 IAsyncDisposable。
- await foreach:可以方便地迭代 IAsyncEnumerable<T>。編譯器會自動呼叫 MoveNextAsync() 和 使用 Current。重點是,await foreach 保證在迭代結束後呼叫迭代器的 DisposeAsync(),即使發生例外也會如此。
用 async 和 yield return 建立 IAsyncEnumerable<T>
你可以在回傳 IAsyncEnumerable<T> 的 async 方法中使用 yield return。這能建立非同步生成器。方法可以用 await 暫停產生,等候非同步操作後再繼續。
範例:簡單的非同步生成器
async IAsyncEnumerable<int> GenerateNumbersAsync()
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine($"產生中: {i}");
await Task.Delay(100); // 模擬非同步工作
yield return i;
}
}
// 使用:
async Task ConsumeAsyncNumbers()
{
await foreach (var number in GenerateNumbersAsync())
{
Console.WriteLine($"取得: {number}");
}
}
// 呼叫: await ConsumeAsyncNumbers();
範例:非同步逐行讀檔
async IAsyncEnumerable<string> ReadFileLinesAsync(string filePath)
{
using var reader = new StreamReader(filePath); // 'using' 在這裡 (StreamReader 實作 IAsyncDisposable)
string? line;
while ((line = await reader.ReadLineAsync()) != null)
{
yield return line;
}
}
// 使用:
async Task ProcessFileAsync()
{
await File.WriteAllLinesAsync("data.txt", new[] { "行 1", "行 2", "行 3" });
await foreach (var line in ReadFileLinesAsync("data.txt"))
{
Console.WriteLine($"已處理行: {line}");
}
}
// 呼叫: await ProcessFileAsync();
範例:可取消的非同步生成器(CancellationToken)
async IAsyncEnumerable<int> GetCancelableSequence(
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken token = default)
{
for (int i = 0; i < 10; i++)
{
token.ThrowIfCancellationRequested(); // 檢查是否已取消
await Task.Delay(200, token); // Task.Delay 也支援透過 token 取消
yield return i;
}
}
// 使用:
async Task ConsumeAndCancel()
{
var cts = new CancellationTokenSource(500); // 500 毫秒後取消
try
{
await foreach (var num in GetCancelableSequence(cts.Token))
{
Console.WriteLine($"取得: {num}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine("產生已取消!");
}
}
// 呼叫: await ConsumeAndCancel();
屬性 [EnumeratorCancellation] 允許把 CancellationToken 傳給非同步生成器。這樣當呼叫端透過 CancellationTokenSource 要求取消時,就能取消迭代。沒有這個屬性時,token 不會自動傳到 GetAsyncEnumerator。
3. 非同步資源管理
同步 IDisposable 的問題
方法 Dispose() 在 IDisposable 裡是同步的(void Dispose())。你不能在裡面使用 await。如果關閉資料庫連線或把緩衝寫入檔案是耗時且非同步的操作,同步的 Dispose() 會阻塞執行緒,對非同步應用很不友好。
解法: IAsyncDisposable
IAsyncDisposable 解決了這個問題。它只有一個方法:ValueTask DisposeAsync() — 用於非同步清理。
await using
這是 using 的非同步版本。用於實作了 IAsyncDisposable 的物件。
- await using 保證在宣告資源的程式區塊結束時(或因例外離開時)呼叫 DisposeAsync()。
- 可以正確地以非同步方式釋放資源,避免阻塞。
範例:基本的 IAsyncDisposable 與 await using
class MyAsyncResource : IAsyncDisposable
{
public MyAsyncResource() => Console.WriteLine("資源已開啟。");
public async ValueTask DisposeAsync()
{
Console.WriteLine("開始非同步清理...");
await Task.Delay(200); // 模擬非同步清理
Console.WriteLine("非同步清理完成。");
}
}
// 使用 await using
async Task UseAndDisposeResource()
{
await using var resource = new MyAsyncResource();
Console.WriteLine("資源在使用中...");
} // 這裡會自動呼叫 resource.DisposeAsync()
// 呼叫: await UseAndDisposeResource();
範例:多個 await using 區塊
async Task UseMultipleResources()
{
await using var res1 = new MyAsyncResource();
await using var res2 = new MyAsyncResource();
Console.WriteLine("同時使用兩個資源...");
} // 資源會以 LIFO (Last In, First Out) 的順序被釋放:res2.DisposeAsync() 會先被呼叫,接著是 res1.DisposeAsync()。
// 呼叫: await UseMultipleResources();
IAsyncEnumerable<T> 與 IAsyncDisposable 的相容性
重點:IAsyncEnumerator<T>(被 await foreach 使用)本身就繼承自 IAsyncDisposable。這表示如果你的非同步生成器使用了需要非同步釋放的資源(像前面範例裡的 StreamReader),await foreach 會幫你處理:當迭代結束時,它會呼叫迭代器的 DisposeAsync()。
GO TO FULL VERSION