Task類別

Python SELF TW
等級 26 , 課堂 0
開放

7.1 創建任務

在模組asyncio中,Task類別用來管理協程(coroutines)的執行。 任務(Tasks)就是協程的包裝器,允許管理他們的執行並跟蹤他們的狀態。 Task類別允許通過事件循環並行運行協程並管理它們。

Task類別的基本方法:

Task類別是一種協程的包裝,提供了以下額外功能:

管理協程執行:

任務使得管理協程的執行變得輕鬆,控制它們的狀態並獲得結果。

取消任務:

取消任務的功能使Task在需要在完成之前中止的長時間操作中變得非常有用。

回調函數:

任務支援添加回調函數,這允許在任務完成時執行額外的操作。

創建任務:

任務是通過asyncio.create_task(coroutine)函數或loop.create_task(coroutine)方法創建的, 這些方法調度協程的執行。


import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")
            
async def main():
    task = asyncio.create_task(say_hello())
    await task
            
asyncio.run(main())

7.2 基本方法

方法 Task.cancel()

請求取消任務。如果任務尚未完成,則將引發CancelledError例外。

讓我們寫一個小程序來演示這一點。

  1. 創建一個只等待10秒的異步任務
  2. 將其包裝到一個Task對象中
  3. 等待一秒,以便任務(Task)開始執行
  4. 取消任務 – task.cancel()
  5. 如果進一步嘗試用await運算符等待任務(Task)完成,我們將得到一個CancelledError異常。

import asyncio

async def main():
    task = asyncio.create_task(asyncio.sleep(10)) 
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")
            
asyncio.run(main())

方法 Task.result()

返回任務執行的結果。如果任務以例外結束,則在調用result()時引發該例外。


import asyncio

async def main():
    task = asyncio.create_task(asyncio.sleep(1, result='Completed'))
    result = await task
    print(result)  # Output: Completed
            
asyncio.run(main())

在我們的例子中,根本不需要寫額外的代碼 — await運算符知道如何處理Task對象:任務完成後 它自動調用其result()方法並返回獲得的結果。

方法 Task.exception()

返回任務引發的例外。如果任務以非例外狀態結束,則返回None


import asyncio

async def main():
    task = asyncio.create_task(asyncio.sleep(1))
    try:
        result = await task
    except Exception as e:
        print(f"Task failed with exception: {e}")

asyncio.run(main())

也不需要寫額外的代碼 — 如果在任務中出現例外,await運算符將調用exception()方法。

方法 Task.add_done_callback(callback)

添加一個回調函數(callback),它將在任務完成時被調用。


import asyncio

def callback(future):
    print("Task completed")
            
async def main():
    task = asyncio.create_task(asyncio.sleep(1))
    task.add_done_callback(callback)
    await task
            
asyncio.run(main())

可以為任務添加一個函數,當任務完成時,該函數將自動調用。這種方法使代碼變得非常靈活。 更重要的是,可以添加多個函數,并且可以添加到任何任務中。

方法 Task.done()

如果任務已完成(成功、錯誤或被取消),則返回True


import asyncio

async def main():
    task = asyncio.create_task(asyncio.sleep(1))
    await asyncio.sleep(1.5)
    print(task.done())  # Output: True
            
asyncio.run(main())

使用done()方法可以知道任務是否已成功完成。更確切地說,是完成或被取消, 因為如果出現例外,則任務在形式上可能未完成,但並未取消。 總之,如果任務被取消(cancellable),則該方法將返回False,否則返回True

留言
TO VIEW ALL COMMENTS OR TO MAKE A COMMENT,
GO TO FULL VERSION