Apache Airflow

[Airflow] retries 설정 무시하고 task 실패 처리

비번변경 2023. 5. 7. 19:26

개요

Airflow dag 또는 task 정의 시 retries를 0 이상으로 설정하면 task 실행 중 실패가 발생해도 재실행을 시도한다. 하지만 재시도가 의미 없는 경우가 있을 수 있는데, 이럴 때는 retries 횟수가 남아있더라도 실패로 처리하고 싶다.

 

Airflow Exception을 이용해 처리해 보자.

 

관련글 : 2023.05.06 - [Airflow] Exception을 이용한 task 스킵 처리

 

 

AirflowFailException

재시도 없이 task를 실패해야 할 때 사용하는 예외이다. 아래와 같이 import 하여 사용한다.

from airflow.exceptions import AirflowFailException

 

 

예시

예시로 사용할 함수는 0부터 2까지의 수 중에서 무작위로 선택한 값이 1일 때 AirflowFailException를 발생시킨다. 만약 조건을 충족하지 않으면 일반적인 Exception을 발생시킨다.

정의한 dag는 dag_rags.retries에 의해 task 실패 발생 시 재실행을 한 번 시도한다.

import random
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException, AirflowFailException
from airflow.operators.python import get_current_context

dag_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

@task
def task_fail():
    context = get_current_context()
    val = random.choice(range(3))
    print(f"val : {val}")
    if val == 1:
        raise AirflowFailException(f"Fail task {context['ti'].task_id}")
    raise Exception()


@dag(start_date=datetime(2022, 1, 20),
     dag_id='exception_test',
     default_args=dag_args,
     schedule="@once")
def generate_dag():
    task_fail()
    

generate_dag()

 

 

 

실행 결과

Exception - Retry

random 값이 2이므로 Exception이 발생하여 retry를 시도한 것을 확인할 수 있다.

 

AirflowFailException - Failed

random 값이 1이므로 AirflowFailException을 발생하여 retry 없이 실패한 것을 확인할 수 있다.

 

 

참고 문서

airflow.exceptions.AirflowFailException