개요
운영 중인 Airflow 환경에 아래와 같은 2개의 Dag가 동작 중이다.
- log_git_importer : 서버의 Git Working Directory 내 로그 파일을 원격 저장소로 Push 한다. 매 시 7분에 동작한다.
- inf_monitor : 운영 시스템 내 환경에서 모니터링이 필요한 지표를 파일로 생성한다. 매 시 10분에 동작한다.
그런데 어느 순간부터 inf_monitor가 [Errno 116] Stale file handle 오류와 함께 간헐적으로 실패하는 경우가 생기기 시작했다. 확인해 보니 log_git_importer가 동작하는 도중에 inf_monitor가 동작하는 것이 오류가 발생하는 원인으로 보였다. 동시에 두 Dag가 동작하는 상황을 피하기 위해 실행 주기를 조정해보려고 했는데, 사실 log_git_importer가 Airflow 환경이 총 9개라 Push 도중 충돌이 나는 상황을 피하기 위해 서로 다른 환경에서 5분 간격으로 동작 중이라 스케줄을 조정하기도 쉽지 않은 상태였다.
그래서 Dag 간 의존성을 설정해 log_git_importer 동작 후 inf_monitor를 동작하는 방식으로 변경을 해보면 어떤가 고민하고 있다.
이 글에서는 다른 Dag를 트리거하는 TriggerDagRunOperator에 대해서 정리해 본다.
TriggerDagRunOperator
TriggerDagRunOperator는 선행 Dag에서 Dag 간 종속성을 구현할 수 있는 방법 중 하나이다. TriggerDagRunOperator를 사용하면 같은 Airflow 환경에서 동작하는 다른 Dag를 트리거할 수 있다.
airflow.operators.trigger_dagrun.TriggerDagRunOperator(*, trigger_dag_id, trigger_run_id=None, conf=None,
execution_date=None, reset_dag_run=False,
wait_for_completion=False, poke_interval=60, allowed_states=None,
failed_states=None,
deferrable=conf.getboolean('operators', 'default_deferrable',
fallback=False), **kwargs)
- trigger_dag_id : 트리거 되는 dag_id
- trigger_run_id : 트리거 되는 Dag run에서 사용하는 run ID. 기재하지 않으면 자동으로 생성된다.
- conf : Dag run에 전달된 설정
- execution_date
- reset_dag_run : 트리거되는 Dag run이 이미 존재하는 경우 클리어한다. 기존 Dag run을 backfill 하거나 재실행하는 것을 허용할 때 True로 지정한다.
- wait_for_completion : 트리거한 Dag run의 동작 완료를 기다린다. poke_interval 매개변수와 함께 적용된다.
아래 매개변수들은 wait_for_completion을 True로 지정했을 때 유효한 것 같다.
- poke_interval : 트리거한 Dag run의 상태를 확인할 주기를 지정한다. 기본값은 60초이다.
- allowed_states : 다음 Task를 실행할 조건. 기본적으로 대기하는 dag, task가 SUCCESS일 때 실행된다.
- failed_states : 다음 Task를 실행하지 않을 조건.
모듈 임포트
TriggerDagRunOperator는 아래와 같이 임포트 하여 사용할 수 있다.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
Task 사용법
task_id와 트리거 되는 dag_id를 지정하여 사용한다.
task_trigger = TriggerDagRunOperator(
task_id='task_trigger',
trigger_dag_id='example_branch_operator',
dag=dag
)
TriggerDagRunOperator에 의해 설정된 Dag 간 의존성은 상단바 > Browse > DAG Dependencies에서 확인할 수 있다.
테스트로 다음과 같은 Dag를 정의하여 실행시켜 보았다.
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import PythonOperator
def print_task_type(**kwargs):
print(f"The {kwargs['task_type']} task has completed.")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': '2023-01-01',
'retries': 1,
'retry_delay': 30,
'catchup': False
}
dag = DAG(
dag_id='dag_trigger_test',
default_args=default_args,
schedule='0 0 * * *',
catchup=False
)
task_trigger = TriggerDagRunOperator(
task_id='task_trigger',
trigger_dag_id='example_branch_operator',
dag=dag
)
task_print = PythonOperator(
task_id="task_print",
python_callable=print_task_type,
op_kwargs={"task_type": "ending"},
)
task_trigger >> task_print
별다른 옵션 없이 사용한 경우, 트리거한 Dag에는 manual 타입의 Dag run이 생성되어 동작한다. 그리고트리거 되는 Dag의 동작을 대기하지 않고, 바로 다음 Task를 수행한다.
만약 TriggerDagRunOperator 이후의 Task가 트리거되는 Dag의 실행을 기다려야 한다면 wait_for_completion 매개변수를 True로 지정한다.
task_trigger = TriggerDagRunOperator(
task_id='task_trigger',
trigger_dag_id='example_branch_operator',
wait_for_completion=True,
dag=dag
)
이제 트리거 된 Dag run의 완료를 대기하고 다음 Task를 수행하는 모습을 확인할 수 있다.
참고 문서
https://docs.astronomer.io/learn/cross-dag-dependencies#triggerdagrunoperator
https://medium.com/gumgum-tech/implement-dag-dependencies-using-triggerdagrun-operator-318b46036347
https://blog.devgenius.io/airflow-cross-dag-dependency-b127 dd3 b69 d8