
[Airflow] duplicate key value violates unique constraint "dag_run_dag_id_execution_date_key" when using TriggerDagRunOperator

by 바쿄리 2025. 9. 14.

 

Overview

I was using Airflow to run web crawlers.

The pipeline has one DAG trigger another specific DAG many times at once, and the number of sites being crawled kept growing.

Once the list grew to nearly 300 sites, and the same DAG was being triggered nearly 300 times in one go, an error occurred.

 

 

The failure showed up in the trigger_internal_dag task.

 

Let's take a look

Error message: a duplicate-key error when inserting the DAG run

sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_run_dag_id_execution_date_key"

 

 

  • trigger_internal_dag tries to trigger the same DAG many times within a very short window.
  • When no run_id is supplied, Airflow derives one automatically from the trigger time.
  • When several triggers fire at the same moment, they can end up with the same timestamp and therefore the same run_id and execution_date → hence the duplicate-key error! (The constraint named in the message, dag_run_dag_id_execution_date_key, is the unique constraint on dag_id and execution_date.)
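To make the collision concrete, here is a minimal sketch in plain Python that does not need Airflow. It assumes, as a simplification, that each run is keyed by (dag_id, timestamp). `make_auto_run_id` and `find_duplicate_keys` are hypothetical helpers written for illustration: the first only mimics the general shape of a timestamp-derived run_id and is not Airflow's actual code.

```python
from collections import Counter
from datetime import datetime, timezone


def make_auto_run_id(logical_date: datetime) -> str:
    """Hypothetical stand-in for a timestamp-derived run_id (not Airflow's real code)."""
    return f"manual__{logical_date.isoformat()}"


def find_duplicate_keys(dag_id: str, logical_dates: list) -> list:
    """Return the (dag_id, logical_date) pairs that appear more than once."""
    counts = Counter((dag_id, d) for d in logical_dates)
    return [key for key, n in counts.items() if n > 1]


# Simulate two mapped task instances that read the clock at the same instant,
# plus a third one that runs one microsecond later.
same_instant = datetime(2025, 9, 14, 3, 0, 0, 123456, tzinfo=timezone.utc)
later = datetime(2025, 9, 14, 3, 0, 0, 123457, tzinfo=timezone.utc)

duplicates = find_duplicate_keys("execute_crawler", [same_instant, same_instant, later])
print(duplicates)  # the pair built from same_instant is reported once

# Identical timestamps also yield identical auto-generated run_ids.
print(make_auto_run_id(same_instant) == make_auto_run_id(same_instant))
```

A database with a unique constraint on that pair would reject the second insert, which is the kind of failure the error message describes.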

 

The change

Before:

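# Import path as of Airflow 2.x
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# One mapped task instance per conf; no run_id is set, so Airflow generates it.
trigger_internal = TriggerDagRunOperator.partial(
    task_id="trigger_internal_dag",
    trigger_dag_id="execute_crawler",
    wait_for_completion=False,
).expand(conf=internal_merged_confs)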

 

After:

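# Import paths as of Airflow 2.x
from airflow.api.common.trigger_dag import trigger_dag
from airflow.decorators import task
from airflow.utils import timezone


@task
def trigger_internal_dag_dynamically(payload: dict):
    # Read the clock once so run_id and execution_date use the same instant.
    now = timezone.utcnow()
    category_id = payload.get("category_id")
    # category_id keeps run_ids distinct even when timestamps coincide.
    run_id = f"internal_{category_id}_{now.strftime('%Y%m%dT%H%M%S.%f')}"
    print(f"Triggering execute_crawler with run_id: {run_id}")
    trigger_dag(
        dag_id="execute_crawler",
        run_id=run_id,
        conf=payload,
        execution_date=now,
        replace_microseconds=False,  # keep microsecond precision
    )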

...

trigger_internal = trigger_internal_dag_dynamically.expand(payload=internal_merged_confs)

 

What changed?

  1. Switched to the @task decorator
    1. Before: used Airflow's built-in TriggerDagRunOperator → when combined with .expand(), generating a run_id dynamically for each mapped task was a problem
    2. Now: wrote a Python function named trigger_internal_dag_dynamically → decorated it with @task so that Airflow treats the function as a task
  2. How the run_id is generated
    1. Before: Airflow generated the run_id automatically by its internal rule; triggers fired at the same moment produced the same run_id, which raised the error
    2. Now: run_id = f"internal_{category_id}_{timezone.utcnow()...}" explicitly builds a run_id for each task → combining category_id with the current time keeps run_ids from colliding, as long as each payload carries its own category_id
  3. How the DAG run is triggered
    1. Before: TriggerDagRunOperator triggered the DAG run internally
    2. Now: call Airflow's trigger_dag() API function directly → passing dag_id, run_id, and conf
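The run_id scheme described in this list can be sketched as a small standalone helper that runs without Airflow. `build_run_id` is a hypothetical name, and the optional random suffix is an assumption added for illustration, not part of the original fix. It would only matter if two payloads could share a category_id within the same microsecond.

```python
import uuid
from datetime import datetime, timezone


def build_run_id(category_id, now=None, with_suffix=False):
    """Build 'internal_<category_id>_<timestamp>' in the same format as the post."""
    now = now or datetime.now(timezone.utc)
    run_id = f"internal_{category_id}_{now.strftime('%Y%m%dT%H%M%S.%f')}"
    if with_suffix:
        # Assumption: extra random suffix for payloads that may share a category_id.
        run_id += f"_{uuid.uuid4().hex[:8]}"
    return run_id


# Worst case: all 300 triggers read exactly the same timestamp.
fixed = datetime(2025, 9, 14, 3, 0, 0, 123456, tzinfo=timezone.utc)
run_ids = [build_run_id(category_id, now=fixed) for category_id in range(300)]

print(run_ids[0])
print(len(set(run_ids)))  # 300: category_id alone keeps the ids distinct
```

Passing the timestamp in as a parameter makes the helper easy to test, and it lets the caller reuse the same instant for both run_id and execution_date.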

 

Conclusion

Even when the same DAG is triggered around 300 times at once, each run now has its own run_id → the duplicate-trigger error went away.

 

 

Finding problems that only surface in real operation is fun.

What will break next when I trigger even more DAG runs?

And what else have I missed?