Apache Airflow

[Airflow] @task_group으로 TaskGroup 정의

비번변경 2023. 5. 21. 17:10

개요

2023.05.20 - [Airflow] TaskGroup에서 TaskGroup의 개념을 알아보고, 사용방법을 정리했다. 이 글에서는 TaskFlow API로, 즉 @task_group을 이용해 TaskGroup을 정의하는 방법을 정리한다.

 

 

예시 dag

@task_group를 사용하지 않고 정의한 아래 dag를 데코레이터를 이용해 다시 작성한다.

import datetime

from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

dag = DAG(
    dag_id="test_task_group",
    start_date=datetime.datetime(2016, 1, 1),
    schedule="@daily",
    default_args={"retries": 1},
)

with TaskGroup(group_id='group_1', dag=dag) as tg_1:
    task_1 = EmptyOperator(task_id="task_1", dag=dag)
    task_2 = BashOperator(task_id="task_2", bash_command="echo Hello World!", retries=2, dag=dag)
    task_3 = BashOperator(task_id="task_3", bash_command="echo Hello World!", retries=2, dag=dag)

    task_1 >> [task_2, task_3]

with TaskGroup(group_id='group_2', dag=dag) as tg_2:
    task_1 = EmptyOperator(task_id="task_1", dag=dag)

    with TaskGroup(group_id='inner_group', dag=dag) as inner_tg:
        task_2 = BashOperator(task_id="task_2", bash_command="echo 1", dag=dag)
        task_3 = EmptyOperator(task_id="task_3", dag=dag)
        task_4 = EmptyOperator(task_id="task_4", dag=dag)

        [task_2, task_3] >> task_4

task_start = EmptyOperator(task_id="start", dag=dag)
task_end = EmptyOperator(task_id="end", dag=dag)

task_start >> tg_1 >> tg_2 >> task_end

 

 

모듈 import

task_group 데코레이터를 사용하기 위해서 관련 모듈 import가 필요하다.

from airflow.decorators import task_group

 

 

task_group 정의

@task_group 사용 방법은 @dag, @task나 크게 다르지 않다. task를 선언하는 함수를 정의하고, 그 위에 데코레이터를 사용하면 된다. 작업이 보다 일반적이라면 task_group을 재사용할 수도 있을 것 같다.

@task_group(group_id='group_1')
def tg_1():
    task_1 = EmptyOperator(task_id="task_1")
    task_2 = BashOperator(task_id="task_2", bash_command="echo Hello World!", retries=2)
    task_3 = BashOperator(task_id="task_3", bash_command="echo Hello World!", retries=2)

    task_1 >> [task_2, task_3]

정의 시 group_id를 지정하지 않으면 함수 이름이 group_id가 된다.

@task_group
def inner_group():
    task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
    task_3 = EmptyOperator(task_id="task_3")
    task_4 = EmptyOperator(task_id="task_4")
    [task_2, task_3] >> task_4

 

 

task_group 사용

정의한 TaskGroup은 함수 호출하듯이 호출하여 사용한다.

@dag(
    dag_id="test_task_group_deco",
    start_date=datetime.datetime(2016, 1, 1),
    schedule="@daily",
    default_args={"retries": 1}
)
def generate_dag():
    task_start = EmptyOperator(task_id="start")
    task_end = EmptyOperator(task_id="end")
    task_start >> tg_1() >> tg_2() >> task_end

 

 

예시 dag에 @task_group 사용

예시 dag를 TaskFlow API를 이용해 다시 작성하면 다음과 같다. 

import datetime
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.decorators import dag, task, task_group

@task_group(group_id='group_1')
def tg_1():
    task_1 = EmptyOperator(task_id="task_1")
    task_2 = BashOperator(task_id="task_2", bash_command="echo Hello World!", retries=2)
    task_3 = BashOperator(task_id="task_3", bash_command="echo Hello World!", retries=2)
    task_1 >> [task_2, task_3]

@task_group(group_id='group_2')
def tg_2():
    task_1 = EmptyOperator(task_id="task_1")
    @task_group
    def inner_group():
        task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
        task_3 = EmptyOperator(task_id="task_3")
        task_4 = EmptyOperator(task_id="task_4")
        [task_2, task_3] >> task_4
    inner_group()

@dag(
    dag_id="test_task_group_deco",
    start_date=datetime.datetime(2016, 1, 1),
    schedule="@daily",
    default_args={"retries": 1}
)
def generate_dag():
    task_start = EmptyOperator(task_id="start")
    task_end = EmptyOperator(task_id="end")
    task_start >> tg_1() >> tg_2() >> task_end

generate_dag()

 

 

참고 문서

https://docs.astronomer.io/learn/task-groups

https://docs.astronomer.io/learn/airflow-decorators

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#taskgroups