Apache Airflow

[Airflow] 함수 내에서 다른 Operator 실행하기(?)

비번변경 2024. 5. 15. 22:48

개요

Airflow Dag 정의 파일에서 정의된 Dag 인스턴스는 전역 변수여야 하는 것으로 인지하고 있었다. 그런데 최근에 업무에서 사용하고 있는 Airflow Dag 정의 파일을 살펴보다가, PythonOperator로 실행하는 함수 내에서 Dag와 Task를 생성하는 부분을 발견했다.

확인해 보니 기능 동작은 하고 있는데…… 함수 내에서 정의한 Dag와 Task 모두 Airflow 시스템 내에서 확인이 되지 않는 것 같다. 개발자에게 의도를 물어보고 싶지만 퇴사를 하셨으므로, 무엇을 의도한 결과인지 추측해보려고 한다.

 

 

예시 코드

아래 코드는 운영 중인 코드를 간단한 형태로 정리한 것이다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator


def func(**kwargs):
    dag = DAG(
        'dag_test_operator_in_func_bash',
        default_args=default_args,
        schedule='0 0 * * *',
        catchup=False
    )
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag
    )
    t1.execute(context=kwargs)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': '2023-01-01',
    'retries': 1,
    'retry_delay': 20
}

dag = DAG(
    'dag_test_operator_in_func',
    default_args=default_args,
    schedule='0 0 * * *',
    catchup=False
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=func,
    dag=dag
)

PythonOperator로 호출하는 func 함수 내에서 dag와 BashOperator로 task를 정의하고 있는 것을 확인할 수 있다.

 

해당 코드로 생성된 Dag를 확인해 보자. 

dag_id로 검색했을 때 func 함수 밖에서 정의한 dag_test_operator_in_func는 정상적으로 생성되었지만, func 함수 내에서 정의한 dag인 dag_test_operator_in_func_bash는 확인되지 않는 상태이다. 

dag_test_operator_in_func의 Graph 탭에서도 함수 밖에서 정의한 python_task만 확인되고, 내부에서 정의한 task는 확인되지 않는 상태이다.

하지만 python_task의 실행 로그를 확인해 보면, BashOperator를 통해 실행하고자 한 명령어가 실행된 모습을 확인할 수 있다.

 

 

추측

확인해보니 몇몇 경우에서 Dag 정의 파일의 함수 내에서 Operator를 정의하고 실행하고자 하는 요구사항이 있는 것 같다. 업무에서 사용하고 있는 Dag의 경우에는 PythonOperator로 추출한 값을 이용해 BashOperator를 실행시키는 구조로 개발한 것으로 확인했다.

구글링 해보니 함수 내에서 Operator로 task를 생성하고 execute 함수로 실행하면 된다는 스택 오버플로우 문서를 찾을 수 있었는데, 비슷한 문서들을 참고해서 개발한 것 같다. 

def func(**kwargs):
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date'
    )
    t1.execute(context=kwargs)

참고로 PythonOperator로 실행하는 함수 내에서 Dag를 정의하는 코드는 없어도 아무 문제없다.

 

 

문제점

다만 이렇게 Operator로 실제 Task를 정의하는 것이 아니라 단순히 Operator 클래스를 사용해서 Airflow Dag를 구현하는 것은 지양하는 편이 좋아 보인다.

Airflow는 템플릿 관리, execute() 함수 실행, 실패 시 재시도 등의 전반적인 Task 실행을 관리하는데, 단순히 Python 클래스으로서만 Operator를 사용하면 Airflow가 Task의 수명주기를 관리할 수 없다. 함수 내에서 정의한 Dag와 Task를 Airflow 시스템에서 찾을 수 없는 것도 이러한 이유 때문인 것 같다.

 

따라서 이 글에서 작성된 방식은 사용하지 않는 것을 권한다.

Airflow에서 제공하지 않는 Operator가 필요하다면 사용자 정의 오퍼레이터의 구현이 필요하고, 동적 Task 생성이 필요하면 Dynamic Task Mapping 등을 활용하여 적절하게 구현하는 것이 좋겠다.

 

 

참고 문서

https://stackoverflow.com/questions/64295326/airflow-use-an-operator-inside-a-function/64297270#64297270

https://stackoverflow.com/questions/71998282/airflow-mysql-operator-trying-to-execute-script-path-string-as-sql-rather-than/71998563#71998563

https://forum.astronomer.io/t/call-a-bashoperator-which-is-inside-a-function-from-a-python-operator/847