Celery๋ฅผ ์ด์ฉํ ํฌ๋กค๋ง ์์
๊ฐ์Airflow์ Celery๋ฅผ ๋น๊ตํ๋ค. ํฌ๋กค๋ง ๊ด๋ฆฌ Airflow vs Celery๊ฐ์๊ธฐ์กด ํ๋ก์ ํธ์ ํฌ๋กค๋ง ์์คํ ์ Airflow๋ก ๊ด๋ฆฌํ๋ค.Airflow ์ด์ ์ค์ด๋ ์๋ฒ์ ๋ฌธ์ ๊ฐ ๋ง์๋ ๋ฐ, ๊ทธ ์ค์์ ์ ์ผ ํฐ ๋ฌธ์ ๋ cpu ์
bonory.tistory.com
ํฌ๋กค๋ง ์์ ๋ฅผ ์งํํ๋ฉฐ ์๊ฒผ๋ ์ค๋ฅ ๊ธฐ๋ก
๊ฐ์
celery app์ ํ์ฑํ ์ํค๊ณ , Fastapi app๋ ํ์ฑํํ๋ค.
celery_tasks.py
import requests
from bs4 import BeautifulSoup
from celery import Celery
import time
# The broker and the result backend both point at the same local Redis database.
_REDIS_URL = "redis://localhost:6379/0"

celery_app = Celery("tasks", broker=_REDIS_URL, backend=_REDIS_URL)
@celery_app.task(bind=True)
def crawl_website_task(self, url: str):
    """Celery task that crawls ``url``.

    Registered with ``bind=True``, so ``self`` is the bound task instance.
    The body is elided in this write-up. NOTE(review): judging by the sample
    response shown at the end of the article, it presumably returns the page
    title and the links found on the page -- confirm against the full code.
    """
    ...
app.py : FastAPI app
from fastapi import FastAPI, BackgroundTasks
from celery_tasks import crawl_website_task
from celery.result import AsyncResult
# FastAPI application instance; the route decorators below register on it.
app = FastAPI()
@app.get("/")
def home():
    """Return a static welcome payload for the API root."""
    greeting = "Welcome to the Celery Web Scraper API"
    return {"message": greeting}
@app.post("/crawl")
def start_crawling(url: str, background_tasks: BackgroundTasks):
    """Queue a crawl of ``url`` on Celery and report the id of the queued task.

    ``background_tasks`` is accepted but never used; the work is handed to
    Celery through ``crawl_website_task.delay``.
    """
    queued = crawl_website_task.delay(url)
    response = {
        "task_id": queued.id,
        "message": f"Crawling started for {url}",
    }
    return response
@app.get("/task/{task_id}")
def get_task_status(task_id: str):
    """Look up a Celery task by id and return its id, status and result."""
    # NOTE: AsyncResult is built without an explicit ``app=`` argument. This
    # article records that calling the endpoint in this form failed with
    # "AttributeError: 'DisabledBackend' object has no attribute
    # '_get_task_meta_for'".
    task_result = AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": task_result.status,
        "result": task_result.result
    }
/crawl endpoint๋ฅผ ํธ์ถํด์ ํฌ๋กค๋ง์ ์์ํ๊ณ ,
โ ~ curl -X POST "http://127.0.0.1:8000/crawl/?url=https://example.com"
{"task_id":"9f9e0b42-09d5-4fc1-9a76-21ce57397471","message":"Crawling started for https://example.com"}
상태를 확인하기 위해 /task/{task_id} 호출하니 오류가 발생했다.
➜ ~ curl -X GET "http://127.0.0.1:8000/task/9f9e0b42-09d5-4fc1-9a76-21ce57397471"
Internal Server Error
FastAPI 로그를 확인하니 다음과 같은 오류 메시지를 확인할 수 있다.
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
→ ์ด ์ค๋ฅ๋ celery ๋ฐฑ์๋๊ฐ ๋นํ์ฑํ๋์ด์๋๋ฐ ํ์คํฌ๋ฅผ ์กฐํํ๋ ค๊ณ ํ ๋ ๋ฐ์ํ๋ ์ค๋ฅ์ด๋ค.
- transport: redis://localhost:6379/0 → Celery๊ฐ Redis๋ฅผ ๋ฉ์์ง ๋ธ๋ก์ปค๋ก ์ฌ์ฉ ์ค
- results: redis://localhost:6379/0 → ๊ฒฐ๊ณผ ์ ์ฅ ๋ฐฑ์๋๋ก Redis ์ฌ์ฉ ์ค
ํ์ง๋ง, Celery ์คํ ์ํฌ ๋ results: redis://localhost:6379/0 ์ผ๋ก ๋ฐฑ์๋ ์ ํ์ฑํ ๋์ด์์์ ํ์ธํ๋ค.
๊ทธ๋ผ FastAPI์์ celery config๋ฅผ ๋ชป ์ฝ์ด์ ์๊ธฐ๋ ๋ฌธ์ ์ด๋ค.
celery_task.py ์ config๋ฅผ ์ง์ ๋ช ์ํด์ค๋ค.
# Spell the result-backend settings out explicitly on the Celery app.
celery_app.conf.update(
    result_backend="redis://localhost:6379/0",  # add the backend setting explicitly
    result_expires=3600,  # how long results are kept (1 hour)
    task_ignore_result=False,  # enable result storage (it may be disabled by default)
    broker_connection_retry_on_startup=True  # setting added because it is needed from Celery 6.0 onward
)
FastAPI app 쪽의 코드도 수정해준다.
# Before the fix
@app.get("/task/{task_id}")
def get_task_status(task_id: str):
    """Look up a Celery task by id and return its id, status and result."""
    # No ``app=`` is passed to AsyncResult in this version.
    task_result = AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": task_result.status,
        "result": task_result.result
    }
# After the fix
# These two imports belong with the other imports at the top of app.py; the
# endpoint needs the configured Celery app and the disabled-backend class.
from celery.backends.base import DisabledBackend
from celery_tasks import celery_app


@app.get("/task/{task_id}")
def get_task_status(task_id: str):
    """Return the id, status and result of a Celery task.

    The lookup is bound to ``celery_app`` explicitly so that it uses the
    result backend configured on that app.

    Returns:
        ``{"task_id", "status", "result"}`` for the task, or
        ``{"error": ...}`` when the app has no usable result backend.
    """
    # A missing result backend is represented by a DisabledBackend instance,
    # not by None, so test for that class as well as for an unset setting.
    backend_disabled = (
        celery_app.conf.result_backend is None
        or isinstance(celery_app.backend, DisabledBackend)
    )
    if backend_disabled:
        return {"error": "Celery backend is disabled in FastAPI"}

    # Pass the app explicitly so the result is read from its backend.
    task_result = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "status": task_result.status,
        "result": task_result.result,
    }
- ๋ฐฑ์๋ ์ค์ ์ ์ฉ ํ์ธ ๋จผ์ ์งํํ๊ณ
- ํธ์ถ ํ ๋ celery app์ ์ง์ ๋ช ์ํด์ฃผ์๋ค.
celery์ fastapi app๋ฅผ restart ์ํค๊ณ ํ ์คํธ ์งํ
โ ~ curl -X GET "http://127.0.0.1:8000/task/9f9e0b42-09d5-4fc1-9a76-21ce57397471"
{"task_id":"9f9e0b42-09d5-4fc1-9a76-21ce57397471","status":"SUCCESS","result":{"title":"Example Domain","links":["https://www.iana.org/domains/example"]}}
태스크 상태 잘 출력됨 확인