Apache Airflow

[Airflow] TaskGroup

비번변경 2023. 5. 20. 20:18

TaskGroups

TaskGroups은 Graph view에서 Task를 계층적인 그룹으로 구성할 때 사용한다. 반복적인 패턴을 만들고 시각적인 혼란을 줄이는데 유용하다. Task 실행 순서 등에는 영향을 미치지 않으면 단순 UI 그룹 개념이다.

이 글에서는 사용 방법을 간단히 정리한다. TaskFlow API를 사용하지 않는 방법으로 소개한다.

 

 

예시 dag

먼저 TaskGroup을 사용하지 않은 dag를 살펴본다.

import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

dag = DAG(
    dag_id="test_task_group",
    start_date=datetime.datetime(2016, 1, 1),
    schedule="@daily",
    default_args={"retries": 1},
)

task_start = EmptyOperator(task_id="start", dag=dag)
task_1_1 = EmptyOperator(task_id="task_1_1", dag=dag)
task_1_2 = BashOperator(task_id="task_1_2", bash_command="echo Hello World!", retries=2, dag=dag)
task_1_3 = BashOperator(task_id="task_1_3", bash_command="echo Hello World!", retries=2, dag=dag)
task_2_1 = EmptyOperator(task_id="task_2_1", dag=dag)
task_2_2 = BashOperator(task_id="task_2_2", bash_command="echo 1", dag=dag)
task_2_3 = EmptyOperator(task_id="task_2_3", dag=dag)
task_2_4 = EmptyOperator(task_id="task_2_4", dag=dag)
task_end = EmptyOperator(task_id="end", dag=dag)

task_start >> task_1_1
task_1_1 >> [task_1_2, task_1_3] >> task_2_2 >> task_2_4
task_2_3 >> task_2_4
[task_2_1, task_2_4] >> task_end

한눈에 보기에도 깔끔해 보이진 않다. 이 dag를 TaskGroups을 이용해 정리해 보자.

 

 

모듈 import

TaskGroup을 사용하기 위해서는 아래와 같은 모듈 import가 필요하다.

from airflow.utils.task_group import TaskGroup

 

 

TaskGroup 정의

with문을 이용해 TaskGroup을 정의한다. 정의 시 group_id를 지정할 수 있다.

with TaskGroup(group_id='group_1', dag=dag) as tg_1:
    task_1 = EmptyOperator(task_id="task_1", dag=dag)
    task_2 = BashOperator(task_id="task_2", bash_command="echo Hello World!", retries=2, dag=dag)
    task_3 = BashOperator(task_id="task_3", bash_command="echo Hello World!", retries=2, dag=dag)

    task_1 >> [task_2, task_3]

정의한 TaskGroup은 일반적인 Task와 비슷하게 shift 연산자를 이용해 Task 간 의존성을 지정할 수 있다. TaskGroup과 Task 간 의존성을 정의할 수도 있고, TaskGroup과 TaskGroup 간의 의존성을 정의할 수도 있다.

task_start >> tg_1
tg_1 >> task_2_2 >> task_2_4

TaskGroup 내에 별도의 TaskGroup도 정의할 수 있다.

with TaskGroup(group_id='group_2', dag=dag) as tg_2:
    task_1 = EmptyOperator(task_id="task_1", dag=dag)

    with TaskGroup(group_id='inner_group', dag=dag) as inner_tg:
        task_2 = BashOperator(task_id="task_2", bash_command="echo 1", dag=dag)
        task_3 = EmptyOperator(task_id="task_3", dag=dag)
        task_4 = EmptyOperator(task_id="task_4", dag=dag)

        [task_2, task_3] >> task_4

 

 

예시 Dag에 TaskGroup 사용

위의 예시 Dag에 TaskGroup을 사용해 정리하면 Graph로 다음과 같이 표현할 수 있다.

 

전체 코드는 접은글로 작성한다.

더보기
import datetime

from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

dag = DAG(
    dag_id="test_task_group",
    start_date=datetime.datetime(2016, 1, 1),
    schedule="@daily",
    default_args={"retries": 1},
)

with TaskGroup(group_id='group_1', dag=dag) as tg_1:
    task_1 = EmptyOperator(task_id="task_1", dag=dag)
    task_2 = BashOperator(task_id="task_2", bash_command="echo Hello World!", retries=2, dag=dag)
    task_3 = BashOperator(task_id="task_3", bash_command="echo Hello World!", retries=2, dag=dag)

    task_1 >> [task_2, task_3]

with TaskGroup(group_id='group_2', dag=dag) as tg_2:
    task_1 = EmptyOperator(task_id="task_1", dag=dag)

    with TaskGroup(group_id='inner_group', dag=dag) as inner_tg:
        task_2 = BashOperator(task_id="task_2", bash_command="echo 1", dag=dag)
        task_3 = EmptyOperator(task_id="task_3", dag=dag)
        task_4 = EmptyOperator(task_id="task_4", dag=dag)

        [task_2, task_3] >> task_4

task_start = EmptyOperator(task_id="start", dag=dag)
task_end = EmptyOperator(task_id="end", dag=dag)

task_start >> tg_1 >> tg_2 >> task_end

 

 

 

참고 문서

https://docs.astronomer.io/learn/task-groups

https://magpienote.tistory.com/201

https://dibrary.tistory.com/201