BranchPythonOperator
2023.01.03 - [Airflow] ShortCircuitOperator - 조건부 Task 실행에서 간단히 소개한 작업을 분기하는 오퍼레이터 중 하나이다.
python_callable로 전달받은 함수가 조건에 따라 반환한 Task_id 또는 Task_id 목록을 실행하도록 한다. 분기 논리가 Python 함수로 간단히 구현할 수 있을 때 사용한다.
import
아래와 같이 import 하여 사용할 수 있다.
from airflow.operators.python import BranchPythonOperator
task 정의
PythonOperator와 유사하게 사용한다.
task_branch = BranchPythonOperator(
task_id='TASK_ID',
python_callable='Task 실행 조건 판단 함수',
op_args='callable에 전달할 변수',
op_kwargs='callable에 전달할 키워드 변수',
dag=dag
)
예시
예로 들어 다음과 같이 구성할 수 있다.
task_branch가 task_empty와 task_print_today의 task_id 중 무작위로 하나를 선택해 반환하면, 반환한 task가 실행된다.
import datetime
import random
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
def print_today():
print(datetime.datetime.now())
def get_run_task():
return random.choice([task_print_today.task_id, task_empty.task_id])
dag = DAG(
dag_id='branch_test',
catchup=False,
schedule_interval=None,
start_date=datetime.datetime(2021, 1, 1),
)
task_branch = BranchPythonOperator(
task_id='task_branch',
python_callable=get_run_task,
dag=dag
)
task_print_today = PythonOperator(
task_id='task_print_today',
python_callable=print_today,
dag=dag
)
task_empty = EmptyOperator(
task_id='task_empty',
dag=dag
)
task_branch >> [task_print_today, task_empty]
참고 문서
https://docs.astronomer.io/learn/airflow-branch-operator
https://registry.astronomer.io/providers/apache-airflow/modules/branchpythonoperator
728x90