๊ฐ์
Airflow์ Celery๋ฅผ ๋น๊ตํ๋ค.
ํฌ๋กค๋ง ๊ด๋ฆฌ Airflow vs Celery
๊ฐ์๊ธฐ์กด ํ๋ก์ ํธ์ ํฌ๋กค๋ง ์์คํ ์ Airflow๋ก ๊ด๋ฆฌํ๋ค.Airflow ์ด์ ์ค์ด๋ ์๋ฒ์ ๋ฌธ์ ๊ฐ ๋ง์๋ ๋ฐ, ๊ทธ ์ค์์ ์ ์ผ ํฐ ๋ฌธ์ ๋ cpu ์ ์ ์จ์ด 100์ผ๋ก ์น์์ ๋๊ฐ ๋ง์๋ค๋ ๊ฒ์ด๋ค.Airflow๋ก ์ด์
bonory.tistory.com
ํฌ๋กค๋ง ์งํ ์, Celery๋ฅผ ์ฌ์ฉํ๋ ๊ฒ ๋ ์ ํฉํ๋ค๋ ํ๋จ๊ณผ ํจ๊ป ์์ ๋ฅผ ์์ฑํ๋ค.
FastAPI + Celery + requests + BeautifulSoup ๋ฅผ ํ์ฉํ ํฌ๋กค๋ง ์์ ์ ์ํํ๋ ๊ธฐ๋ณธ์ ์ธ ๋ฐฉ๋ฒ์ ์์
ํฌ๋กค๋ง์ ๋ฐฑ๊ทธ๋ผ์ด๋์์ ์คํ๋๋๋ก ์งํ
๊ธฐ๋ณธ ๊ตฌ์กฐ
- FastAPI: ์ฌ์ฉ์๊ฐ API๋ฅผ ํตํด ํฌ๋กค๋ง ์์ฒญ์ ๋ณด๋
- Celery: ์์ฒญ์ ํ์ ๋ฃ๊ณ , ๋ฐฑ๊ทธ๋ผ์ด๋์์ ํฌ๋กค๋ง ์คํ
- Redis: Celery์ ๋ฉ์์ง ๋ธ๋ก์ปค ์ญํ (Task ๊ด๋ฆฌ)
- requests + BeautifulSoup: ์น์ฌ์ดํธ์์ HTML ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์์ ํ์ฑ
FastAPI
FastAPI์์ ์ฌ์ฉ์๊ฐ ํฌ๋กค๋ง์ ์์ฒญํ๋ฉด Celery ํ์คํฌ๋ฅผ ์คํํ๋ API
from fastapi import FastAPI, BackgroundTasks
from celery_tasks import crawl_website_task, celery_app
from celery.result import AsyncResult
app = FastAPI()
@app.get("/")
def home():
return {"message": "Welcome to the Celery Web Scraper API"}
@app.post("/crawl")
def start_crawling(url: str, background_tasks: BackgroundTasks):
task = crawl_website_task.delay(url) # Celery ๋น๋๊ธฐ ํ์คํฌ ์คํ
return {"task_id": task.id, "message": f"Crawling started for {url}"}
@app.get("/task/{task_id}")
def get_task_status(task_id: str):
if celery_app.conf.result_backend is None or celery_app.backend is None:
return {"error": "Celery backend is disabled in FastAPI"}
task_result = AsyncResult(task_id, app=celery_app)
return {
"task_id": task_id,
"status": task_result.status,
"result": task_result.result
}
- start_crawling(url): ์ฌ์ฉ์๊ฐ ํน์ URL ํฌ๋กค๋ง์ ์์ฒญํ๋ฉด Celery ํ์คํฌ ์คํ (delay(url))
- get_task_status(task_id): Celery ํ์คํฌ์ ํ์ฌ ์ํ๋ฅผ ์กฐํ
Celery ํฌ๋กค๋ง ํ์คํฌ
Celery์์ ํฌ๋กค๋ง ์์ ์ ์คํํ๋ ์ฝ๋
import requests
from bs4 import BeautifulSoup
from celery import Celery
# Celery ์ธ์คํด์ค ์์ฑ
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0", # Redis ๋ฉ์์ง ๋ธ๋ก์ปค ์ฌ์ฉ
backend="redis://localhost:6379/0" # ์์
๊ฒฐ๊ณผ ์ ์ฅ
)
# Celery ์ค์ ์ถ๊ฐ
celery_app.conf.update(
result_backend="redis://localhost:6379/0",
result_expires=3600, # ๊ฒฐ๊ณผ 1์๊ฐ ์ ์ง
task_ignore_result=False, # ๊ฒฐ๊ณผ ์ ์ฅ ํ์ฑํ
broker_connection_retry_on_startup=True
)
@celery_app.task(bind=True)
def crawl_website_task(self, url: str):
""" ์ฃผ์ด์ง URL์ ํฌ๋กค๋งํ์ฌ ์ ๋ชฉ(title)๊ณผ ๋งํฌ(a ํ๊ทธ)๋ฅผ ์ถ์ถํ๋ ์์
"""
try:
response = requests.get(url, timeout=10)
response.raise_for_status() # HTTP ์ค๋ฅ ๋ฐ์ ์ ์์ธ ์ฒ๋ฆฌ
soup = BeautifulSoup(response.text, "html.parser")
title = soup.title.string if soup.title else "No title found"
# ๋ชจ๋ ๋งํฌ(a ํ๊ทธ) ๊ฐ์ ธ์ค๊ธฐ
links = [a["href"] for a in soup.find_all("a", href=True)]
return {"title": title, "links": links[:5]} # ์์ 5๊ฐ ๋งํฌ ๋ฐํ
except requests.RequestException as e:
return {"error": str(e)}
- celery_app.task: Celery ํ์คํฌ๋ฅผ ์ ์
- crawl_website_task(url): ์ฃผ์ด์ง URL์ ํฌ๋กค๋งํ๊ณ ์ ๋ชฉ๊ณผ ๋งํฌ๋ฅผ ์ถ์ถ
# app ์คํ
uvicorn app:app --host 0.0.0.0 --port 8000
Redis์ Celery ์คํ
(1) Redis ์คํ (๋ฉ์์ง ๋ธ๋ก์ปค)
redis-server
(2) Celery ์คํ (Worker ์คํ)
celery -A celery_tasks.celery_app worker --loglevel=info
์ด๋ ๊ฒ ํ๋ฉด Celery worker๊ฐ ๋๊ธฐ ์ํ์์ ํ์คํฌ๋ฅผ ์คํํ ์ค๋น๋ฅผ ํ๋ค.
API ์์ฒญ ๋ฐ ์คํ ํ ์คํธ
FastAPI docs๋ฅผ ํตํด์ ํ ์คํธ ์งํ ํน์ curl ์ฌ์ฉ
(1) ํฌ๋กค๋ง ์์ฒญ ๋ณด๋ด๊ธฐ
curl -X POST "http://127.0.0.1:8000/crawl?url=https://example.com"
Response:
{
"task_id": "c4256b26-b36e-48cf-bc5e-b6a77a3f263b",
"message": "Crawling started for https://example.com"
}
(2) ํฌ๋กค๋ง ๊ฒฐ๊ณผ ํ์ธ
curl -X GET "http://127.0.0.1:8000/task/c4256b26-b36e-48cf-bc5e-b6a77a3f263b"
Response ์ฑ๊ณต:
{
"task_id": "c4256b26-b36e-48cf-bc5e-b6a77a3f263b",
"status": "SUCCESS",
"result": {
"title": "Example Domain",
"links": ["https://www.iana.org/domains/example"]
}
}
Response ์คํ ์ค:
{
"task_id": "c4256b26-b36e-48cf-bc5e-b6a77a3f263b",
"status": "PENDING",
"result": null
}
Celery๋ฅผ ํ์ฉํ ์ถ๊ฐ ๊ธฐ๋ฅ
Celery๋ฅผ ํ์ฉํ๋ฉด ํฌ๋กค๋ง ์๋น์ค๋ฅผ ๋ ํ์ฅํ ์ ์๋ค.
โ ์ค์ผ์ค๋ง
์ผ์ ์๊ฐ๋ง๋ค ํฌ๋กค๋ง ์คํํ๊ณ ์ถ์ ๊ฒฝ์ฐ celery-beat ์ฌ์ฉ ๊ฐ๋ฅ
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
"crawl_every_minute": {
"task": "celery_tasks.crawl_website_task",
"schedule": crontab(minute="*/1"), # 1๋ถ๋ง๋ค ์คํ
"args": ("https://example.com",),
},
}
โ ๋์ ํฌ๋กค๋ง
Celery์ worker๋ฅผ ์ฌ๋ฌ ๊ฐ ์คํํ๋ฉด ๋์ ํฌ๋กค๋ง ๊ฐ๋ฅ
celery -A celery_tasks.celery_app worker --loglevel=info --concurrency=4
→ --concurrency=4: ๋์ 4๊ฐ์ ํฌ๋กค๋ง ํ์คํฌ ์คํ ๊ฐ๋ฅ
โ ํฌ๋กค๋ง ์คํจ ์ ์๋ ์ฌ์๋
@celery_app.task(bind=True, autoretry_for=(requests.RequestException,), retry_kwargs={"max_retries": 3})
def crawl_website_task(self, url: str):
...
→ retry_kwargs={"max_retries": 3}: ์์ฒญ ์คํจ ์ ์ต๋ 3๋ฒ ์๋ ์ฌ์๋
โจ FastAPI ์์ background_tasks: BackgroundTasks
BackgroundTasks๋ฅผ ์ฌ์ฉํ๋ฉด ์์ฒญ์ ๋ณด๋ธ ํ ๋ฐฑ๊ทธ๋ผ์ด๋์์ ์คํ๋์ง๋ง, FastAPI ์์ฒด์ ์ค๋ ๋ ๋ด์์ ์คํ๋๋ ๋ฐฉ์์ด๊ธฐ ๋๋ฌธ์ ์ ํ์ด ์๋ค. (์์ ํ ๋ ๋ฆฝ์ ์ธ ์์ ์ ์๋)
๋ช๊ฐ์ง ๋จ์ ๋ค:
- FastAPI ์๋ฒ๊ฐ ์ฌ์์๋๋ฉด ๋ฐฑ๊ทธ๋ผ์ด๋ ์์
๋ ์ฌ๋ผ์ง
- Celery ๊ฐ์ ์ธ๋ถ ํ์คํฌ ํ๋ฅผ ์ฌ์ฉํด์ผ ์์ ์ ์ผ๋ก ์คํ ๊ฐ๋ฅ
- ์๋ฒ๊ฐ ๊ฐ์ ์ข
๋ฃ๋๋ฉด ์์
์ด ์ค๋จ๋จ
- ์: uvicorn์ ์ค์งํ๋ฉด ์งํ ์ค์ด๋ ๋ฐฑ๊ทธ๋ผ์ด๋ ํ์คํฌ๋ ์ข ๋ฃ๋จ
- ์ค์ผ์ผ๋ง(์ฌ๋ฌ ๊ฐ์ ์๋ฒ ์ธ์คํด์ค ์คํ) ์, ์์
์ด ํน์ ์ธ์คํด์ค์๋ง ๋จ์์์ ์ ์์
- ๋ก๋๋ฐธ๋ฐ์๋ฅผ ์ฌ์ฉํ ๊ฒฝ์ฐ ๋ฌธ์ ๊ฐ ๋ ์ ์์
์ฆ, BackgroundTasks๋ ๊ฐ๋ฒผ์ด ์์ (์: ๋ก๊น , ์ด๋ฉ์ผ ์ ์ก)์๋ ์ ํฉํ์ง๋ง, ์ฅ์๊ฐ ์คํ๋๋ ํฌ๋กค๋ง ์์ ์๋ ๋ถ์ ํฉํ๋ค. ์ฅ์๊ฐ ์คํ๋๋ ๋ฐฑ๊ทธ๋ผ์ด๋ ์์ ์ Celery๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ์์ ์ ์ด๋ค.
'๐ฉ๐ปโ๐ป > python' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Redis ์ค์๊ฐ ๋ชจ๋ํฐ๋ง (0) | 2025.02.13 |
---|---|
Redis and Celery (0) | 2025.02.12 |
[python] gRPC ๊ฐ๋จํ๊ฒ ๊ตฌํํด๋ณด๊ธฐ (0) | 2025.01.02 |
[python] PySide6๋ก ์ฌ๋ฆฐ gui๋ก ์๊ฒฉ ์๋ฒ์ ํ์ผ ๋ค์ด๋ก๋ํ๊ธฐ (1) | 2024.12.31 |
[python] Fastapi ์ ํ๋ฆฌ์ผ์ด์ exe ํ์ผ๋ก ๋ฐฐํฌํ๊ธฐ (0) | 2024.11.05 |