๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
  • ๐Ÿ‘ฉ๐Ÿปโ€๐Ÿ’ป ๐ŸŒฎ ๐Ÿ’ฌ
๐Ÿ‘ฉ๐Ÿป‍๐Ÿ’ป/python

Celery๋ฅผ ์ด์šฉํ•œ ํฌ๋กค๋ง ์˜ˆ์ œ

by ๋ฐ”์ฟ„๋ฆฌ 2025. 2. 5.

๊ฐœ์š”

Airflow์™€ Celery๋ฅผ ๋น„๊ตํ–ˆ๋‹ค.

 

ํฌ๋กค๋ง ๊ด€๋ฆฌ Airflow vs Celery

๊ฐœ์š”๊ธฐ์กด ํ”„๋กœ์ ํŠธ์˜ ํฌ๋กค๋ง ์‹œ์Šคํ…œ์„ Airflow๋กœ ๊ด€๋ฆฌํ–ˆ๋‹ค.Airflow ์šด์˜ ์ค‘์ด๋˜ ์„œ๋ฒ„์— ๋ฌธ์ œ๊ฐ€ ๋งŽ์•˜๋Š” ๋ฐ, ๊ทธ ์ค‘์—์„œ ์ œ์ผ ํฐ ๋ฌธ์ œ๋Š” cpu ์ ์œ ์œจ์ด 100์œผ๋กœ ์น˜์†Ÿ์„ ๋•Œ๊ฐ€ ๋งŽ์•˜๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.Airflow๋กœ ์šด์˜

bonory.tistory.com

 

ํฌ๋กค๋ง ์ง„ํ–‰ ์‹œ, Celery๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒŒ ๋” ์ ํ•ฉํ•˜๋‹ค๋Š” ํŒ๋‹จ๊ณผ ํ•จ๊ป˜ ์˜ˆ์ œ๋ฅผ ์ž‘์„ฑํ–ˆ๋‹ค.

 

This example shows a basic way to run crawling jobs with FastAPI + Celery + requests + BeautifulSoup.

The crawl itself runs in the background.

๊ธฐ๋ณธ ๊ตฌ์กฐ

  • FastAPI: ์‚ฌ์šฉ์ž๊ฐ€ API๋ฅผ ํ†ตํ•ด ํฌ๋กค๋ง ์š”์ฒญ์„ ๋ณด๋ƒ„
  • Celery: ์š”์ฒญ์„ ํ์— ๋„ฃ๊ณ , ๋ฐฑ๊ทธ๋ผ์šด๋“œ์—์„œ ํฌ๋กค๋ง ์‹คํ–‰
  • Redis: Celery์˜ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค ์—ญํ•  (Task ๊ด€๋ฆฌ)
  • requests + BeautifulSoup: ์›น์‚ฌ์ดํŠธ์—์„œ HTML ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์™€์„œ ํŒŒ์‹ฑ
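To try the example locally, the stack above can be installed with pip (these are the usual PyPI package names; versions are left unpinned here):

```shell
pip install fastapi "uvicorn[standard]" celery redis requests beautifulsoup4
```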

FastAPI

FastAPI์—์„œ ์‚ฌ์šฉ์ž๊ฐ€ ํฌ๋กค๋ง์„ ์š”์ฒญํ•˜๋ฉด Celery ํƒœ์Šคํฌ๋ฅผ ์‹คํ–‰ํ•˜๋Š” API

from fastapi import FastAPI
from celery_tasks import crawl_website_task, celery_app
from celery.result import AsyncResult

app = FastAPI()

@app.get("/")
def home():
    return {"message": "Welcome to the Celery Web Scraper API"}

@app.post("/crawl")
def start_crawling(url: str):
    task = crawl_website_task.delay(url)  # enqueue the Celery task (runs asynchronously)
    return {"task_id": task.id, "message": f"Crawling started for {url}"}

@app.get("/task/{task_id}")
def get_task_status(task_id: str):
    # Guard against a configuration where no result backend is set
    if celery_app.conf.result_backend is None or celery_app.backend is None:
        return {"error": "Celery backend is disabled in FastAPI"}

    task_result = AsyncResult(task_id, app=celery_app)

    return {
        "task_id": task_id,
        "status": task_result.status,
        "result": task_result.result
    }
  • start_crawling(url): when a user requests a crawl of a URL, the Celery task is enqueued with delay(url)
  • get_task_status(task_id): looks up the current state of a Celery task

Celery ํฌ๋กค๋ง ํƒœ์Šคํฌ

Celery์—์„œ ํฌ๋กค๋ง ์ž‘์—…์„ ์‹คํ–‰ํ•˜๋Š” ์ฝ”๋“œ

import requests
from bs4 import BeautifulSoup
from celery import Celery

# Celery ์ธ์Šคํ„ด์Šค ์ƒ์„ฑ
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",  # Redis ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค ์‚ฌ์šฉ
    backend="redis://localhost:6379/0"  # ์ž‘์—… ๊ฒฐ๊ณผ ์ €์žฅ
)

# Celery ์„ค์ • ์ถ”๊ฐ€
celery_app.conf.update(
    result_backend="redis://localhost:6379/0",
    result_expires=3600,  # ๊ฒฐ๊ณผ 1์‹œ๊ฐ„ ์œ ์ง€
    task_ignore_result=False,  # ๊ฒฐ๊ณผ ์ €์žฅ ํ™œ์„ฑํ™”
    broker_connection_retry_on_startup=True
)

@celery_app.task(bind=True)
def crawl_website_task(self, url: str):
    """ ์ฃผ์–ด์ง„ URL์„ ํฌ๋กค๋งํ•˜์—ฌ ์ œ๋ชฉ(title)๊ณผ ๋งํฌ(a ํƒœ๊ทธ)๋ฅผ ์ถ”์ถœํ•˜๋Š” ์ž‘์—… """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # raise for HTTP error statuses

        soup = BeautifulSoup(response.text, "html.parser")
        title = soup.title.string if soup.title else "No title found"

        # ๋ชจ๋“  ๋งํฌ(a ํƒœ๊ทธ) ๊ฐ€์ ธ์˜ค๊ธฐ
        links = [a["href"] for a in soup.find_all("a", href=True)]

        return {"title": title, "links": links[:5]}  # ์ƒ์œ„ 5๊ฐœ ๋งํฌ ๋ฐ˜ํ™˜

    except requests.RequestException as e:
        return {"error": str(e)}
  • celery_app.task: defines a Celery task
  • crawl_website_task(url): crawls the given URL and extracts its title and links
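The parsing step inside the task can be checked without a network call or a running worker. Here is a minimal sketch of the same title/link extraction using only the standard-library parser (TitleLinkParser is a name made up for this illustration; the real task uses BeautifulSoup, which does the same work with far less code):

```python
from html.parser import HTMLParser

class TitleLinkParser(HTMLParser):
    """Collects the <title> text and all <a href=...> values, like the task does."""
    def __init__(self):
        super().__init__()
        self.title = None
        self.links = []
        self._in_title = False

    def handle_starttag(self, tag, attrs):
        if tag == "title":
            self._in_title = True
        elif tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.links.append(href)

    def handle_endtag(self, tag):
        if tag == "title":
            self._in_title = False

    def handle_data(self, data):
        if self._in_title:
            self.title = data

html = ('<html><head><title>Example Domain</title></head>'
        '<body><a href="https://www.iana.org/domains/example">More</a></body></html>')
parser = TitleLinkParser()
parser.feed(html)
print({"title": parser.title, "links": parser.links[:5]})
```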
# app ์‹คํ–‰
uvicorn app:app --host 0.0.0.0 --port 8000

Redis์™€ Celery ์‹คํ–‰

(1) Redis ์‹คํ–‰ (๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค)

redis-server

 

(2) Celery ์‹คํ–‰ (Worker ์‹คํ–‰)

celery -A celery_tasks.celery_app worker --loglevel=info

์ด๋ ‡๊ฒŒ ํ•˜๋ฉด Celery worker๊ฐ€ ๋Œ€๊ธฐ ์ƒํƒœ์—์„œ ํƒœ์Šคํฌ๋ฅผ ์‹คํ–‰ํ•  ์ค€๋น„๋ฅผ ํ•œ๋‹ค.

API ์š”์ฒญ ๋ฐ ์‹คํ–‰ ํ…Œ์ŠคํŠธ

FastAPI docs๋ฅผ ํ†ตํ•ด์„œ ํ…Œ์ŠคํŠธ ์ง„ํ–‰ ํ˜น์€ curl ์‚ฌ์šฉ

 

(1) ํฌ๋กค๋ง ์š”์ฒญ ๋ณด๋‚ด๊ธฐ

curl -X POST "http://127.0.0.1:8000/crawl?url=https://example.com"

 

Response:

{
    "task_id": "c4256b26-b36e-48cf-bc5e-b6a77a3f263b",
    "message": "Crawling started for https://example.com"
}

 

(2) ํฌ๋กค๋ง ๊ฒฐ๊ณผ ํ™•์ธ

curl -X GET "http://127.0.0.1:8000/task/c4256b26-b36e-48cf-bc5e-b6a77a3f263b"

Response ์„ฑ๊ณต:

{
    "task_id": "c4256b26-b36e-48cf-bc5e-b6a77a3f263b",
    "status": "SUCCESS",
    "result": {
        "title": "Example Domain",
        "links": ["https://www.iana.org/domains/example"]
    }
}

 

Response ์‹คํ–‰ ์ค‘:

{
    "task_id": "c4256b26-b36e-48cf-bc5e-b6a77a3f263b",
    "status": "PENDING",
    "result": null
}
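Because a freshly submitted task reports PENDING until a worker picks it up, a client typically polls the status endpoint. A small hedged sketch (wait_for_result is a helper invented here; fetch_status stands in for whatever HTTP client hits /task/{task_id}):

```python
import time

def wait_for_result(fetch_status, task_id, timeout=30.0, interval=1.0):
    """Poll fetch_status(task_id) until the task leaves PENDING/STARTED, or time out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        payload = fetch_status(task_id)
        if payload["status"] not in ("PENDING", "STARTED"):
            return payload  # SUCCESS, FAILURE, RETRY, ...
        time.sleep(interval)
    raise TimeoutError(f"task {task_id} did not finish within {timeout}s")
```

With requests, fetch_status could be as simple as `lambda tid: requests.get(f"http://127.0.0.1:8000/task/{tid}").json()`.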

Celery๋ฅผ ํ™œ์šฉํ•œ ์ถ”๊ฐ€ ๊ธฐ๋Šฅ

Celery๋ฅผ ํ™œ์šฉํ•˜๋ฉด ํฌ๋กค๋ง ์„œ๋น„์Šค๋ฅผ ๋” ํ™•์žฅํ•  ์ˆ˜ ์žˆ๋‹ค.

โœ… ์Šค์ผ€์ค„๋ง

์ผ์ • ์‹œ๊ฐ„๋งˆ๋‹ค ํฌ๋กค๋ง ์‹คํ–‰ํ•˜๊ณ  ์‹ถ์€ ๊ฒฝ์šฐ celery-beat ์‚ฌ์šฉ ๊ฐ€๋Šฅ

from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "crawl_every_minute": {
        "task": "celery_tasks.crawl_website_task",
        "schedule": crontab(minute="*/1"),  # 1๋ถ„๋งˆ๋‹ค ์‹คํ–‰
        "args": ("https://example.com",),
    },
}
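Note that the schedule above only fires if a separate beat process runs alongside the worker; the standard Celery CLI invocation for it is:

```shell
celery -A celery_tasks.celery_app beat --loglevel=info
```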

 

โœ… ๋™์‹œ ํฌ๋กค๋ง

Celery์˜ worker๋ฅผ ์—ฌ๋Ÿฌ ๊ฐœ ์‹คํ–‰ํ•˜๋ฉด ๋™์‹œ ํฌ๋กค๋ง ๊ฐ€๋Šฅ

celery -A celery_tasks.celery_app worker --loglevel=info --concurrency=4

--concurrency=4: up to 4 crawling tasks run at the same time

 

โœ… ํฌ๋กค๋ง ์‹คํŒจ ์‹œ ์ž๋™ ์žฌ์‹œ๋„

@celery_app.task(bind=True, autoretry_for=(requests.RequestException,), retry_kwargs={"max_retries": 3})
def crawl_website_task(self, url: str):
    ...

 

  • autoretry_for=(requests.RequestException,): these exceptions trigger an automatic retry
  • retry_kwargs={"max_retries": 3}: retry at most 3 times before giving up
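Conceptually, autoretry_for just re-invokes the task body whenever one of the listed exceptions escapes. The behavior (ignoring Celery's countdown/backoff options) can be sketched with a plain retry helper — retry_call and flaky are names invented for this illustration:

```python
def retry_call(fn, exceptions, max_retries=3):
    """Call fn, retrying up to max_retries times on the given exceptions."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except exceptions:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error

attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(retry_call(flaky, (ConnectionError,)))  # succeeds on the 3rd try
```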


โœจ FastAPI ์—์„œ background_tasks: BackgroundTasks

 

BackgroundTasks๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์š”์ฒญ์„ ๋ณด๋‚ธ ํ›„ ๋ฐฑ๊ทธ๋ผ์šด๋“œ์—์„œ ์‹คํ–‰๋˜์ง€๋งŒ, FastAPI ์ž์ฒด์˜ ์Šค๋ ˆ๋“œ ๋‚ด์—์„œ ์‹คํ–‰๋˜๋Š” ๋ฐฉ์‹์ด๊ธฐ ๋•Œ๋ฌธ์— ์ œํ•œ์ด ์žˆ๋‹ค. (์™„์ „ํžˆ ๋…๋ฆฝ์ ์ธ ์ž‘์—…์€ ์•„๋‹˜)

๋ช‡๊ฐ€์ง€ ๋‹จ์ ๋“ค:

  1. FastAPI ์„œ๋ฒ„๊ฐ€ ์žฌ์‹œ์ž‘๋˜๋ฉด ๋ฐฑ๊ทธ๋ผ์šด๋“œ ์ž‘์—…๋„ ์‚ฌ๋ผ์ง
    1. Celery ๊ฐ™์€ ์™ธ๋ถ€ ํƒœ์Šคํฌ ํ๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ์•ˆ์ •์ ์œผ๋กœ ์‹คํ–‰ ๊ฐ€๋Šฅ
  2. ์„œ๋ฒ„๊ฐ€ ๊ฐ•์ œ ์ข…๋ฃŒ๋˜๋ฉด ์ž‘์—…์ด ์ค‘๋‹จ๋จ
    1. ์˜ˆ: uvicorn์„ ์ค‘์ง€ํ•˜๋ฉด ์ง„ํ–‰ ์ค‘์ด๋˜ ๋ฐฑ๊ทธ๋ผ์šด๋“œ ํƒœ์Šคํฌ๋„ ์ข…๋ฃŒ๋จ
  3. ์Šค์ผ€์ผ๋ง(์—ฌ๋Ÿฌ ๊ฐœ์˜ ์„œ๋ฒ„ ์ธ์Šคํ„ด์Šค ์‹คํ–‰) ์‹œ, ์ž‘์—…์ด ํŠน์ • ์ธ์Šคํ„ด์Šค์—๋งŒ ๋‚จ์•„์žˆ์„ ์ˆ˜ ์žˆ์Œ
    1. ๋กœ๋“œ๋ฐธ๋Ÿฐ์„œ๋ฅผ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ ๋ฌธ์ œ๊ฐ€ ๋  ์ˆ˜ ์žˆ์Œ

์ฆ‰, BackgroundTasks๋Š” ๊ฐ€๋ฒผ์šด ์ž‘์—…(์˜ˆ: ๋กœ๊น…, ์ด๋ฉ”์ผ ์ „์†ก)์—๋Š” ์ ํ•ฉํ•˜์ง€๋งŒ, ์žฅ์‹œ๊ฐ„ ์‹คํ–‰๋˜๋Š” ํฌ๋กค๋ง ์ž‘์—…์—๋Š” ๋ถ€์ ํ•ฉํ•˜๋‹ค. ์žฅ์‹œ๊ฐ„ ์‹คํ–‰๋˜๋Š” ๋ฐฑ๊ทธ๋ผ์šด๋“œ ์ž‘์—…์€ Celery๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์•ˆ์ •์ ์ด๋‹ค.