개요
2024.04.05-[Airflow] ExternalTaskSensor - 다른 dag/task 작업 완료 대기에서 다른 Dag나 Task의 실행을 센싱 했는데, 여러 Dag나 Task의 실행을 센싱 하기엔 적합하지 않다는 생각이 들었다.
이유는 여러 가지가 있는데, ExternalTaskSensor는 한 번에 하나의 Task나 Dag만을 센싱 할 수 있기 때문에 여러 Dag나 Task를 센싱 하기 위해서는 여러 개의 Sensor를 생성해서 사용해야 한다. 또, Sensor 간 종속성을 설정하는 경우가 아니라면 여러 Dag나 Task 중 실패가 발생했을 때 센싱을 중단할 방법이 없어 보였다.
이런 이유로 2024.04.14-[Airflow] 센서(Sensor) 란에서 살펴본 여러 Sensor 중 PythonSensor를 사용하여 Dag/Task 센서를 구현해보려고 한다.
PythonSensor
PythonSensor는 python_callable 매개변수로 전달한 함수가 True를 반환할 때까지 대기한다.
airflow.sensors.python.PythonSensor(*, python_callable,
op_args=None, op_kwargs=None,
templates_dict=None, **kwargs)
op_args, op_kwargs에는 python_callable로 전달한 함수의 매개변수를 지정한다.
t10 = PythonSensor(
task_id="failure_timeout_sensor_python",
timeout=3,
soft_fail=True,
python_callable=failure_callable
)
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
return PokeReturnValue(is_done=True, xcom_value="xcom_value")
필요시 baseSensor의 매개변수를 사용하여 동작 방식을 조정할 수 있고, @task.sensor 데코레이터를 사용하여 함수를 Task로 변환하여 사용할 수도 있다.
모듈 임포트
PythonSensor나 데코레이터를 사용할 때는 아래와 같은 라이브러리 임포트가 필요하다.
# 센서
from airflow.sensors.python import PythonSensor
# 데코레이터
from airflow.decorators import dag, task
테스트
테스트 목적으로 간단한 dag/task 센싱 함수를 구현하여 실행해 본다.
실행할 함수 (python_callable)
센터링할 시간 범위와 Dag, Task 쌍을 'dag_id.task_id' 형식으로 지정한다.
지정한 값을 사용하여 Airflow Meta DB에서 dag_run, task_instance 테이블에서 task의 상태를 조회한다.
센서 동작 방식
- Task 중 실패한 게 존재하면 :
센싱을 중지한다. - Task 중 실패한 게 존재하지 않으면 :
- Dag, Task 쌍의 길이와 조회한 데이터의 행 수가 같지 않으면 :
아직 실행되지 않은 Dag/Task가 존재한다는 의미이기 때문에 대기가 필요하다. 따라서 False를 반환한다. - Dag, Task 쌍의 길이와 조회한 데이터의 행 수가 같으면 :
- Task가 전부 success 상태라면 :
센싱을 완료하고 Task를 실행한다.
- Task가 전부 success 상태라면 :
- Dag, Task 쌍의 길이와 조회한 데이터의 행 수가 같지 않으면 :
코드
import psycopg2
from datetime import datetime
from datetime import timedelta
from airflow.exceptions import AirflowFailException
def connect():
return psycopg2.connect(host='localhost', port=5432, user='airflow', password='비밀번호', dbname='airflow')
def execute():
end_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
start_date = end_date - timedelta(days=1)
sensored_tasks = ('dag_sensor_test.task_sensor_test', 'example_branch_python_operator_decorator.join')
sql = f"""\
SELECT *
FROM (
SELECT concat(dag_run.dag_id, '.', task_instance.task_id) AS task_id, dag_run.execution_date, task_instance.state
FROM dag_run join task_instance
ON dag_run.dag_id = task_instance.dag_id
AND dag_run.run_id = task_instance.run_id
WHERE TRUE
AND dag_run.execution_date >= '{start_date}'
AND dag_run.execution_date < '{end_date}'
) a
WHERE TRUE
AND task_id IN {sensored_tasks}
"""
conn = connect()
cursor = conn.cursor()
print(sql)
cursor.execute(sql)
result = cursor.fetchall()
print(result)
if any(data[2] in ('failed', 'upstream_failed') for data in result):
raise AirflowFailException('sensored_tasks failed.')
else:
if len(result) == len(sensored_tasks) and all(data[2] == 'success' for data in result):
return True
return False
Task 정의 및 Grid
이 글에서는 PythonSensor를 사용해 센서를 정의한다. Dag/Task 실행 센서이기 때문에 실행 시간이 짧은 편은 아니라고 생각했다. 따라서 실행 간격을 1분으로 지정하고 reschedule 모드로 지정했다. Timeout도 적절하게 지정한다.
from airflow import DAG
from airflow.sensors.python import PythonSensor
task_sensor = PythonSensor(
task_id='task_sensor_test',
python_callable=execute,
poke_interval=60,
timeout=600,
mode='reschedule',
dag=dag
)
전체 코드는 접은글로 적어둔다.
import psycopg2
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor
from datetime import datetime
from datetime import timedelta
from airflow.exceptions import AirflowFailException
def connect():
return psycopg2.connect(host='localhost', port=5432, user='airflow', password='airflow1234', dbname='airflow')
def execute():
end_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
start_date = end_date - timedelta(days=1)
sensored_tasks = ('dag_sensor_test.task_sensor_test', 'example_branch_python_operator_decorator.join')
sql = f"""\
SELECT *
FROM (
SELECT concat(dag_run.dag_id, '.', task_instance.task_id) AS task_id, dag_run.execution_date, task_instance.state
FROM dag_run join task_instance
ON dag_run.dag_id = task_instance.dag_id
AND dag_run.run_id = task_instance.run_id
WHERE TRUE
AND dag_run.execution_date >= '{start_date}'
AND dag_run.execution_date < '{end_date}'
) a
WHERE TRUE
AND task_id IN {sensored_tasks}
"""
conn = connect()
cursor = conn.cursor()
print(sql)
cursor.execute(sql)
result = cursor.fetchall()
print(result)
if any(data[2] in ('failed', 'upstream_failed') for data in result):
raise AirflowFailException('sensored_tasks failed.')
else:
if len(result) == len(sensored_tasks) and all(data[2] == 'success' for data in result):
return True
return False
def print_hello():
print('hello')
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': '2023-01-01',
'retries': 1,
'retry_delay': 30,
}
dag = DAG(
'dag_sensor_test_2',
default_args=default_args,
schedule='0 0 * * *',
catchup=False
)
task_sensor = PythonSensor(
task_id='task_sensor_test',
python_callable=execute,
poke_interval=60,
timeout=600,
mode='reschedule',
dag=dag
)
task_test = PythonOperator(
task_id='task_hello',
python_callable=print_hello,
dag=dag
)
task_sensor >> task_test
동작 테스트
- 센싱 Dag/Task가 전부/일부 실행되지 않은 경우
다음 주기를 기다린다.
- 센싱 Dag/Task에 실패 상태가 포함된 경우
AirflowFailException을 사용하여 센싱을 중단시켰다.
- 센싱 Dag/Task가 전부 성공한 경우
센싱을 완료하고 Task를 실행시킨다.
참고 문서
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/python/index.html