๊ฐ์
Airflow์ ํ์ฉํ์ฌ ํฌ๋กค๋ง์ ์งํํ๊ณ ์์๋ค.
ํ๋์ DAG๊ฐ ํน์ DAG๋ฅผ ํ๋ฒ์ ์ฌ๋ฌ๋ฒ trigger ํ๋ ํ๋ก์ธ์ค์ธ๋ฐ
์ง๊ธ ๊ณ์ ํฌ๋กค๋ง ์ฌ์ดํธ๊ฐ ๋์ด๋๋ ๊ณผ์ ์ ์์๋ค.
ํฌ๋กค๋ง ์ฌ์ดํธ๊ฐ 300๊ฐ ๊ฐ๊น์ด ๋์ด๋๊ณ , ํ๋์ DAG๋ฅผ 300๊ฐ ๊ฐ๊น์ด ํธ์ถํ๋ค๊ฐ ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ค.

์๋ฌ๋ trigger_internal_dag task๊ฐ Fail
ํ์ธํด๋ณด์
error message: run_id ์ค๋ณต ์ค๋ฅ
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_execution_date_key"
- trigger_internal_dag์ ๋งค์ฐ ์งง์ ์๊ฐ ์์ ๋์ผํ DAG๋ฅผ ์ฌ๋ฌ ๋ฒ ํธ๋ฆฌ๊ฑฐ(trigger)ํ๋ ค๊ณ ์๋
- ์ด๋ Airflow ๋ด๋ถ์์ ์คํํ๋ ์๊ฐ์ผ๋ก run_id๋ฅผ ์๋์ผ๋ก ๋ถ์ฌํ๋๋ฐ,
- ๋์์ ์ฌ๋ฌ๊ฐ๊ฐ ์คํ๋๋ฉฐ ๊ฐ์ run_id๋ฅผ ๊ฐ์ง๋ ๊ฒฝ์ฐ๊ฐ ๋ฐ์ → ๊ทธ๋์ ์ค๋ณต ์ค๋ฅ ๋ฐ์!

์ด๋ ๊ฒ ๋ณ๊ฒฝ
๋ณ๊ฒฝ ์ :
trigger_internal = TriggerDagRunOperator.partial(
task_id="trigger_internal_dag",
trigger_dag_id="execute_crawler",
wait_for_completion=False,
).expand(conf=internal_merged_confs)
๋ณ๊ฒฝ ํ:
@task
def trigger_internal_dag_dynamically(payload: dict):
category_id = payload.get("category_id")
run_id = f"internal_{category_id}_{timezone.utcnow().strftime('%Y%m%dT%H%M%S.%f')}"
print(f"Triggering execute_crawler with run_id: {run_id}")
trigger_dag(
dag_id="execute_crawler",
run_id=run_id,
conf=payload,
execution_date=timezone.utcnow(),
replace_microseconds=False
)
...
trigger_internal = trigger_internal_dag_dynamically.expand(payload=internal_merged_confs)
๋ญ ๋ณ๊ฒฝํ์๊น
- Task ๋ฐ์ฝ๋ ์ดํฐ ์ ์ฉ
- ์ด์ : Airflow์ ๋ด์ฅ๋ TriggerDagRunOperator ์ฌ์ฉ → .expand()์ ํจ๊ป ์ฌ์ฉํ ๋ run_id๋ฅผ ๋์ ์ผ๋ก ๋ง๋๋ ๋ฐ ๋ฌธ์ ๋ฐ์
- ๋ณ๊ฒฝ: trigger_internal_dag_dynamically๋ผ๋ ์ด๋ฆ์ ํ์ด์ฌ ํจ์๋ฅผ ์์ฑ → @task ๋ฐ์ฝ๋ ์ดํฐ๋ฅผ ๋ถ์ฌ Airflow๊ฐ ์ด ํจ์๋ฅผ ํ๋์ task๋ก ์ธ์ํ๊ฒ ํจ
- run_id ์์ฑ ๋ฐฉ์
- ์ด์ : Airflow๊ฐ ๋ด๋ถ ๊ท์น์ ๋ฐ๋ผ run_id์ ์๋์ผ๋ก ๋ง๋ค๋๋ก ํจ, ์ด ๊ณผ์ ์์ ๋์์ trigger ๋๋ ๊ฒฝ์ฐ ๊ฐ์ run_id์ ์์ฑํ๋ฉฐ ์๋ฌ๊ฐ ๋ฐ์
- ํ์ฌ: run_id = f"internal_{category_id}_{timezone.utcnow()...}" ์ฝ๋๋ฅผ ํตํด ๊ฐ task๋ง๋ค ๊ณ ์ ํ run_id๋ฅผ ์ง์ ๋ช ์์ ์ผ๋ก ์์ฑ → category_id์ ํ์ฌ ์๊ฐ์ ์กฐํฉํด์ ์ ๋ ๊ฒน์น์ง ์๊ฒ ํจ
- DAG ์คํ ํธ์ถ ๋ฐฉ์
- ์ด์ : TriggerDagRunOperator๊ฐ ๋ด๋ถ์ ์ผ๋ก DAG ์คํ
- ํ์ฌ: Airflow์ API์ธ trigger_dag() ํจ์๋ฅผ ์ง์ ํธ์ถ → dag_id, run_id, conf๋ฅผ ์ ๋ฌ
๊ฒฐ๋ก
๋์ผํ Dag๋ฅผ ๋์์ 300์ฌ๊ฐ ์ ๋ ํธ์ถํด๋ ๊ฐ๊ฐ์ run_id๋ฅผ ๊ฐ์ง๊ณ ์๊ธฐ์ → ์ค๋ณต ํธ์ถ ์ค๋ฅ๊ฐ ์ฌ๋ผ์ง

์ค์ ๋ก ์ด์ํ๋ฉด์ ์๊ธฐ๋ ๋ฌธ์ ๋ฅผ ๋ฐ์ํ๋ ๊ฑด ์ฌ๋ฐ๋ค
DAG๋ฅผ ๋ ๋ง์ด ํธ์ถํ๋ฉด ๋ ์ด๋ค ๋ฌธ์ ๊ฐ ๋ฐ์ํ ๊น
๋ด๊ฐ ๋ ๋์น ๋ถ๋ถ์ ์ด๋์ผ๊น