Apache Airflow

[Airflow] Task 실패 시 Slack 알람 전송하기

비번변경 2022. 8. 7. 22:46

개요

2022.08.05 - [Airflow] Slack Webhook 전송 글에서 Airflow DAG를 실행하여 Slack Webhook을 전송해보았다.

이제 이 코드를 이용해 DAG 실행 실패 시 알람을 받을 수 있도록 하고자 한다.

 

 

Callbacks

Callback은 지정된 task 또는 지정된 DAG의 모든 task의 상태 변화에 대해 동작하는 구성 요소이다. Callback을 사용하면 특정 Task가 실패했을 때나 성공했을 때 알림을 보내는 등의 동작을 구성할 수 있다.

 

Airflow에서는 아래와 같은 Callback 유형을 사용할 수 있다.

 

Callback Type

Name Description
on_success_callback Task 성공 시 호출된다.
on_failure_callback Task 실패 시 호출된다.
sla_miss_callback Task가 정해진 SLA를 충족하지 못할 때 호출된다.
on_retry_callback Task가 재실행될 때 호출된다.

 

이 글에서는 실행 실패 시 Slack으로 Webhook을 보낼 것이므로 on_failure_callback을 사용한다.

 

Callack 작성

Airflow를 사용할 때는 보통 아래와 같은 형식의 알람을 사용하는 것 같다. 

Airflow 알람 형식

형식에 맞게 알람 메시지를 작성하여 SlackWebhook.send_message()을 호출하여 알람을 전송하는 airflow_failed_callback 함수를 작성해보도록 한다. airflow_failed_callback 함수도 SlackWebhook.py에 작성한다.

 

Context

메시지를 작성하기 위해서는 DAG에 대한 정보가 필요하다. Airflow는 Task 실행 시 여러 변수를 수집하여 execute()의 context 매개변수로 전달한다. context 매개변수엔 현재 작업에 대한 정보가 저장되어 있으며, 저장된 정보에 대한 설명은 Templates reference 에서 확인할 수 있다.

 

관련 내용을 확인하여 작성한 airflow_failed_callback은 아래와 같다.

import requests

# 메시지 전송
def send_message(name, message):
    url = "https://hooks.slack.com/services/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"
    icon_emoji = ":crying_cat_face:"
    channel = "# test_alarm"
    payload = {"channel": channel, "username" : name, "text" : message, "icon_emoji" : icon_emoji }
    
    requests.post(url, json=payload)


def airflow_failed_callback(context):
    # message 작성
    message = """
            :red_circle: Task Failed.
            *Dag*: {dag}
            *Task*: {task}
            *Execution Time*: {exec_date}
            *Exception*: {exception}
            *Log Url*: {log_url}
            """.format(
        dag=context.get('task_instance').dag_id,
        task=context.get('task_instance').task_id,
        exec_date=context.get('execution_time'),
        exception=context.get('exception'),
        log_url=context.get('task_instance').log_url
    )
    send_message('Airflow', message)

 

Operator 매개변수에 on_failure_callback 추가

DAG 정의 파일에서 on_failure_callback 설정을 추가하여 DAG 인스턴스 생성 시 사용할 수 있도록 한다.

from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

import SlackWebhook

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    # 'sla': timedelta(hours=2),
    'on_failure_callback': SlackWebhook.airflow_failed_callback,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

dag = DAG(
    'push_slack',
    default_args=default_args,
    description='push_slack',
    schedule_interval=timedelta(days=1),
)

 

동작 확인

간단히 예외를 발생시키는 Task를 추가하여 정상적으로 동작하는지 확인해본다.

def raise_exception():
    err_message = "Error Message"
    raise Exception(err_message)

# task
t2 = PythonOperator(
    task_id='callback_test',
    python_callable=raise_exception,
    dag=dag

DAG 실행

예외가 발생했으므로 Task가 실패한 모습을 확인할 수 있다.

DAG 실패 알람

메시지도 정상적으로 확인할 수 있다.

 

참고 문서

https://stackoverflow.com/questions/69527239/what-is-context-variable-in-airflow-operators

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-reference

 

728x90