Apache Airflow

[Airflow] Slack Webhook 전송

비번변경 2022. 8. 5. 19:32

개요

Airflow DAG 실패 시 Slack 알람을 받고자 한다. 이를 위해 먼저 Airflow DAG를 통해 Slack Webhook을 보내보도록 한다.

이 글에서는 Airflow Slack 관련 Operator는 사용하지 않을 것이다. 따라서 Slack Operator를 따로 설치하거나 인증을 위한 Slack Token 등을 따로 생성하는 내용이 포함되어 있지 않다.

 

알람을 받은 Slack 워크스페이스와 imcoming webhook은 이미 생성해두었다고 가정한다. 필요시 아래 글을 참조한다.

2021.05.16 - slack에 Webhook 추가

 

 

SlackWebhook.py 작성

Slack Webhook을 전송하는 모듈을 작성한다.

이전에 2021.05.16 - python 코드 작성에서 작성했던 함수를 재활용했다.

import requests

# 함수
def send_message(name, message):
    url = "SLACK_WEBHOOK_URL"
    icon_emoji = ":crying_cat_face:"
    channel = "# test_alarm"

    payload = {"channel": channel, "username" : name, "text" : message, "icon_emoji" : icon_emoji }

    # 메세지 전송
    requests.post(url, json=payload)

 

 

DAG 작성

위에서 작성한 SlackWebhook을 import하여 메시지를 전송하는 PythonOperator Task를 가진 DAG를 작성한다.

from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# ./SlackWebhook.py
import SlackWebhook

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}

# DAG 인스턴스화
dag = DAG(
    'push_slack',
    default_args=default_args,
    description='push_slack',
    schedule_interval=timedelta(days=1),
)

def send_message():
    SlackWebhook.send_message('Airflow', 'Test Message')

# task
t1 = PythonOperator(
    task_id='push_slack',
    python_callable=send_message,
    dag=dag,
)

 

 

DAG 실행 테스트

스케쥴링이 설정되어 있는 DAG를 On 하면 자동으로 Trigger 되어 실행된다. start_date 설정으로 인해 2번 실행된 상태이다.

DAG 생성

 

DAG 실행 결과

정상적으로 Slack 메세지를 확인할 수 있다.

 

 

참고 문서

 

 

 

728x90