개요
Airflow DAG는 Python 코드로 정의되기 때문에 반복문을 이용해 task를 생성할 수 있다. 이를 이용하면 매개변수나 설정값만 다른 같은 작업을 Task 내에서 실행하지 않고, 각 Task로 정의하여 실행할 수 있다.
예제로 정리한다.
예제 DAG
아래 코드는 반복문을 이용해 task를 정의한다.
from datetime import datetime, timedelta
from time import sleep
from airflow import DAG
from airflow.operators.python import PythonOperator
def dump():
sleep(3)
dag_args = {
"owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1),
}
dag = DAG(
dag_id="test_for",
default_args=dag_args,
start_date=datetime(2022, 1, 20),
schedule_interval="@once",
)
# TASK 생성
list_task = list()
for i in range(3):
t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
list_task.append(t)
task id는 dag 내에서 유일해야 하므로 인덱스 등을 이용해 명명하는 것이 좋다.
DAG 그래프
예제 코드로 생성된 DAG의 그래프는 다음과 같다. task 간 의존관계가 없으므로 병렬로 나열된다.
참고 문서