조건부 실행
조건에 따라 Airflow Task를 실행하거나 실행하지 않고 싶다. 또는 조건에 따라 실행할 Airflow Task를 결정하고 싶다.
이런 경우를 위해 Airflow에서는 다음과 같은 조건부 논리 구현 방법을 제공하거나 분기를 나눌 수 있는 방법을 제공한다.
- BranchPythonOperator : 조건에 따라 반환한 task_id에 해당하는 Task 실행
- ShortCircuitOperator : 조건이 만족하는 경우에만 다음 Task 실행
- BranchSQLOperator : SQL 쿼리 결과의 True/False 여부로 실행할 Task 분기
- BranchDayOfWeekOperator : 현재 요일이 지정된 week_day 매개변수와 동일한지에 따라 실행할 Task 분기
- BranchDateTimeOperator : 현재 시각이 target_lower와 target_upper 매개변수 사이의 시각인지에 따라 실행한 Task 분기
이 글에서는 그중에서도 ShortCircuitOperator에 대해 정리한다.
ShortCircuitOperator
조건을 만족해야 downstream Task를 실행하는 Operator이다.
PythonOperator에서 파생된 Operator로, python_callable로 전달받은 함수가 True를 반환하면 downstream Task를 실행하고 False를 반환하면 downstream Task 실행을 건너뛴다(skip).
DAG의 일부 Task가 가끔 실행되는 경우, 즉 DAG 자체는 매일 실행되지만 일부 Task는 특정 요일에만 실행되어야 하는 경우 등에 적합하다. 즉, 조건에 따라 Task 실행 여부를 결정할 때 사용한다.
import
아래와 같이 import하여 사용할 수 있다.
from airflow.operators.python import ShortCircuitOperator
Task 정의
PythonOperator와 유사하게 사용한다.
task_trigger = ShortCircuitOperator(
task_id='TASK_ID',
python_callable='Task 실행 조건 판단 함수',
op_args='callable에 전달할 변수',
op_kwargs='callable에 전달할 키워드 변수',
dag=dag
)
예시
예로 들어 다음과 같이 구성할 수 있다.
import datetime
import functools
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.python import ShortCircuitOperator
def check_trigger(flag=True):
return flag
def print_today():
print(datetime.datetime.now())
dag = DAG(
dag_id='short_circuit_test',
catchup=False,
schedule_interval=None,
start_date=datetime.datetime(2021, 1, 1),
)
task_trigger = ShortCircuitOperator(
task_id='task_trigger',
python_callable=functools.partial(check_trigger, flag=False),
dag=dag
)
task_echo = PythonOperator(
task_id='task_echo',
python_callable=print_today,
dag=dag
)
task_trigger >> task_echo
task_trigger가 True를 반환하면 task_echo를 실행하고, task_trigger가 False를 반환하면 task_echo를 실행하지 않고 DAG 실행을 종료한다.
참고 문서
[airflow] branch operator 로 조건별로 task 수행시키기
https://docs.astronomer.io/learn/airflow-branch-operator
https://registry.astronomer.io/providers/apache-airflow/modules/shortcircuitoperator
https://registry.astronomer.io/dags/example-short-circuit-operator