Python Celery 入門:非同步任務的得力助手

Celery 是一套成熟的 Distributed Task Queue(分散式任務佇列)解決方案,讓開發者可以非同步地執行任務。在 Web 開發中,當遇到需要長時間執行的任務,例如匯出龐大的資料報表,我們無法要求使用者一直停留在網頁上等待。這時,Celery 就能派上用場,將這些耗時的任務移至背景執行,完成後再透過 Email 或其他方式通知使用者,大幅改善使用者體驗。


Celery 與 Message Queue 的關係

Celery 的運作核心與 Message Queue(訊息佇列) 密不可分。 事實上,Celery 本身不負責傳遞任務訊息,而是依賴一個稱為 Broker 的中介者來傳遞。 這個 Broker 就是所謂的 Message Queue 服務,例如常見的 RabbitMQ 或 Redis。

整個流程如下:

  1. 發布任務:當我們在應用程式中呼叫一個 Celery 任務時,Celery 會將這個任務的資訊(例如要執行的函式名稱、參數等)打包成一則任務訊息 (Task Message)
  2. 訊息傳遞:這則任務訊息會被發送到指定的 Broker (Message Queue)。
  3. 背景執行:在背景運行的 Worker 會持續監聽 Message Queue,一旦收到新的任務訊息,便會取出並執行對應的任務。

透過這樣的架構,Celery 將任務的「發布」與「執行」解耦,讓主應用程式在發布任務後可以立即回應使用者,無需等待任務完成。

核心運作流程圖

用序列圖 (Sequence Diagram) 來視覺化整個流程:

sequenceDiagram
    participant Client as (主應用程式)
    participant Celery App as (Celery 實例)
    participant Broker as (訊息中介者 Redis)
    participant Worker as (背景工作者)
    participant Result Backend as (結果儲存 Redis)

    Client->>+Celery App: 呼叫 task.delay()
    Celery App->>Broker: 發送任務訊息
    Celery App-->>-Client: 立即返回 AsyncResult 物件
    
    Note right of Worker: Worker 持續監聽 Broker
    Worker->>Broker: 取得任務訊息
    Worker->>Worker: 執行任務
    Worker->>+Result Backend: 將執行狀態/結果寫入
    Note left of Client: Client 可用 AsyncResult<br/>向 Result Backend 查詢結果

Celery 任務生命週期

1. 觸發任務 (Client → Celery App)

  • 動作:主應用程式 (Client) 呼叫一個被 @app.task 裝飾的函式,並在其後附加 .delay().apply_async()
  • 說明:這是整個流程的起點。主應用程式決定將一個耗時的操作(例如發送郵件、生成報表)交給背景處理,而不是自己執行,以避免阻塞主執行緒。

2. 發送任務到中介者 (Celery App → Broker)

  • 動作:Celery 實例 (Celery App) 將任務的相關資訊(函式名稱、參數等)序列化成一條訊息,然後發送到設定好的訊息中介者 (Broker),在此例中是 Redis。
  • 說明:Broker 就像一個任務的「待辦清單」或「郵箱」。任務被安全地存放在這裡,等待有空的 Worker 來領取。這一步是實現非同步的關鍵,因為主應用程式把任務「扔」進 Broker 後就不用再管了。

3. 立即返回憑證 (Celery App → Client)

  • 動作:在將任務訊息成功發送到 Broker 後,Celery 幾乎是立即返回一個 AsyncResult 物件給主應用程式。
  • 說明:這是非同步的核心優勢。主應用程式不會等待任務執行完成。它得到的是一個「任務憑證」或「追蹤號」(AsyncResult),裡面包含了獨一無二的 task_id。主應用程式可以繼續執行其他程式碼,使用者介面也能保持回應。

