개요
2022.08.05 - [Airflow] Slack Webhook 전송 글에서 Airflow DAG를 실행하여 Slack Webhook을 전송해보았다.
이제 이 코드를 이용해 DAG 실행 실패 시 알람을 받을 수 있도록 하고자 한다.
Callbacks
Callback은 지정된 task 또는 지정된 DAG의 모든 task의 상태 변화에 대해 동작하는 구성 요소이다. Callback을 사용하면 특정 Task가 실패했을 때나 성공했을 때 알림을 보내는 등의 동작을 구성할 수 있다.
Airflow에서는 아래와 같은 Callback 유형을 사용할 수 있다.
Callback Type
Name | Description |
on_success_callback | Task 성공 시 호출된다. |
on_failure_callback | Task 실패 시 호출된다. |
sla_miss_callback | Task가 정해진 SLA를 충족하지 못할 때 호출된다. |
on_retry_callback | Task가 재실행될 때 호출된다. |
이 글에서는 실행 실패 시 Slack으로 Webhook을 보낼 것이므로 on_failure_callback을 사용한다.
Callack 작성
Airflow를 사용할 때는 보통 아래와 같은 형식의 알람을 사용하는 것 같다.
형식에 맞게 알람 메시지를 작성하여 SlackWebhook.send_message()을 호출하여 알람을 전송하는 airflow_failed_callback 함수를 작성해보도록 한다. airflow_failed_callback 함수도 SlackWebhook.py에 작성한다.
Context
메시지를 작성하기 위해서는 DAG에 대한 정보가 필요하다. Airflow는 Task 실행 시 여러 변수를 수집하여 execute()의 context 매개변수로 전달한다. context 매개변수엔 현재 작업에 대한 정보가 저장되어 있으며, 저장된 정보에 대한 설명은 Templates reference 에서 확인할 수 있다.
관련 내용을 확인하여 작성한 airflow_failed_callback은 아래와 같다.
import requests
# 메시지 전송
def send_message(name, message):
url = "https://hooks.slack.com/services/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"
icon_emoji = ":crying_cat_face:"
channel = "# test_alarm"
payload = {"channel": channel, "username" : name, "text" : message, "icon_emoji" : icon_emoji }
requests.post(url, json=payload)
def airflow_failed_callback(context):
# message 작성
message = """
:red_circle: Task Failed.
*Dag*: {dag}
*Task*: {task}
*Execution Time*: {exec_date}
*Exception*: {exception}
*Log Url*: {log_url}
""".format(
dag=context.get('task_instance').dag_id,
task=context.get('task_instance').task_id,
exec_date=context.get('execution_time'),
exception=context.get('exception'),
log_url=context.get('task_instance').log_url
)
send_message('Airflow', message)
Operator 매개변수에 on_failure_callback 추가
DAG 정의 파일에서 on_failure_callback 설정을 추가하여 DAG 인스턴스 생성 시 사용할 수 있도록 한다.
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import SlackWebhook
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
# 'sla': timedelta(hours=2),
'on_failure_callback': SlackWebhook.airflow_failed_callback,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
dag = DAG(
'push_slack',
default_args=default_args,
description='push_slack',
schedule_interval=timedelta(days=1),
)
동작 확인
간단히 예외를 발생시키는 Task를 추가하여 정상적으로 동작하는지 확인해본다.
def raise_exception():
err_message = "Error Message"
raise Exception(err_message)
# task
t2 = PythonOperator(
task_id='callback_test',
python_callable=raise_exception,
dag=dag
예외가 발생했으므로 Task가 실패한 모습을 확인할 수 있다.
메시지도 정상적으로 확인할 수 있다.
참고 문서
https://stackoverflow.com/questions/69527239/what-is-context-variable-in-airflow-operators
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-reference