Apache Airflow

[Airflow] Task 반복 생성

비번변경 2023. 4. 12. 17:34

개요

Airflow DAG는 Python 코드로 정의되기 때문에 반복문을 이용해 task를 생성할 수 있다. 이를 이용하면 매개변수나 설정값만 다른 같은 작업을 Task 내에서 실행하지 않고, 각 Task로 정의하여 실행할 수 있다.

 

예제로 정리한다.

 

 

예제 DAG

아래 코드는 반복문을 이용해 task를 정의한다.

from datetime import datetime, timedelta
from time import sleep

from airflow import DAG
from airflow.operators.python import PythonOperator

def dump():
    sleep(3)

dag_args = {
    "owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1),
}

dag = DAG(
    dag_id="test_for",
    default_args=dag_args,
    start_date=datetime(2022, 1, 20),
    schedule_interval="@once",
)

# TASK 생성
list_task = list()
for i in range(3):
    t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
    list_task.append(t)

task id는 dag 내에서 유일해야 하므로 인덱스 등을 이용해 명명하는 것이 좋다.

 

 

DAG 그래프

예제 코드로 생성된 DAG의 그래프는 다음과 같다. task 간 의존관계가 없으므로 병렬로 나열된다.

 

 

참고 문서

병렬 Task 리스트 의존성 작성하기