4. 背景工作者領取並執行任務 (Worker → Broker → Worker)

  • 動作:一個或多個獨立運行的 Worker 行程持續監聽 (Polling) Broker。一旦發現有新的任務訊息,其中一個 Worker 就會領取它。
  • 說明:Worker 是真正執行工作的角色。它從 Broker 取得訊息後,會反序列化內容,了解需要執行哪個函式以及使用什麼參數,然後在自己的行程中執行該函式。

5. 寫入執行結果 (Worker → Result Backend)

  • 動作:任務執行完成後(無論成功或失敗),Worker 會將執行結果或狀態(例如 'SUCCESS''FAILURE'、回傳值、錯誤堆疊等)寫入到設定好的結果儲存 (Result Backend) 中。
  • 說明:Result Backend 是一個用來儲存任務最終狀態的地方,它讓主應用程式有辦法查詢任務的執行情況。注意,它和 Broker 可以是同一個資料庫(如 Redis),但它們的角色和儲存的資料結構是完全不同的。Result Backend 是選用的,如果你的任務不需要回傳結果,可以不設定它。

6. (可選)查詢任務結果 (Client → Result Backend)

  • 動作:主應用程式可以在任何時候,使用第 3 步中得到的 AsyncResult 物件,來查詢任務的狀態或獲取結果。
  • 說明:當主應用程式呼叫 async_result.get() 或檢查 async_result.state 時,它會拿著 task_id 去 Result Backend 查詢對應的結果。這一步是選擇性的,但對於需要知道任務是否完成、是否成功或需要取得回傳值的場景至關重要。

如何應用 Celery

以下將以 Redis 作為 Broker 與 Result Backend,示範一個簡單的 Celery 應用。

核心名詞

在開始之前,先了解幾個 Celery 的核心名詞:

  • Application (app):Celery 的實例,是使用 Celery 的起點。
  • Task:Celery 執行的最小工作單元,通常是一個 Python 函式。
  • Worker:負責執行 Task 的背景進程。
  • Broker:訊息中介者,負責傳遞 Task Message,例如 RabbitMQ, Redis。
  • Result Backend:用於儲存 Task 的執行狀態與結果,例如 Redis。

實作步驟

1. 環境準備與安裝

首先,確保已經安裝了 Python 與 Redis。 接著透過 pip 安裝 Celery 及 Redis 相關套件:

pip install celery
pip install "celery[redis]"

2. 撰寫 Celery 應用

建立一個名為 tasks.py 的檔案,內容如下:

# tasks.py
from celery import Celery

# 實例化 Celery,設定 Broker 和 Backend
app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/1',
)

# 定義一個 Task
@app.task
def say_hello():
    print('Hello, Celery!')

在這段程式碼中:

  • 建立了一個名為 tasks 的 Celery Application。
  • 將 Broker 設定為本機 Redis 的 0 號資料庫。
  • 將 Result Backend 設定為本機 Redis 的 1 號資料庫。
  • 使用 @app.task 裝飾器將 say_hello 函式定義為一個 Celery Task。

3. 啟動 Worker

接著,開啟一個終端機,在 tasks.py 所在的目錄下執行以下指令來啟動 Worker:

celery -A tasks worker --loglevel=info

會看到 Worker 成功啟動並等待接收任務的畫面。

4. 非同步執行任務

現在,另開一個終端機,進入 Python 直譯器來實際執行任務:

>>> from tasks import say_hello

# 使用 .delay() 來非同步執行任務
>>> result = say_hello.delay()

>>> result
<AsyncResult: e8af98f2-920c-43e2-a8e8-175bb8cd88cb>

執行 .delay() 後,會立即得到一個 AsyncResult 物件,這代表任務已經成功發送。 此時,可以回到 Worker 的終端機視窗,會發現 Hello, Celery! 的訊息已經被印出,代表 Worker 已經接收並執行了任務。

總結

透過以上步驟,便成功地利用 Celery 與 Redis 完成了一個非同步任務的執行。這只是 Celery 的初步應用,它還包含了任務排程、工作流設計(Workflow)等更進階的功能,是 Python 開發者處理非同步任務時非常值得學習的工具。