Apache Airflow

[Airflow] ExternalTaskSensor - 다른 dag/task 작업 완료 대기

비번변경 2024. 4. 30. 23:14

개요

Airflow에서는 여러 작업 간의 종속성을 설정하여 Task Flow을 구성할 수 있는데, 종속성을 설정할 작업이 서로 다른 Dag에 속해있는 경우가 있을 수 있다.

이런 경우 Airflow에서 제공하는 Sensor Operator인 ExternalTaskSensor의 사용을 고려해 볼 수 있다.

이 글에서는 ExternalTaskSensor의 사용법 정도를 간단히 적어둔다.

 

 

ExternalTaskSensor

ExternalTaskSensor는 특정 logical_date(execution_date)에 대한 다른 dag, task group, task가 완료되기를 기다리는 Operator이다.

airflow.sensors.external_task.ExternalTaskSensor(*, external_dag_id, 
                                                 external_task_id=None, external_task_ids=None,
                                                 external_task_group_id=None, 
                                                 allowed_states=None, failed_states=None,
                                                 execution_delta=None, execution_date_fn=None, 
                                                 check_existence=False,
                                                 **kwargs)

- external_dag_id : 완료를 대기할 Dag의 ID

- external_task_id : 완료를 대기할 Task의 ID

- external_task_ids : 완료를 대기할 Task의 ID 목록. external_task_id와 중복으로 지정할 수 없다.

- allowed_states : 다음 Task를 실행할 조건. 기본적으로 대기하는 dag, task가 SUCCESS일 때 실행된다.

- failed_states : 다음 Task를 실행하지 않을 조건

- execution_delta : 이전 실행과의 시간 차이. 기본값은 현재 task, dag와 동일한 logical_date이다.

- execution_date_fn : 현재 logical_date를 입력으로 받아 센서링할 logical_date를 반환하는 함수 지정

- check_existence : 대기할 Dag나 Task의 존재 확인 여부 지정

 

이 외 다른 매개변수도 존재하니 필요 시 공식 문서를 참조한다. ExternalTaskSensor는 BaseSensorOperator를 상속받아 정의되어 있으므로 BaseSensorOperator의 매개변수도 적용할 수 있다.

 

 

모듈 임포트

ExternalTaskSensor는 아래와 같이 임포트하여 사용할 수 있다.

from airflow.sensors.external_task import ExternalTaskSensor

 

 

Task 사용법

task_id와 대기할 external_dag_id를 지정하여 Task를 생성할 수 있다.

task_sensor = ExternalTaskSensor(
    task_id='test_sensor',
    external_dag_id='example_branch_python_operator_decorator',
    dag=dag
)

참고로 Dag 간 종속성은 상단바 > Browse > DAG Dependencies에서 확인할 수 있다.

별다른 옵션 없이 사용한 경우, Sensoring Dag와 동일한 logical_date(execution_date)를 가진 dag_run 실행을 모니터링한다. 즉, 두 Dag의 스케줄 설정이 동일해야 한다! 만약 스케줄 설정이 다르다면 execution_date_fn, execution_delta를 적절하게 맞춰주어야 한다.

 

일반적으로 mode, poke_interval, timeout 정도의 매개변수와 필요 시 external_task_id와 allowed_states, failed_states 정도를 추가하여 사용하는 것 같다.

task_sensor = ExternalTaskSensor(
    task_id='task_sensor_test',
    external_dag_id='example_branch_python_operator_decorator',
    external_task_id='join',
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    poke_interval=60,
    timeout=600,
    mode='reschedule',
    dag=dag
)
  • mode : 센서의 작동 방식. poke 아니면 reschedule을 지정할 수 있다.
    poke 모드를 사용하면 poke 사이에는 sleep 상태를 유지하여 sensor의 전체 실행 시간 동안 worker의 슬롯을 점유한다. 실행 시간이 짧은 센서일 때 적절하다.
    reschedule 모드를 사용하면 조건을 만족하지 않았을 때 task가 worker 슬롯을 놓아주고 up_for_reschedule 상태를 유지하다가 다시 스케쥴링된다. 실행 시간이 길어질 센서일 때 적절하다.
  • poke_interval : 조건을 확인하는 주기. 초 단위로 지정한다.
  • timeout : task의 최대 실행 시간. 초 단위로 지정한다.

 

- Sensoring Target 성공 상태 시

- ExternalTaskSensor 실행 로그

 

- Sensoring Target 성공 실패 상태 시

- ExternalTaskSensor 실행 로그

 

 

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/2.5.1/howto/operator/external_task_sensor.html

airflow.sensors.external_task.ExternalTaskSensor

https://docs.astronomer.io/learn/cross-dag-dependencies

https://registry.astronomer.io/providers/apache-airflow/versions/latest/modules/externaltasksensor

https://amazelimi.tistory.com/entry/Airflow-Sensor-ExternalTaskSensor-LIM

https://wookiist.dev/169

https://brownbears.tistory.com/593

728x90