Apache Airflow

[Airflow] BranchPythonOperator - 조건에 따른 분기

비번변경 2023. 1. 5. 15:44

BranchPythonOperator

2023.01.03 - [Airflow] ShortCircuitOperator - 조건부 Task 실행에서 간단히 소개한 작업을 분기하는 오퍼레이터 중 하나이다.

python_callable로 전달받은 함수가 조건에 따라 반환한 Task_id 또는 Task_id 목록을 실행하도록 한다. 분기 논리가 Python 함수로 간단히 구현할 수 있을 때 사용한다.

 

 

import

아래와 같이 import 하여 사용할 수 있다.

from airflow.operators.python import BranchPythonOperator

 

 

task 정의

PythonOperator와 유사하게 사용한다.

task_branch = BranchPythonOperator(
    task_id='TASK_ID',
    python_callable='Task 실행 조건 판단 함수',
    op_args='callable에 전달할 변수',
    op_kwargs='callable에 전달할 키워드 변수',
    dag=dag
)

 

 

예시

예로 들어 다음과 같이 구성할 수 있다.

task_branch가 task_empty와 task_print_today의 task_id 중 무작위로 하나를 선택해 반환하면, 반환한 task가 실행된다.

import datetime
import random

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator


def print_today():
    print(datetime.datetime.now())


def get_run_task():
    return random.choice([task_print_today.task_id, task_empty.task_id])


dag = DAG(
    dag_id='branch_test',
    catchup=False,
    schedule_interval=None,
    start_date=datetime.datetime(2021, 1, 1),
)

task_branch = BranchPythonOperator(
    task_id='task_branch',
    python_callable=get_run_task,
    dag=dag
)

task_print_today = PythonOperator(
    task_id='task_print_today',
    python_callable=print_today,
    dag=dag
)

task_empty = EmptyOperator(
    task_id='task_empty',
    dag=dag
)

task_branch >> [task_print_today, task_empty]

 

 

 

참고 문서

https://docs.astronomer.io/learn/airflow-branch-operator

https://registry.astronomer.io/providers/apache-airflow/modules/branchpythonoperator

https://registry.astronomer.io/dags/example-branch-operator

728x90