๊ฐ์
Airflow์ Task๋ค์ ์ค์ผ์ค๋ง์ผ๋ก ๊ด๋ฆฌํ๊ณ ์๋ค.
ํ์ง๋ง ๋จ๋ฐ์ฑ์ผ๋ก ์คํ๋์ด์ผํ๋ Task๋ค๋ ์กด์ฌํ๋ค.
์ด๊ธฐ ๋ฐ์ดํฐ ์ ์ฌ ํน์ ์ฌ์ ์ฌ ์ด์๋ก ์ธํด..
์ด๋ฌํ ๊ฒฝ์ฐ์ Arguments์ ์ ๋ฌํ๊ณ ์ถ์ ๋ ์ด๋ป๊ฒ ํด์ผํ ๊น ์ฐพ์๋ณด์๋ค.
์งํ
BashOperator, PythonOperator ๋๊ฐ์ง ๋ฐฉ๋ฒ์ผ๋ก ํ ์คํธํด๋ณด์๋ค.
from datetime import datetime, timedelta
from airflow import DAG
from pendulum.tz.timezone import Timezone
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
kst = Timezone('Asia/Seoul')
default_args={
'owner' : 'test',
'retries' : 1,
'retry_delay' : timedelta(minutes=1)
}
def print_arguments(**kwargs):
message = kwargs['dag_run'].conf
print(message)
with DAG(
dag_id='Argument_Test',
description='Dag to test how to send arguments',
default_args=default_args,
start_date=datetime(2024, 7, 11, tzinfo=kst),
schedule_interval=None,
tags=['TEST']
) as dag:
## bash operator
bash_task = BashOperator(
task_id="bash_task",
bash_command="echo '{{ dag_run.conf }}'",
dag=dag
)
## python operator
python_task = PythonOperator(
task_id="python_task",
python_callable=print_arguments,
provide_context=True, ## ๊ผญ ์ถ๊ฐํ๊ธฐ
dag=dag
)
bash_task >> python_task
์์ฒ๋ผ ์คํฌ๋ฆฝํธ ์์ฑํด์ Argument_Test๋ผ๋ DAG๋ฅผ ์์ฑํ๋ค.
์คํ๋ฒํผ ํด๋ฆญํด์ config์ ๊ฐ์ด trigger ํ๋ค๋ Trigger DAG w/ config๋ฅผ ์ ํํ๋ค.
JSON ํ์์ผ๋ก message๋ฅผ ์ ๋ ฅํ๋ค.
BashOperator๋ก ์คํ๋ ๊ฒฐ๊ณผ
PythonOperator๋ก ์คํ๋ ๊ฒฐ๊ณผ
๋ด๊ฐ ์ ๋ ฅํ message๋ฅผ ์ ๋ฐ์์ print ์ฐ์ ๊ฒ์ ํ์ธํ ์ ์๋ค.
์ด์ ๋จ๋ฐ์ฑ์ผ๋ก ์คํ๋์ด์ผํ๋ Task ๋ฐ์ ์, airflow๋ฅผ ํ์ฉํ ์ ์๊ฒ ๋ค. ๊ตณ๊ตณ
์ง๊ธ ๋๋ ํฌ๋กค๋ง์ ํ๊ณ ์๋๋ฐ, ๋จ๋ฐ์ฑ์ผ๋ก ์คํํด์ผํ๋ ๊ฒฝ์ฐ
1. ํน์ ๊ธฐ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ํฌ๋กค๋ง ํ๊ณ ์ถ์ ๊ฒฝ์ฐ
2. ํน์ ๋๋ฉ์ธ์ ๋ฐ์ดํฐ๋ฅผ ํฌ๋กค๋ง ํ๊ณ ์ถ์ ๊ฒฝ์ฐ
๊ทธ๋ผ ํน์ ๊ธฐ๊ฐ, ํน์ ๋๋ฉ์ธ์ arguments๋ก ๋๊ฒจ์ ์งํํ๋ฉด ๋๋๊น ํค์ฌ ๊ฐ๋ณ๊ฒ DAG๋ฅผ triggerํ ์ ์๋ค.