Apache Airflow

[Airflow] Variables를 이용한 동적 DAG 생성

비번변경 2023. 4. 19. 18:58

개요

2023.04.15 - [Airflow] Variables - 설정/사용에서 Airflow에서 사용할 전역 변수를 정의하고 사용했다.

이 글에서는 설정한 전역변수를 이용해 동적으로 DAG를 생성한다. 즉, Variables로 설정한 값 목록을 매개변수로 사용해 하나의 DAG 정의 파일로 여러 DAG를 생성해 본다.

 

 

Variables 설정

2023.04.15 - [Airflow] Variables - 설정/사용을 참조하여 설정한다.

이 글에서는 list_dags라는 키로 dag_id가 될 값들을 개행문자로 구분하여 정의했다.

 

 

 

 

DAG 정의 파일 작성

간단히 dag_id를 출력하는 DAG를 예시로 작성한다.

 

DAG 정의부

Variables에 정의된 list_dags를 이용하기 좋게 DAG와 TASK 정의 부분을 하나의 함수로 선언하여 사용한다. 만약 정의할 TASK가 여럿이라면 TASK 생성 함수도 별도로 선언하는 것도 고려할 수 있다.

dag_args = {
    "owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1),
}
val_list_dags = Variable.get('list_dags', default_var='').replace('\r', '')


def print_val(val):
    print(val)


def create_dag(dag_id):
    dag = DAG(
        dag_id=dag_id,
        default_args=dag_args,
        start_date=datetime(2022, 1, 20),
        schedule_interval="@once",
    )
    t = PythonOperator(task_id=f"print_vals",
                       python_callable=functools.partial(print_val, dag_id),
                       dag=dag)

    return dag

 

DAG 생성

Airflow Variables를 읽어 DAG를 생성하는 main 함수이다. DAG를 생성한 뒤, 실행 모듈에 속성으로 추가해주어야 한다.

def main():
    list_dags = val_list_dags.split('\n') if val_list_dags else list()

    this = sys.modules[__name__]
    for dag_id in list_dags:
        dag = create_dag(dag_id)
        setattr(this, dag_id, dag)


main()

 

전체 코드는 접은 글로 작성해 둔다.

더보기
import functools
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

dag_args = {
    "owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1),
}
val_list_dags = Variable.get('list_dags', default_var='').replace('\r', '')


def print_val(val):
    print(sys.modules[__name__].__getattribute__(dag_id))
    print(val)


def create_dag(dag_id):
    dag = DAG(
        dag_id=dag_id,
        default_args=dag_args,
        start_date=datetime(2022, 1, 20),
        schedule_interval="@once",
    )
    t = PythonOperator(task_id=f"print_vals",
                       python_callable=functools.partial(print_val, dag_id),
                       dag=dag)

    return dag


def main():
    list_dags = val_list_dags.split('\n') if val_list_dags else list()

    this = sys.modules[__name__]
    for dag_id in list_dags:
        dag = create_dag(dag_id)
        setattr(this, dag_id, dag)


main()

 

 

DAG 확인

생성된 DAG를 확인하고 동작시켜 본다.

 

DAG 목록

 

DAG 동작

의도했던 대로 DAG ID를 출력하는 모습을 확인할 수 있다.

 

DAG 소스 파일 확인

Airflow CLI로 확인해 보면 하나의 소스 파일로 여러 DAG가 생성된 것을 확인할 수 있다.

 

 

참고 문서

https://medium.com/@jw_ng/using-airflow-variables-to-create-a-dynamic-workflow-291a759c77be

https://docs.astronomer.io/learn/dynamically-generating-dags