Apache Airflow

[Airflow] @task를 이용한 Task 반복 생성

비번변경 2023. 5. 19. 23:05

개요

2023.04.12 - [Airflow] Task 반복 생성에서 Operator 생성자를 이용해 task를 반복 생성하는 방법을 알아보고,

2023.04.23 - [Airflow] @task를 이용한 Task 선언에서 TaskFlow API를 이용해 task을 생성하는 방법을 알아보았다.

 

그렇다면 TaskFlow API를 이용해 task를 반복 생성해야 할 때는 어떻게 해야 할까? 방법을 정리한다.

 

 

예시) 생성자 사용 DAG

아래의 생성자를 사용한 dag를 TaskFlow API를 사용해 재정의해본다.

from datetime import datetime, timedelta
from time import sleep

from airflow import DAG
from airflow.operators.python import PythonOperator

def dump():
    sleep(3)

dag_args = {
    "owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1),
}

dag = DAG(
    dag_id="test_for",
    default_args=dag_args,
    start_date=datetime(2022, 1, 20),
    schedule_interval="@once",
)

# TASK 생성
list_task = list()
for val in ['cat', 'dog', 'lion']:
    t = PythonOperator(task_id=f"task_{val}", python_callable=dump, dag=dag)
    list_task.append(t)

 

task 매개변수 override

@task 사용 시 task_id를 지정하지 않으면 함수의 이름이 task_id가 되고, task_id를 지정하면 지정한 task_id가 task_id가 된다.

 

task_id 미지정

@task
def dump():
    sleep(3)


@dag(
    dag_id="dag_task_deco_for",
    default_args=dag_args,
    start_date=datetime(2022, 1, 20),
    schedule_interval="@once",
)
def generate_dag():
    dump()

 

task_id 지정

@task(task_id='task_dump')
def dump():
    sleep(3)


@dag(
    dag_id="dag_task_deco_for",
    default_args=dag_args,
    start_date=datetime(2022, 1, 20),
    schedule_interval="@once",
)
def generate_dag():
    dump()

위 코드에서 dump 함수를 반복문 내에서 호출하면 task_id가 동일한 task가 반복 생성된다. dag 정의 파일 내에 task_id가 동일한 경우 task_id, task_id__1, task_id__2, …… 와 같이 task_id에  '__n'을 postfix로 붙여 생성하는 것으로 보인다.

@task(task_id='task_dump')
def dump():
    sleep(3)


@dag(
    dag_id="dag_task_deco_for",
    default_args=dag_args,
    start_date=datetime(2022, 1, 20),
    schedule_interval="@once",
)
def generate_dag():
    for i in range(3):
        dump()

만약 각 task를 보다 유의미하게 식별해야 한다면 위와 같은 네이밍 방식은 적절하지 않다. 반복문을 이용해 task를 생성하는 시점에 task_id와 같은 매개변수 변경이 필요하다. 이 경우에는 override 함수를 호출하여 매개변수 값을 변경할 수 있다.

@task(task_id='task_dump')
def dump():
    sleep(3)


@dag(
    dag_id="dag_task_deco_for",
    default_args=dag_args,
    start_date=datetime(2022, 1, 20),
    schedule_interval="@once",
)
def generate_dag():
    dump.override(task_id=f"task_id_override")()

task 매개변수 override가 dump 함수 매개변수 호출보다 선행되어야 한다.

 

task 반복 생성

즉, @task를 이용한 경우 task를 반복 생성할 때는 다음과 같이 구현할 수 있다.

from datetime import datetime, timedelta
from time import sleep

from airflow.decorators import dag, task

dag_args = {
    "owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1),
}


@task(task_id='task_dump')
def dump():
    sleep(3)


@dag(
    dag_id="dag_task_deco_for",
    default_args=dag_args,
    start_date=datetime(2022, 1, 20),
    schedule_interval="@once",
)
def generate_dag():
    for val in ['cat', 'dog', 'lion']:
        dump.override(task_id=f"task_{val}")()

generate_dag()

 

 

참고 문서

Apache Airflow : create tasks using for loop in one dag, I want tasks made of for loop to respond to each xcom

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/decorators/base/index.html#