Apache Airflow

[Airflow] @task를 이용한 Task 선언

비번변경 2023. 4. 23. 19:16

Task 선언

2023.04.22 - [Airflow] with, @dag를 이용한 DAG 선언에서 데코레이터를 이용해 DAG를 생성하는 방법을 살펴봤는데, Task도 데코레이터를 이용해 선언할 수 있다. 다만 지금은 Python, SQL 함수에만 사용할 수 있는 것 같다.

 

이 글에서는 표준 생성자를 이용해 task가 선언된 DAG를 데코레이터를 이용해 task를 선언해 본다.

 

 

기존 선언 방식

기존에는 Task도 표준 생성자를 이용해 선언했다.

from datetime import datetime, timedelta

from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from airflow.operators.python import task, get_current_context

dag_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

def print_dag_id():
    context = get_current_context()
    print(context['dag_run'].dag_id)

@dag(start_date=datetime(2022, 1, 20),
     dag_id='create_task_test_decorator',
     default_args=dag_args,
     schedule_interval="@once")
def generate_dag():
    t = PythonOperator(task_id='print_dag_id', python_callable=print_dag_id)

generate_dag()

 

 

@task

Airflow 2부터는 데코레이터를 이용해 task를 선언할 수 있다. @task를 사용하기 위해서는 decorator가 정의된 모듈 import가 필요하다.

from airflow.decorators import task

그리고 실행할 함수 위에 @task 데코레이터를 사용한다. DAG 생성 함수 내에 task 함수를 정의하고 호출해야 한다.

from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

dag_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}


@dag(start_date=datetime(2022, 1, 20),
     dag_id='create_task_test_decorator',
     default_args=dag_args,
     schedule_interval="@once")
def generate_dag():
    @task()
    def print_dag_id():
        context = get_current_context()
        print(context['dag_run'].dag_id)
    
    print_dag_id()

generate_dag()

데코레이터 사용 시 task_id를 전달하지 않으면 함수의 이름이 task_id가 된다.

 

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html

https://docs.astronomer.io/learn/airflow-decorators?tab=traditional#how-to-use-airflow-decorators