Dynamic Task Mapping
기존에 Airflow는 DAG가 프로세싱되는 시점에만 Task를 동적으로 생성할 수 있었다. (참고 : 2023.04.12 - [Airflow] Task 반복 생성) 하지만 Airflow 2.3부터는 Dynamic Task Mapping 기능이 도입되면 Runtime에 Task를 동적으로 생성할 수 있게 되었다.
이 글에서는 Dynamic Task Mapping 기능에 대해 알아보려고 한다.
Dynamic Task
Dynamic Task Mapping 기능은 MapReduce 프로그래밍 모델을 기반으로 한다.
Map은 데이터를 연산하기 좋은 형태로 변환하는 작업으로 일반적으로 Trasformation이라고 표현한다. Reduce는 Map을 통해 만들어진 데이터로 연산하는 작업으로 Action이라고 표현할 수 있다. Reduce는 선택적으로 수행할 수 있다.
즉, Map은 각 입력 데이터에 대한 단일 Task를 생성하고, Reduce는 Map 작업의 결과로 연산을 수행한다.
Airflow와 관련지어서 정리하면,
- Map : Runtime에 입력 매개변수를 기반으로 임의의 개수의 병렬 Task를 생성할 수 있다.
- Redude : 필요한 경우, 출력에 따라 병렬 Mapping 된 Task의 downstream 작업을 수행할 수 있다.
Airflow는 Dynamic Task Mapping 기능에서 Map을 구현하는 두 가지 함수를 제공한다.
- expand : mapping할 매개변수를 전달하여 각 입력에 대한 병렬 Task를 생성한다.
- partial : expand에 의해 생성된 모든 Task에 동일하게 유지할 매개변수를 전달한다.
예시 DAG
아래 DAG는 get_val Task가 동작할 때 랜덤한 크기의 숫자 리스트를 생성하여 get_sum 함수에 y 값으로 전달한다.
from datetime import datetime, timedelta
from airflow.decorators import dag, task
import random
@task
def get_val():
list_len = random.choice(range(10))
print(list_len)
return list(range(list_len))
@task
def get_sum(x, y):
print(f"x={x}, y={y}")
return x + y
dag_args = {
"owner": "airflow",
"retries": 1,
"retry_delay": timedelta(minutes=1),
}
@dag(start_date=datetime(2022, 1, 20),
dag_id='dag_dynamic_test',
default_args=dag_args,
schedule="@once")
def generate_dag():
get_sum.partial(x=2).expand(y=get_val())
generate_dag()
Graph 상으로는 위와 같이 표현된다.
이 글에서는 Taskflow API를 이용해 DAG를 생성했는데, 기존 Operator를 이용한 경우에도 Dynamic Task Mapping이 가능하다.
동작 테스트
DAG를 트리거하여 get_sum Task가 동적으로 생성되는지 확인해 본다.
1.
2.
3.
동작시킬 때마다 get_sum Task 수가 다른 모습을 확인할 수 있다.
참고 문서
https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html