개요
Airflow dag 또는 task 정의 시 retries를 0 이상으로 설정하면 task 실행 중 실패가 발생해도 재실행을 시도한다. 하지만 재시도가 의미 없는 경우가 있을 수 있는데, 이럴 때는 retries 횟수가 남아있더라도 실패로 처리하고 싶다.
Airflow Exception을 이용해 처리해 보자.
관련글 : 2023.05.06 - [Airflow] Exception을 이용한 task 스킵 처리
AirflowFailException
재시도 없이 task를 실패해야 할 때 사용하는 예외이다. 아래와 같이 import 하여 사용한다.
from airflow.exceptions import AirflowFailException
예시
예시로 사용할 함수는 0부터 2까지의 수 중에서 무작위로 선택한 값이 1일 때 AirflowFailException를 발생시킨다. 만약 조건을 충족하지 않으면 일반적인 Exception을 발생시킨다.
정의한 dag는 dag_rags.retries에 의해 task 실패 발생 시 재실행을 한 번 시도한다.
import random
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException, AirflowFailException
from airflow.operators.python import get_current_context
dag_args = {
"owner": "airflow",
"retries": 1,
"retry_delay": timedelta(minutes=1),
}
@task
def task_fail():
context = get_current_context()
val = random.choice(range(3))
print(f"val : {val}")
if val == 1:
raise AirflowFailException(f"Fail task {context['ti'].task_id}")
raise Exception()
@dag(start_date=datetime(2022, 1, 20),
dag_id='exception_test',
default_args=dag_args,
schedule="@once")
def generate_dag():
task_fail()
generate_dag()
실행 결과
Exception - Retry
random 값이 2이므로 Exception이 발생하여 retry를 시도한 것을 확인할 수 있다.
AirflowFailException - Failed
random 값이 1이므로 AirflowFailException을 발생하여 retry 없이 실패한 것을 확인할 수 있다.