๊ฐ์
๊ธฐ์กด ํ๋ก์ ํธ์ ํฌ๋กค๋ง ์์คํ ์ Airflow๋ก ๊ด๋ฆฌํ๋ค.
Airflow ์ด์ ์ค์ด๋ ์๋ฒ์ ๋ฌธ์ ๊ฐ ๋ง์๋ ๋ฐ, ๊ทธ ์ค์์ ์ ์ผ ํฐ ๋ฌธ์ ๋ cpu ์ ์ ์จ์ด 100์ผ๋ก ์น์์ ๋๊ฐ ๋ง์๋ค๋ ๊ฒ์ด๋ค.
Airflow๋ก ์ด์ํ๋ ๊ฒ์ด ๋ง์ด ๋ฌด๊ฒ๊ณ ์ค๋ฒํค๋๊ฐ ํฌ๊ธฐ ๋๋ฌธ์ ์ด๋ฒ์ Airflow๊ฐ ์๋ ๋ค๋ฅธ ๋ฐฉ๋ฒ์ผ๋ก ๊ด๋ฆฌํ ์ ์๋์ง์ ๋ํด ์กฐ์ฌ ๋ฐ ์ ๋ฆฌ๋ฅผ ํด๋ณด๋ ค๊ณ ํ๋ค.
ํ์ธ
Airflow๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ๋ฐ ์ํฌํ๋ก์ฐ ๊ด๋ฆฌ์ ์ต์ ํ๋ ๋๊ตฌ์ด๋ค. ํฌ๋กค๋ง ๊ฐ์ ๋จ์ํ ์์ ์ ์คํํ ๋์๋ Celery๊ฐ ๋ ์ ํฉํ๋ค.
Airflow์ Celery๋ฅผ ๋น๊ตํด๋ณด๋ฉด
๋น๊ต ํญ๋ชฉ | Celery | Airflow |
์ฃผ์ ๋ชฉ์ | ๋น๋๊ธฐ ์์ ์คํ (ํ์คํฌ ํ) | ๋ฐ์ดํฐ ์ํฌํ๋ก์ฐ ๋ฐ ํ์ดํ๋ผ์ธ ๊ด๋ฆฌ |
์์ ์คํ ๋ฐฉ์ | ๋น ๋ฅธ ์คํ, ๋จ์ํ ์์ ์ฒ๋ฆฌ | DAG ๊ธฐ๋ฐ, ์ค์ผ์ค๋ง ์ค์ฌ |
์ค์ผ์ค๋ง | ๊ธฐ๋ณธ ์ ๊ณต (beat ์ ๊ณต) | ๊ฐ๋ ฅํ DAG ๊ธฐ๋ฐ ์ค์ผ์ค๋ง ์ค์ฌ |
์ํ ๊ด๋ฆฌ | ์ ํ์ (๋จ์ ์ฑ๊ณต/์คํจ) | Task, Retry, DAG ์์กด์ฑ ๊ด๋ฆฌ |
์ค์น ๋ฐ ์ค์ | ์๋์ ์ผ๋ก ๊ฐ๋จ | ๋ณต์กํ ์ค์น ๋ฐ ์ค์ ํ์ |
์ค์๊ฐ ํธ๋ฆฌ๊ฑฐ | ๊ฐ๋ฅ | DAG ์คํ ๊ธฐ๋ฐ์ด๋ผ ์ค์๊ฐ ๋์์ด ์ด๋ ค์ |
์์ ๋ฒ๋ ฌ ์คํ | ๊ฐ๋ฅ (worker ๊ฐ์ ์กฐ์ ) | CeleryExecuter ๋๋ KubernetesExecuter ์ฌ์ฉ ์ ๊ฐ๋ฅ |
→ ํฌ๋กค๋ง์ ๋จ์ํ ๊ฐ๋ณ ์์ ์ด ๋ง๊ณ , ๋น ๋ฅด๊ฒ ์คํ๋๋ฉฐ, ์ค์๊ฐ ์์ฒญ์ด ๋ง์ ๊ฐ๋ฅ์ฑ์ด ๋์ผ๋ → Celery๊ฐ ๋ ์ ํฉํ ๊ฒ์ผ๋ก ๋ณด์ธ๋ค.
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ๋ฐ ์ํฌํ๋ก์ฐ ๊ด๋ฆฌ = ์ฌ๋ฌ ๊ฐ์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ ์ ์์๋๋ก ์คํํ๋ ๊ฒ
- Airflow๋ DAG ๊ธฐ๋ฐ์ผ๋ก ๊ฐ Task์ ์คํ ์์์ ์์กด์ฑ์ ๊ด๋ฆฌํ๋ ๋ฐ ๊ฐ๋ ฅํ๋ค.
- ํฌ๋กค๋ง ์๋น์ค๋ ๊ฐ๋ณ์ ์ผ๋ก ์คํ๋๋ ๊ฒฝ์ฐ๊ฐ ๋ง์ Celery๊ฐ ์ ํฉํ๋ค.
- ํ์ง๋ง ํฌ๋กค๋ง ํ ๋ฐ์ดํฐ ๋ถ์๊น์ง ํฌํจ๋๋ค๋ฉด Airflow๋ฅผ ๊ณ ๋ คํด๋ ๋๋ค.
์ค์๊ฐ Trigger ๊ฐ๋ฅ
- Celery๋ API ์์ฒญ์ ๋ฐ์์ ์ฆ์ ์คํ ๊ฐ๋ฅ (apply_async() ์ฌ์ฉ)
- Airflow๋ DAG ์คํ ๋ฐฉ์์ด๋ผ ์ฆ์ ์คํ์ด ์ด๋ ต๊ณ trigger๋ฅผ ์ถ๊ฐํด์ผ ํจ
์์
(1) Celery
from tasks import crawl_site
crawl_site.apply_async(args=["https://example.com"])
→ ์ฆ์ ์คํ ๊ฐ๋ฅ
(2) Airflow
from airflow.api.client.local_client import Client
client = Client(None)
client.trigger_dag(dag_id="crawl_dag")
→ DAG ์คํ์ ํธ๋ฆฌ๊ฑฐํด์ผ ํด์ ์ฆ์ ์คํ์ด ์ด๋ ค์
ํฌ๋กค๋ง Task๋ ๋จ์ํ ๊ฐ๋ณ ์คํ์ด๋ฏ๋ก DAG ํ์ ์์
- Airflow๋ DAG(Direct Acyclic Graph) ๊ธฐ๋ฐ์ด๋ผ ์ฌ๋ฌ ๋จ๊ณ์ ์ํฌํ๋ก์ฐ๋ฅผ ์ ์ํ๋ ๋ฐ ์ต์ ํ๋์ด ์๋ค.
- ํฌ๋กค๋ง์ ๋ณดํต ๋จ์ํ ์์ฒญ์ ์คํํ๋ ์์ ์ด๋ฏ๋ก DAG๊ฐ ๋ถํ์ํ๊ฒ ๋ณต์กํด์ง ์ ์๋ค.
- Celery๋ ๋จ์ํ ๋น๋๊ธฐ ํ ์คํธ ์คํ์ด๋ผ ํฌ๋กค๋ง์ ์ ํฉํ๋ค.
Celery๋ ์ค์ผ์ค๋ง ๊ฐ๋ฅ
- Celery๋ celery-beat๋ฅผ ์ฌ์ฉํ๋ฉด ํฌ๋ก ์ฒ๋ผ ์ค์ผ์ค๋ง ๊ฐ๋ฅ
- ํฌ๋กค๋ง ์ค์ผ์ค๋ง์ด ํ์ํ๋ฉด Celery๋ก๋ ์ถฉ๋ถํ ๊ตฌํ ๊ฐ๋ฅ
from celery.schedules import crontab
app.conf.beat_schedule = {
'crawl_every_minute': {
'task': 'tasks.crawl_site',
'schedule': crontab(minute='*/1'), # 1๋ถ๋ง๋ค ์คํ
'args': ('https://example.com',)
},
}
๊ฒฐ๋ก
Airflow๋ณด๋ค Celery๋ฅผ ์ฐ๋ ๊ฒ ๋ ๊ฐ๋จํ๊ณ ํจ์จ์ ์ด๋ผ๊ณ ํ๋จ.
Airflow๋ ๋ณต์กํ ๋ฐ์ดํฐ ์ํฌํ๋ก์ฐ ์ ์ฉ์ด๋ผ ํฌ๋กค๋ง ๊ฐ์ ์งง์ ์์ ์ ์ฒ๋ฆฌํ๊ธฐ์ ์ค๋ฒํค๋๊ฐ ํฌ๋ค.
๋ฐ๋ผ์, Celery + FastAPI ์กฐํฉ์ผ๋ก ํฌ๋กค๋ง ์์คํ ์ ๋ง๋ค๊ณ , ํ์ํ๋ฉด celery-beat๋ก ์ค์ผ์ค๋ง์ ์ถ๊ฐํ๋ ๊ฑธ ๊ณ ๋ คํ๋๊ฒ ์ข์ ๊ฒ์ด๋ผ๊ณ ํ๋จ๋๋ค.
+ Celery๋ฅผ ์ด์ฉํ ํฌ๋กค๋ง ์์ ๋ฅผ ์ถ๊ฐ๋ก ์ ๋ฆฌํ๋ค.
Celery๋ฅผ ์ด์ฉํ ํฌ๋กค๋ง ์์
๊ฐ์FastAPI + Celery + requests + BeautifulSoup ๋ฅผ ํ์ฉํ ํฌ๋กค๋ง ์์ ์ ์ํํ๋ ๊ธฐ๋ณธ์ ์ธ ๋ฐฉ๋ฒ์ ์์ ํฌ๋กค๋ง์ ๋ฐฑ๊ทธ๋ผ์ด๋์์ ์คํ๋๋๋ก ์งํ๊ธฐ๋ณธ ๊ตฌ์กฐFastAPI: ์ฌ์ฉ์๊ฐ API๋ฅผ ํตํด ํฌ๋กค๋ง ์์ฒญ์ ๋ณด
bonory.tistory.com