TaskFlow
2023.04.22 - [Airflow] with, @dag를 이용한 DAG 선언
2023.04.23 - [Airflow] @task를 이용한 Task 선언
위 두 개 글에서 데코레이터를 이용해 Dag와 Task를 선언하는 방법을 정리했는데, 데코레이터를 이용한 선언 방법은 TaskFlow API를 이용한 방법이다.
TaskFlow는 종속성을 자동으로 계산하고 XCom을 이용해 task 간 입출력 이동을 처리한다.
이 글에서는 @task로 생성한 Task의 의존성을 설정하는 방법을 정리한다.
함수 정의
Task로 생성할 함수를 먼저 정의한다. 간단히 데이터를 읽고, 읽은 데이터에서 나이 데이터만 추출해서 총합을 출력하고자 한다.
def data_load():
data = [
{"name": 'james', "age": 29},
{"name": 'lily', "age": 29},
{"name": 'harry', "age": 7},
]
return data
def get_ages(data):
ages = [d.ages for d in data]
return ages
def print_total_age(ages):
print(sum(ages))
Task 선언
호출할 함수를 정의했다면, 정의한 함수를 Task로 생성한다.
@dag(start_date=datetime(2022, 1, 20),
dag_id='taskflow_test',
default_args=dag_args,
schedule="@once")
def generate_dag():
@task()
def data_load():
data = [
{"name": 'james', "age": 29},
{"name": 'lily', "age": 29},
{"name": 'harry', "age": 7},
]
return data
@task
def get_ages(data):
ages = [d["age"] for d in data]
return ages
@task
def print_total_age(ages):
print(sum(ages))
Task 의존성 설정
이 task는 data_load -> get_ages -> print_total_age 순으로 처리되어야 한다. task를 정의할 때 입출력이 이미 정해져 있으므로 그에 맞춰서 함수를 호출하면 된다.
@dag(start_date=datetime(2022, 1, 20),
dag_id='taskflow_test',
default_args=dag_args,
schedule="@once")
def generate_dag():
@task()
def data_load():
data = [
{"name": 'james', "age": 29},
{"name": 'lily', "age": 29},
{"name": 'harry', "age": 7},
]
return data
@task
def get_ages(data):
print(data)
ages = [d["age"] for d in data]
return ages
@task
def print_total_age(ages):
print(sum(ages))
data = data_load()
ages = get_ages(data)
print_total_age(ages)
만약 한 줄로 표현하고 싶다면 아래와 같이 호출해도 된다.
print_total_age(get_ages(data_load()))
만약 @task가 아닌 Operator를 이용해 생성한 Task와 의존성을 설정할 때는 기존과 동일하게 shift 연산자를 이용해 설정할 수 있다.
Dag 실행 확인
전체 코드
from datetime import datetime, timedelta
from airflow.decorators import dag, task
dag_args = {
"owner": "airflow",
"retries": 1,
"retry_delay": timedelta(minutes=1),
}
@dag(start_date=datetime(2022, 1, 20),
dag_id='taskflow_test',
default_args=dag_args,
schedule="@once")
def generate_dag():
@task()
def data_load():
data = [
{"name": 'james', "age": 29},
{"name": 'lily', "age": 29},
{"name": 'harry', "age": 7},
]
return data
@task
def get_ages(data):
ages = [d["age"] for d in data]
return ages
@task
def print_total_age(ages):
print(sum(ages))
print_total_age(get_ages(data_load()))
generate_dag()
생성한 Dag를 Trigger해서 실행하면 정상적으로 동작해 값이 출력되는 모습을 확인할 수 있다.
그리고 반환값이 있었던 함수는 return_value라는 이름으로 XCom이 생성되어 있는 것을 확인할 수 있다.
참고 문서
https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html