개요
2023.05.20 - [Airflow] TaskGroup에서 TaskGroup의 개념을 알아보고, 사용방법을 정리했다. 이 글에서는 TaskFlow API로, 즉 @task_group을 이용해 TaskGroup을 정의하는 방법을 정리한다.
예시 dag
@task_group를 사용하지 않고 정의한 아래 dag를 데코레이터를 이용해 다시 작성한다.
import datetime
from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
dag = DAG(
dag_id="test_task_group",
start_date=datetime.datetime(2016, 1, 1),
schedule="@daily",
default_args={"retries": 1},
)
with TaskGroup(group_id='group_1', dag=dag) as tg_1:
task_1 = EmptyOperator(task_id="task_1", dag=dag)
task_2 = BashOperator(task_id="task_2", bash_command="echo Hello World!", retries=2, dag=dag)
task_3 = BashOperator(task_id="task_3", bash_command="echo Hello World!", retries=2, dag=dag)
task_1 >> [task_2, task_3]
with TaskGroup(group_id='group_2', dag=dag) as tg_2:
task_1 = EmptyOperator(task_id="task_1", dag=dag)
with TaskGroup(group_id='inner_group', dag=dag) as inner_tg:
task_2 = BashOperator(task_id="task_2", bash_command="echo 1", dag=dag)
task_3 = EmptyOperator(task_id="task_3", dag=dag)
task_4 = EmptyOperator(task_id="task_4", dag=dag)
[task_2, task_3] >> task_4
task_start = EmptyOperator(task_id="start", dag=dag)
task_end = EmptyOperator(task_id="end", dag=dag)
task_start >> tg_1 >> tg_2 >> task_end
모듈 import
task_group 데코레이터를 사용하기 위해서 관련 모듈 import가 필요하다.
from airflow.decorators import task_group
task_group 정의
@task_group 사용 방법은 @dag, @task나 크게 다르지 않다. task를 선언하는 함수를 정의하고, 그 위에 데코레이터를 사용하면 된다. 작업이 보다 일반적이라면 task_group을 재사용할 수도 있을 것 같다.
@task_group(group_id='group_1')
def tg_1():
task_1 = EmptyOperator(task_id="task_1")
task_2 = BashOperator(task_id="task_2", bash_command="echo Hello World!", retries=2)
task_3 = BashOperator(task_id="task_3", bash_command="echo Hello World!", retries=2)
task_1 >> [task_2, task_3]
정의 시 group_id를 지정하지 않으면 함수 이름이 group_id가 된다.
@task_group
def inner_group():
task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
task_3 = EmptyOperator(task_id="task_3")
task_4 = EmptyOperator(task_id="task_4")
[task_2, task_3] >> task_4
task_group 사용
정의한 TaskGroup은 함수 호출하듯이 호출하여 사용한다.
@dag(
dag_id="test_task_group_deco",
start_date=datetime.datetime(2016, 1, 1),
schedule="@daily",
default_args={"retries": 1}
)
def generate_dag():
task_start = EmptyOperator(task_id="start")
task_end = EmptyOperator(task_id="end")
task_start >> tg_1() >> tg_2() >> task_end
예시 dag에 @task_group 사용
예시 dag를 TaskFlow API를 이용해 다시 작성하면 다음과 같다.
import datetime
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.decorators import dag, task, task_group
@task_group(group_id='group_1')
def tg_1():
task_1 = EmptyOperator(task_id="task_1")
task_2 = BashOperator(task_id="task_2", bash_command="echo Hello World!", retries=2)
task_3 = BashOperator(task_id="task_3", bash_command="echo Hello World!", retries=2)
task_1 >> [task_2, task_3]
@task_group(group_id='group_2')
def tg_2():
task_1 = EmptyOperator(task_id="task_1")
@task_group
def inner_group():
task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
task_3 = EmptyOperator(task_id="task_3")
task_4 = EmptyOperator(task_id="task_4")
[task_2, task_3] >> task_4
inner_group()
@dag(
dag_id="test_task_group_deco",
start_date=datetime.datetime(2016, 1, 1),
schedule="@daily",
default_args={"retries": 1}
)
def generate_dag():
task_start = EmptyOperator(task_id="start")
task_end = EmptyOperator(task_id="end")
task_start >> tg_1() >> tg_2() >> task_end
generate_dag()
참고 문서
https://docs.astronomer.io/learn/task-groups
https://docs.astronomer.io/learn/airflow-decorators
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#taskgroups