Python Celery 入門:非同步任務的得力助手
Celery 是一套成熟的 Distributed Task Queue(分散式任務佇列)解決方案,讓開發者可以非同步地執行任務。在 Web 開發中,當遇到需要長時間執行的任務,例如匯出龐大的資料報表,我們無法要求使用者一直停留在網頁上等待。這時,Celery 就能派上用場,將這些耗時的任務移至背景執行,完成後再透過 Email 或其他方式通知使用者,大幅改善使用者體驗。
Celery 與 Message Queue 的關係
Celery 的運作核心與 Message Queue(訊息佇列) 密不可分。 事實上,Celery 本身不負責傳遞任務訊息,而是依賴一個稱為 Broker 的中介者來傳遞。 這個 Broker 就是所謂的 Message Queue 服務,例如常見的 RabbitMQ 或 Redis。
整個流程如下:
- 發布任務:當我們在應用程式中呼叫一個 Celery 任務時,Celery 會將這個任務的資訊(例如要執行的函式名稱、參數等)打包成一則任務訊息 (Task Message)。
- 訊息傳遞:這則任務訊息會被發送到指定的 Broker (Message Queue)。
- 背景執行:在背景運行的 Worker 會持續監聽 Message Queue,一旦收到新的任務訊息,便會取出並執行對應的任務。
透過這樣的架構,Celery 將任務的「發布」與「執行」解耦,讓主應用程式在發布任務後可以立即回應使用者,無需等待任務完成。
核心運作流程圖
用序列圖 (Sequence Diagram) 來視覺化整個流程:
sequenceDiagram
participant Client as (主應用程式)
participant Celery App as (Celery 實例)
participant Broker as (訊息中介者 Redis)
participant Worker as (背景工作者)
participant Result Backend as (結果儲存 Redis)
Client->>+Celery App: 呼叫 task.delay()
Celery App->>Broker: 發送任務訊息
Celery App-->>-Client: 立即返回 AsyncResult 物件
Note right of Worker: Worker 持續監聽 Broker
Worker->>Broker: 取得任務訊息
Worker->>Worker: 執行任務
Worker->>+Result Backend: 將執行狀態/結果寫入
Note left of Client: Client 可用 AsyncResult<br/>向 Result Backend 查詢結果
Celery 任務生命週期
1. 觸發任務 (Client → Celery App)
- 動作:主應用程式 (Client) 呼叫一個被
@app.task
裝飾的函式,並在其後附加.delay()
或.apply_async()
。 - 說明:這是整個流程的起點。主應用程式決定將一個耗時的操作(例如發送郵件、生成報表)交給背景處理,而不是自己執行,以避免阻塞主執行緒。
2. 發送任務到中介者 (Celery App → Broker)
- 動作:Celery 實例 (Celery App) 將任務的相關資訊(函式名稱、參數等)序列化成一條訊息,然後發送到設定好的訊息中介者 (Broker),在此例中是 Redis。
- 說明:Broker 就像一個任務的「待辦清單」或「郵箱」。任務被安全地存放在這裡,等待有空的 Worker 來領取。這一步是實現非同步的關鍵,因為主應用程式把任務「扔」進 Broker 後就不用再管了。
3. 立即返回憑證 (Celery App → Client)
- 動作:在將任務訊息成功發送到 Broker 後,Celery 幾乎是立即返回一個
AsyncResult
物件給主應用程式。 - 說明:這是非同步的核心優勢。主應用程式不會等待任務執行完成。它得到的是一個「任務憑證」或「追蹤號」(
AsyncResult
),裡面包含了獨一無二的task_id
。主應用程式可以繼續執行其他程式碼,使用者介面也能保持回應。
4. 背景工作者領取並執行任務 (Worker → Broker → Worker)
- 動作:一個或多個獨立運行的 Worker 行程持續監聽 (Polling) Broker。一旦發現有新的任務訊息,其中一個 Worker 就會領取它。
- 說明:Worker 是真正執行工作的角色。它從 Broker 取得訊息後,會反序列化內容,了解需要執行哪個函式以及使用什麼參數,然後在自己的行程中執行該函式。
5. 寫入執行結果 (Worker → Result Backend)
- 動作:任務執行完成後(無論成功或失敗),Worker 會將執行結果或狀態(例如
'SUCCESS'
、'FAILURE'
、回傳值、錯誤堆疊等)寫入到設定好的結果儲存 (Result Backend) 中。 - 說明:Result Backend 是一個用來儲存任務最終狀態的地方,它讓主應用程式有辦法查詢任務的執行情況。注意,它和 Broker 可以是同一個資料庫(如 Redis),但它們的角色和儲存的資料結構是完全不同的。Result Backend 是選用的,如果你的任務不需要回傳結果,可以不設定它。
6. (可選)查詢任務結果 (Client → Result Backend)
- 動作:主應用程式可以在任何時候,使用第 3 步中得到的
AsyncResult
物件,來查詢任務的狀態或獲取結果。 - 說明:當主應用程式呼叫
async_result.get()
或檢查async_result.state
時,它會拿著task_id
去 Result Backend 查詢對應的結果。這一步是選擇性的,但對於需要知道任務是否完成、是否成功或需要取得回傳值的場景至關重要。
如何應用 Celery
以下將以 Redis 作為 Broker 與 Result Backend,示範一個簡單的 Celery 應用。
核心名詞
在開始之前,先了解幾個 Celery 的核心名詞:
- Application (app):Celery 的實例,是使用 Celery 的起點。
- Task:Celery 執行的最小工作單元,通常是一個 Python 函式。
- Worker:負責執行 Task 的背景進程。
- Broker:訊息中介者,負責傳遞 Task Message,例如 RabbitMQ, Redis。
- Result Backend:用於儲存 Task 的執行狀態與結果,例如 Redis。
實作步驟
1. 環境準備與安裝
首先,確保已經安裝了 Python 與 Redis。 接著透過 pip 安裝 Celery 及 Redis 相關套件:
pip install celery
pip install "celery[redis]"
2. 撰寫 Celery 應用
建立一個名為 tasks.py
的檔案,內容如下:
# tasks.py
from celery import Celery
# 實例化 Celery,設定 Broker 和 Backend
app = Celery(
'tasks',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/1',
)
# 定義一個 Task
@app.task
def say_hello():
print('Hello, Celery!')
在這段程式碼中:
- 建立了一個名為
tasks
的 Celery Application。 - 將 Broker 設定為本機 Redis 的 0 號資料庫。
- 將 Result Backend 設定為本機 Redis 的 1 號資料庫。
- 使用
@app.task
裝飾器將say_hello
函式定義為一個 Celery Task。
3. 啟動 Worker
接著,開啟一個終端機,在 tasks.py
所在的目錄下執行以下指令來啟動 Worker:
celery -A tasks worker --loglevel=info
會看到 Worker 成功啟動並等待接收任務的畫面。
4. 非同步執行任務
現在,另開一個終端機,進入 Python 直譯器來實際執行任務:
>>> from tasks import say_hello
# 使用 .delay() 來非同步執行任務
>>> result = say_hello.delay()
>>> result
<AsyncResult: e8af98f2-920c-43e2-a8e8-175bb8cd88cb>
執行 .delay()
後,會立即得到一個 AsyncResult
物件,這代表任務已經成功發送。 此時,可以回到 Worker 的終端機視窗,會發現 Hello, Celery!
的訊息已經被印出,代表 Worker 已經接收並執行了任務。
總結
透過以上步驟,便成功地利用 Celery 與 Redis 完成了一個非同步任務的執行。這只是 Celery 的初步應用,它還包含了任務排程、工作流設計(Workflow)等更進階的功能,是 Python 開發者處理非同步任務時非常值得學習的工具。