개요
2023.04.15 - [Airflow] Variables - 설정/사용에서 Airflow에서 사용할 전역 변수를 정의하고 사용했다.
이 글에서는 설정한 전역변수를 이용해 동적으로 DAG를 생성한다. 즉, Variables로 설정한 값 목록을 매개변수로 사용해 하나의 DAG 정의 파일로 여러 DAG를 생성해 본다.
Variables 설정
2023.04.15 - [Airflow] Variables - 설정/사용을 참조하여 설정한다.
이 글에서는 list_dags라는 키로 dag_id가 될 값들을 개행문자로 구분하여 정의했다.
DAG 정의 파일 작성
간단히 dag_id를 출력하는 DAG를 예시로 작성한다.
DAG 정의부
Variables에 정의된 list_dags를 이용하기 좋게 DAG와 TASK 정의 부분을 하나의 함수로 선언하여 사용한다. 만약 정의할 TASK가 여럿이라면 TASK 생성 함수도 별도로 선언하는 것도 고려할 수 있다.
dag_args = {
"owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1),
}
val_list_dags = Variable.get('list_dags', default_var='').replace('\r', '')
def print_val(val):
print(val)
def create_dag(dag_id):
dag = DAG(
dag_id=dag_id,
default_args=dag_args,
start_date=datetime(2022, 1, 20),
schedule_interval="@once",
)
t = PythonOperator(task_id=f"print_vals",
python_callable=functools.partial(print_val, dag_id),
dag=dag)
return dag
DAG 생성
Airflow Variables를 읽어 DAG를 생성하는 main 함수이다. DAG를 생성한 뒤, 실행 모듈에 속성으로 추가해주어야 한다.
def main():
list_dags = val_list_dags.split('\n') if val_list_dags else list()
this = sys.modules[__name__]
for dag_id in list_dags:
dag = create_dag(dag_id)
setattr(this, dag_id, dag)
main()
전체 코드는 접은 글로 작성해 둔다.
import functools
import sys
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
dag_args = {
"owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1),
}
val_list_dags = Variable.get('list_dags', default_var='').replace('\r', '')
def print_val(val):
print(sys.modules[__name__].__getattribute__(dag_id))
print(val)
def create_dag(dag_id):
dag = DAG(
dag_id=dag_id,
default_args=dag_args,
start_date=datetime(2022, 1, 20),
schedule_interval="@once",
)
t = PythonOperator(task_id=f"print_vals",
python_callable=functools.partial(print_val, dag_id),
dag=dag)
return dag
def main():
list_dags = val_list_dags.split('\n') if val_list_dags else list()
this = sys.modules[__name__]
for dag_id in list_dags:
dag = create_dag(dag_id)
setattr(this, dag_id, dag)
main()
DAG 확인
생성된 DAG를 확인하고 동작시켜 본다.
DAG 목록
DAG 동작
의도했던 대로 DAG ID를 출력하는 모습을 확인할 수 있다.
DAG 소스 파일 확인
Airflow CLI로 확인해 보면 하나의 소스 파일로 여러 DAG가 생성된 것을 확인할 수 있다.
참고 문서
https://medium.com/@jw_ng/using-airflow-variables-to-create-a-dynamic-workflow-291a759c77be
https://docs.astronomer.io/learn/dynamically-generating-dags