Apache Airflow

[Airflow] @task 의존성 설정

비번변경 2023. 4. 26. 18:08

TaskFlow

2023.04.22 - [Airflow] with, @dag를 이용한 DAG 선언

2023.04.23 - [Airflow] @task를 이용한 Task 선언

 

위 두 개 글에서 데코레이터를 이용해 Dag와 Task를 선언하는 방법을 정리했는데, 데코레이터를 이용한 선언 방법은 TaskFlow API를 이용한 방법이다.

TaskFlow는 종속성을 자동으로 계산하고 XCom을 이용해 task 간 입출력 이동을 처리한다.

 

이 글에서는 @task로 생성한 Task의 의존성을 설정하는 방법을 정리한다.

 

 

함수 정의

Task로 생성할 함수를 먼저 정의한다. 간단히 데이터를 읽고, 읽은 데이터에서 나이 데이터만 추출해서 총합을 출력하고자 한다.

def data_load():
    data = [
        {"name": 'james', "age": 29},
        {"name": 'lily', "age": 29},
        {"name": 'harry', "age": 7},
    ]
    return data

def get_ages(data):
    ages = [d.ages for d in data]
    return ages

def print_total_age(ages):
    print(sum(ages))

 

 

Task 선언

호출할 함수를 정의했다면, 정의한 함수를 Task로 생성한다.

@dag(start_date=datetime(2022, 1, 20),
     dag_id='taskflow_test',
     default_args=dag_args,
     schedule="@once")
def generate_dag():
    @task()
    def data_load():
        data = [
            {"name": 'james', "age": 29},
            {"name": 'lily', "age": 29},
            {"name": 'harry', "age": 7},
        ]
        return data

    @task
    def get_ages(data):
        ages = [d["age"] for d in data]
        return ages

    @task
    def print_total_age(ages):
        print(sum(ages))

 

 

Task 의존성 설정

이 task는 data_load -> get_ages -> print_total_age 순으로 처리되어야 한다. task를 정의할 때 입출력이 이미 정해져 있으므로 그에 맞춰서 함수를 호출하면 된다.

@dag(start_date=datetime(2022, 1, 20),
     dag_id='taskflow_test',
     default_args=dag_args,
     schedule="@once")
def generate_dag():
    @task()
    def data_load():
        data = [
            {"name": 'james', "age": 29},
            {"name": 'lily', "age": 29},
            {"name": 'harry', "age": 7},
        ]
        return data

    @task
    def get_ages(data):
        print(data)
        ages = [d["age"] for d in data]
        return ages

    @task
    def print_total_age(ages):
        print(sum(ages))

    data = data_load()
    ages = get_ages(data)
    print_total_age(ages)

만약 한 줄로 표현하고 싶다면 아래와 같이 호출해도 된다.

    print_total_age(get_ages(data_load()))

만약 @task가 아닌 Operator를 이용해 생성한 Task와 의존성을 설정할 때는 기존과 동일하게 shift 연산자를 이용해 설정할 수 있다.

 

 

Dag 실행 확인

전체 코드

더보기
from datetime import datetime, timedelta

from airflow.decorators import dag, task

dag_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}


@dag(start_date=datetime(2022, 1, 20),
     dag_id='taskflow_test',
     default_args=dag_args,
     schedule="@once")
def generate_dag():
    @task()
    def data_load():
        data = [
            {"name": 'james', "age": 29},
            {"name": 'lily', "age": 29},
            {"name": 'harry', "age": 7},
        ]
        return data

    @task
    def get_ages(data):
        ages = [d["age"] for d in data]
        return ages

    @task
    def print_total_age(ages):
        print(sum(ages))

    print_total_age(get_ages(data_load()))


generate_dag()

생성한 Dag를 Trigger해서 실행하면 정상적으로 동작해 값이 출력되는 모습을 확인할 수 있다.

그리고 반환값이 있었던 함수는 return_value라는 이름으로 XCom이 생성되어 있는 것을 확인할 수 있다.

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html