Apache Airflow

[Airflow] BranchDayOfWeekOperator - 실행 요일에 따른 분기

비번변경 2023. 1. 4. 18:02

BranchDayOfWeekOperator

2023.01.03 - [Airflow] ShortCircuitOperator - 조건부 Task 실행에서 간단히 소개한 작업을 분기하는 오퍼레이터 중 하나이다.

실행한 날짜의 요일과 지정한 요일의 일치 여부에 따라 실행할 Task를 결정한다.

 

 

import

BranchDayOfWeekOperator는 아래와 같이 import 하여 사용할 수 있다.

from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.utils.weekday import WeekDay

WeekDay 모듈은 요일 지정 시 사용한다.

 

 

Task 정의

BranchDayOfWeekOperator로 Task를 정의할 때는 조건을 충족할 때 실행할 Task id와 일치하지 않을 때 실행할 Task id, 그리고 조건이 될 요일을 전달해야 한다. 

branch_weekday = BranchDayOfWeekOperator(
    task_id="TASK ID",
    follow_task_ids_if_true="지정한 요일이 맞을 때 실행할 Task id",
    follow_task_ids_if_false="지정한 요일이 아닐 때 실행할 Task id",
    week_day='조건 요일',
    dag=dag
)

 

여러 요일을 전달할 때는 중괄호로 감싸서 전달한다.

branch_weekend = BranchDayOfWeekOperator(
    task_id="make_weekend_choice",
    follow_task_ids_if_true="branch_weekend",
    follow_task_ids_if_false="branch_mid_week",
    week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
)

 

요일은 WeekDay 모듈을 사용하지 않고 문자열로 전달할 수도 있다.

branch = BranchDayOfWeekOperator(
    task_id="make_choice",
    follow_task_ids_if_true="branch_true",
    follow_task_ids_if_false="branch_false",
    week_day="Monday",
)

 

 

예시

예로 들어 다음과 같이 구성할 수 있다.

import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.utils.weekday import WeekDay
from airflow.operators.empty import EmptyOperator


def print_today():
    print(datetime.datetime.now())


dag = DAG(
    dag_id='short_circuit_test',
    catchup=False,
    schedule_interval=None,
    start_date=datetime.datetime(2021, 1, 1),
)

branch_weekday = BranchDayOfWeekOperator(
    task_id="branch_weekday",
    follow_task_ids_if_true="task_print_today",
    follow_task_ids_if_false="branch_false",
    week_day=WeekDay.SATURDAY,
    dag=dag
)

task_print_today = PythonOperator(
    task_id='task_print_today',
    python_callable=print_today,
    dag=dag
)

task_empty = EmptyOperator(
    task_id='task_empty',
    dag=dag
)

branch_weekday >> [task_print_today, task_empty]

DAG가 실행된 요일이 토요일이면 task_print_today를 실행하고, 토요일이 아니면 task_empty를 실행한다.

 

 

참고 문서

https://registry.astronomer.io/providers/apache-airflow/modules/branchdayofweekoperator

https://registry.astronomer.io/dags/example-branch-day-of-week-operator

 

728x90