Apache Airflow

[Airflow] PythonSensor 사용하기

비번변경 2024. 5. 10. 22:32

개요

2024.04.05-[Airflow] ExternalTaskSensor - 다른 dag/task 작업 완료 대기에서 다른 Dag나 Task의 실행을 센싱 했는데, 여러 Dag나 Task의 실행을 센싱 하기엔 적합하지 않다는 생각이 들었다.

이유는 여러 가지가 있는데, ExternalTaskSensor는 한 번에 하나의 Task나 Dag만을 센싱 할 수 있기 때문에 여러 Dag나 Task를 센싱 하기 위해서는 여러 개의 Sensor를 생성해서 사용해야 한다. 또, Sensor 간 종속성을 설정하는 경우가 아니라면 여러 Dag나 Task 중 실패가 발생했을 때 센싱을 중단할 방법이 없어 보였다.

 

이런 이유로 2024.04.14-[Airflow] 센서(Sensor) 란에서 살펴본 여러 Sensor 중 PythonSensor를 사용하여 Dag/Task 센서를 구현해보려고 한다.

 

 

PythonSensor

PythonSensor는 python_callable 매개변수로 전달한 함수가 True를 반환할 때까지 대기한다.

airflow.sensors.python.PythonSensor(*, python_callable, 
                                    op_args=None, op_kwargs=None, 
                                    templates_dict=None, **kwargs)

op_args, op_kwargs에는 python_callable로 전달한 함수의 매개변수를 지정한다.

 

t10 = PythonSensor(
    task_id="failure_timeout_sensor_python", 
    timeout=3, 
    soft_fail=True, 
    python_callable=failure_callable
)

@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
    return PokeReturnValue(is_done=True, xcom_value="xcom_value")

필요시 baseSensor의 매개변수를 사용하여 동작 방식을 조정할 수 있고, @task.sensor 데코레이터를 사용하여 함수를 Task로 변환하여 사용할 수도 있다.

 

 

모듈 임포트

PythonSensor나 데코레이터를 사용할 때는 아래와 같은 라이브러리 임포트가 필요하다.

# 센서
from airflow.sensors.python import PythonSensor

# 데코레이터
from airflow.decorators import dag, task

 

 

테스트

테스트 목적으로 간단한 dag/task 센싱 함수를 구현하여 실행해 본다.

 

실행할 함수 (python_callable)

센터링할 시간 범위와 Dag, Task 쌍을 'dag_id.task_id' 형식으로 지정한다.

지정한 값을 사용하여 Airflow Meta DB에서 dag_run, task_instance 테이블에서 task의 상태를 조회한다.

 

센서 동작 방식

  • Task 중 실패한 게 존재하면 : 
    센싱을 중지한다.
  • Task 중 실패한 게 존재하지 않으면 :
    • Dag, Task 쌍의 길이와 조회한 데이터의 행 수가 같지 않으면 :
      아직 실행되지 않은 Dag/Task가 존재한다는 의미이기 때문에 대기가 필요하다. 따라서 False를 반환한다.
    • Dag, Task 쌍의 길이와 조회한 데이터의 행 수가 같으면 :
      • Task가 전부 success 상태라면 :
        센싱을 완료하고 Task를 실행한다.

코드

import psycopg2
from datetime import datetime
from datetime import timedelta
from airflow.exceptions import AirflowFailException

def connect():
    return psycopg2.connect(host='localhost', port=5432, user='airflow', password='비밀번호', dbname='airflow')

def execute():
    end_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    start_date = end_date - timedelta(days=1)
    sensored_tasks = ('dag_sensor_test.task_sensor_test', 'example_branch_python_operator_decorator.join')
    sql = f"""\
SELECT *
  FROM (
    SELECT concat(dag_run.dag_id, '.', task_instance.task_id) AS task_id, dag_run.execution_date, task_instance.state
      FROM dag_run join task_instance
        ON dag_run.dag_id = task_instance.dag_id 
       AND dag_run.run_id = task_instance.run_id 
     WHERE TRUE
       AND dag_run.execution_date >= '{start_date}'
       AND dag_run.execution_date < '{end_date}'
       ) a
 WHERE TRUE
   AND task_id IN {sensored_tasks}
   """

    conn = connect()
    cursor = conn.cursor()
    print(sql)
    cursor.execute(sql)
    result = cursor.fetchall()
    print(result)
    if any(data[2] in ('failed', 'upstream_failed') for data in result):
        raise AirflowFailException('sensored_tasks failed.')
    else:
        if len(result) == len(sensored_tasks) and all(data[2] == 'success' for data in result):
            return True
    return False

 

Task 정의 및 Grid

이 글에서는 PythonSensor를 사용해 센서를 정의한다. Dag/Task 실행 센서이기 때문에 실행 시간이 짧은 편은 아니라고 생각했다. 따라서 실행 간격을 1분으로 지정하고 reschedule 모드로 지정했다. Timeout도 적절하게 지정한다.

from airflow import DAG
from airflow.sensors.python import PythonSensor

task_sensor = PythonSensor(
    task_id='task_sensor_test',
    python_callable=execute,
    poke_interval=60,
    timeout=600,
    mode='reschedule',
    dag=dag
)

 

전체 코드는 접은글로 적어둔다.

더보기
import psycopg2
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor
from datetime import datetime
from datetime import timedelta
from airflow.exceptions import AirflowFailException


def connect():
    return psycopg2.connect(host='localhost', port=5432, user='airflow', password='airflow1234', dbname='airflow')


def execute():
    end_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    start_date = end_date - timedelta(days=1)
    sensored_tasks = ('dag_sensor_test.task_sensor_test', 'example_branch_python_operator_decorator.join')
    sql = f"""\
SELECT *
  FROM (
    SELECT concat(dag_run.dag_id, '.', task_instance.task_id) AS task_id, dag_run.execution_date, task_instance.state
      FROM dag_run join task_instance
        ON dag_run.dag_id = task_instance.dag_id 
       AND dag_run.run_id = task_instance.run_id 
     WHERE TRUE
       AND dag_run.execution_date >= '{start_date}'
       AND dag_run.execution_date < '{end_date}'
       ) a
 WHERE TRUE
   AND task_id IN {sensored_tasks}
   """

    conn = connect()
    cursor = conn.cursor()
    print(sql)
    cursor.execute(sql)
    result = cursor.fetchall()
    print(result)
    if any(data[2] in ('failed', 'upstream_failed') for data in result):
        raise AirflowFailException('sensored_tasks failed.')
    else:
        if len(result) == len(sensored_tasks) and all(data[2] == 'success' for data in result):
            return True
    return False


def print_hello():
    print('hello')


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': '2023-01-01',
    'retries': 1,
    'retry_delay': 30,
}

dag = DAG(
    'dag_sensor_test_2',
    default_args=default_args,
    schedule='0 0 * * *',
    catchup=False
)

task_sensor = PythonSensor(
    task_id='task_sensor_test',
    python_callable=execute,
    poke_interval=60,
    timeout=600,
    mode='reschedule',
    dag=dag
)

task_test = PythonOperator(
    task_id='task_hello',
    python_callable=print_hello,
    dag=dag
)
task_sensor >> task_test

 

동작 테스트

- 센싱 Dag/Task가 전부/일부 실행되지 않은 경우

다음 주기를 기다린다.

 

- 센싱 Dag/Task에 실패 상태가 포함된 경우

AirflowFailException을 사용하여 센싱을 중단시켰다.

 

- 센싱 Dag/Task가 전부 성공한 경우

센싱을 완료하고 Task를 실행시킨다.

 

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/python/index.html