Apache Airflow

[Airflow] chain - Task 간 의존성 설정

비번변경 2023. 4. 13. 19:52

개요

2022.08.03 - [Airflow] Task 간 의존성 설정에서 시프트 연산자와 set_downstream, set_upstream 함수를 이용한 방법을 정리했다. 추가로 chain 함수를 이용한 의존성 설정에 대해 정리한다.

 

 

chain

주어진 여러 개의 task에 대한 의존성 체인을 만든다.

chain(*tasks)

chain 함수는 tasks, Labels, XComArg, TaskGroups, 그리고 이러한 유형을 원소로 하는 리스트를 매개변수로 받을 수 있다.

 

 

모듈 임포트

아래와 같이 임포트 하여 사용할 수 있다.

from airflow.models.baseoperator import chain

 

 

사용 예

테스트 DAG 코드는 접은 글로 작성해 둔다.

더보기

Code

from datetime import datetime, timedelta
from time import sleep

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models.baseoperator import chain

def dump():
    sleep(3)

dag_args = {
    "owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1),
}

dag = DAG(
    dag_id="test_for",
    default_args=dag_args,
    start_date=datetime(2022, 1, 20),
    schedule_interval="@once",
)

# TASK 생성
list_task = list()
for i in range(4):
    t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
    list_task.append(t)

 

Graph

 

 

1. 병렬

chain 함수에 task list를 전달하면 의존성을 병렬로 설정한다.

list_task = list()
for i in range(4):
    t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
    list_task.append(t)

chain(list_task)

 

2. 직렬

chain 함수에 task를 쭉 나열하면 직렬로 의존성을 설정한다.

list_task = list()
for i in range(4):
    t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
    list_task.append(t)

# list_task[0] >> list_task[1] >> list_task[2] >> list_task[3]
chain(list_task[0], list_task[1], list_task[2], list_task[3])

 

리스트 내에 있는 task를 모두 직렬로 연결할 때는 * 연산자를 사용해도 된다.

list_task = list()
for i in range(4):
    t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
    list_task.append(t)

# list_task[0] >> list_task[1] >> list_task[2] >> list_task[3]
chain(*list_task)

 

3. task 리스트 간 종속성 설정

chain 함수는 여러 task 리스트를 전달받아 의존성을 설정할 수 있다. task 리스트 간 의존성을 설정할 때는 같은 인덱스의 task에 대한 downstream/upstream이 설정된다. 따라서 전달하는 리스트의 길이가 동일해야 한다. 

list_task = list()
for i in range(4):
    t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
    list_task.append(t)

# list_task[0] >> list_task[1]
# list_task[2] >> list_task[3]
chain([list_task[i] for i in range(len(list_task)) if i % 2 == 0],
      [list_task[i] for i in range(len(list_task)) if i % 2 == 1])

병렬로 실행할 task가 하나의 리스트 내에 있어야 한다.

 

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/2.5.1/core-concepts/dags.html

airflow.models.baseoperator.chain(*tasks)

https://docs.astronomer.io/learn/managing-dependencies

https://brownbears.tistory.com/584

728x90