๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
  • ๐Ÿ‘ฉ๐Ÿปโ€๐Ÿ’ป ๐ŸŒฎ ๐Ÿ’ฌ
๐Ÿ‘ฉ๐Ÿป‍๐Ÿ’ป/airflow

[Airflow] airflow ui์—์„œ arguments ์ „๋‹ฌํ•˜๊ธฐ

by ๋ฐ”์ฟ„๋ฆฌ 2024. 7. 11.

๊ฐœ์š”

Airflow์˜ Task๋“ค์€ ์Šค์ผ€์ค„๋ง์œผ๋กœ ๊ด€๋ฆฌํ•˜๊ณ  ์žˆ๋‹ค.

ํ•˜์ง€๋งŒ ๋‹จ๋ฐœ์„ฑ์œผ๋กœ ์‹คํ–‰๋˜์–ด์•ผํ•˜๋Š” Task๋“ค๋„ ์กด์žฌํ•œ๋‹ค.

์ดˆ๊ธฐ ๋ฐ์ดํ„ฐ ์ ์žฌ ํ˜น์€ ์žฌ์ ์žฌ ์ด์Šˆ๋กœ ์ธํ•ด..

์ด๋Ÿฌํ•œ ๊ฒฝ์šฐ์— Arguments์„ ์ „๋‹ฌํ•˜๊ณ  ์‹ถ์„ ๋•Œ ์–ด๋–ป๊ฒŒ ํ•ด์•ผํ• ๊นŒ ์ฐพ์•„๋ณด์•˜๋‹ค.

์ง„ํ–‰

BashOperator, PythonOperator ๋‘๊ฐ€์ง€ ๋ฐฉ๋ฒ•์œผ๋กœ ํ…Œ์ŠคํŠธํ•ด๋ณด์•˜๋‹ค.

from datetime import datetime, timedelta
from airflow import DAG
from pendulum.tz.timezone import Timezone
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
kst = Timezone('Asia/Seoul')
default_args={
    'owner' : 'test',
    'retries' : 1,
    'retry_delay' : timedelta(minutes=1)
}
def print_arguments(**kwargs):
    message = kwargs['dag_run'].conf
    print(message)
with DAG(
    dag_id='Argument_Test',
    description='Dag to test how to send arguments',
    default_args=default_args,
    start_date=datetime(2024, 7, 11, tzinfo=kst),
    schedule_interval=None,
    tags=['TEST']
) as dag:
    ## bash operator
    bash_task = BashOperator(
        task_id="bash_task",
        bash_command="echo '{{ dag_run.conf }}'",
        dag=dag
    )
    ## python operator
    python_task = PythonOperator(
        task_id="python_task",
        python_callable=print_arguments,
        provide_context=True, ## ๊ผญ ์ถ”๊ฐ€ํ•˜๊ธฐ
        dag=dag
    )
    bash_task >> python_task

 

์œ„์ฒ˜๋Ÿผ ์Šคํฌ๋ฆฝํŠธ ์ž‘์„ฑํ•ด์„œ Argument_Test๋ผ๋Š” DAG๋ฅผ ์ƒ์„ฑํ–ˆ๋‹ค.

 

์‹คํ–‰๋ฒ„ํŠผ ํด๋ฆญํ•ด์„œ config์™€ ๊ฐ™์ด trigger ํ•œ๋‹ค๋Š” Trigger DAG w/ config๋ฅผ ์„ ํƒํ•œ๋‹ค.

 

 

JSON ํ˜•์‹์œผ๋กœ message๋ฅผ ์ž…๋ ฅํ•œ๋‹ค.

 

BashOperator๋กœ ์‹คํ–‰๋œ ๊ฒฐ๊ณผ

 

PythonOperator๋กœ ์‹คํ–‰๋œ ๊ฒฐ๊ณผ

๋‚ด๊ฐ€ ์ž…๋ ฅํ•œ message๋ฅผ ์ž˜ ๋ฐ›์•„์„œ print ์ฐ์€ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

 

์ด์ œ ๋‹จ๋ฐœ์„ฑ์œผ๋กœ ์‹คํ–‰๋˜์–ด์•ผํ•˜๋Š” Task ๋ฐœ์ƒ ์‹œ, airflow๋ฅผ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ๊ฒ ๋‹ค. ๊ตณ๊ตณ

 

์ง€๊ธˆ ๋‚˜๋Š” ํฌ๋กค๋ง์„ ํ•˜๊ณ  ์žˆ๋Š”๋ฐ, ๋‹จ๋ฐœ์„ฑ์œผ๋กœ ์‹คํ–‰ํ•ด์•ผํ•˜๋Š” ๊ฒฝ์šฐ

1. ํŠน์ • ๊ธฐ๊ฐ„์˜ ๋ฐ์ดํ„ฐ๋ฅผ ํฌ๋กค๋ง ํ•˜๊ณ  ์‹ถ์€ ๊ฒฝ์šฐ

2. ํŠน์ • ๋„๋ฉ”์ธ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ํฌ๋กค๋ง ํ•˜๊ณ  ์‹ถ์€ ๊ฒฝ์šฐ

๊ทธ๋Ÿผ ํŠน์ • ๊ธฐ๊ฐ„, ํŠน์ • ๋„๋ฉ”์ธ์€ arguments๋กœ ๋„˜๊ฒจ์„œ ์ง„ํ–‰ํ•˜๋ฉด ๋˜๋‹ˆ๊นŒ ํ›ค์”ฌ ๊ฐ€๋ณ๊ฒŒ DAG๋ฅผ triggerํ•  ์ˆ˜ ์žˆ๋‹ค.