Apache Airflow 71

[Airflow] PythonOperator 매개변수 전달

개요 DAG 실행 시 실행할 PythonOperator가 호출할 함수에 매개변수를 전달하고 싶다. 테스트로 아래와 같은 코드를 작성해 실행해보았으나, python_callable param must be callable 에러가 발생했다. ... 생략 ... t1 = PythonOperator( task_id='print_string', python_callable=print("hello, world!"), dag=dag, ) Airflow에서 python_callable로 실행할 함수에 인자값은 어떻게 전달할 수 있을까? 방법을 정리해둔다. op_args Airflow PythonOperator API 문서에 따르면 PythonOperator는 아래와 같은 매개변수를 가진다. 그 중 op_args는 함수..

Apache Airflow 2022.08.11

[Airflow] Task 실패 시 Slack 알람 전송하기

개요 2022.08.05 - [Airflow] Slack Webhook 전송 글에서 Airflow DAG를 실행하여 Slack Webhook을 전송해보았다. 이제 이 코드를 이용해 DAG 실행 실패 시 알람을 받을 수 있도록 하고자 한다. Callbacks Callback은 지정된 task 또는 지정된 DAG의 모든 task의 상태 변화에 대해 동작하는 구성 요소이다. Callback을 사용하면 특정 Task가 실패했을 때나 성공했을 때 알림을 보내는 등의 동작을 구성할 수 있다. Airflow에서는 아래와 같은 Callback 유형을 사용할 수 있다. Callback Type Name Description on_success_callback Task 성공 시 호출된다. on_failure_callbac..

Apache Airflow 2022.08.07

[Airflow] Slack Webhook 전송

개요 Airflow DAG 실패 시 Slack 알람을 받고자 한다. 이를 위해 먼저 Airflow DAG를 통해 Slack Webhook을 보내보도록 한다. 이 글에서는 Airflow Slack 관련 Operator는 사용하지 않을 것이다. 따라서 Slack Operator를 따로 설치하거나 인증을 위한 Slack Token 등을 따로 생성하는 내용이 포함되어 있지 않다. 알람을 받은 Slack 워크스페이스와 imcoming webhook은 이미 생성해두었다고 가정한다. 필요시 아래 글을 참조한다. 2021.05.16 - slack에 Webhook 추가 SlackWebhook.py 작성 Slack Webhook을 전송하는 모듈을 작성한다. 이전에 2021.05.16 - python 코드 작성에서 작성했던..

Apache Airflow 2022.08.05

[Airflow] Task 간 의존성 설정

Task 간 의존성 여러 개의 Task가 존재할 때 의존성을 결정하는 방법을 정리한다. set_downstream() / set_upstream() Task 간 의존성은 set_downstream 또는 set_upstream과 같은 함수로 나타낼 수 있다. set_downstream : Task 실행 후에 수행할 task를 설정한다. set_upstream : Task 실행 전에 수행할 task를 설정한다. 다음과 같이 두 개의 Task를 가진 DAG가 존재한다고 하자. # DAG 인스턴스화 dag = DAG( 'helloworld', default_args=default_args, description='print "hello, world!"', schedule_interval=timedelta(day..

Apache Airflow 2022.08.03

[Airflow] lockfile.AlreadyLocked: /home/airflow/airflow/airflow-scheduler.pid is already locked

현상 에어플로우 웹 서버나 스케쥴러를 백그라운드로 실행했다. airflow scheduler -D airflow webserver --port 8081 -D 하지만 웹에서 접속이 되지 않아 확인했더니 스케쥴러의 프로세스가 존재하지 않았다. 원인 웹 서버와 스케쥴러의 에러 로그를 확인해보니 두 요소 모두 아래와 같은 FileExistsError, lockfile.AlreadyLocked가 발생하고 있었다. $ view airflow/airflow-webserver.err Traceback (most recent call last): File "/home/airflow1/venv/airflow/lib/python3.8/site-packages/lockfile/pidlockfile.py", line 77, i..

[Airflow] DAG 테스트

개요 2022.07.25 - [Airflow] DAG 생성 에서 생성한 DAG가 정상적으로 동작할지 간단히 확인해보자. 아래와 같은 방식으로 테스트를 수행하면 기본적인 에러 여부를 확인할 수 있다. 참고로 명령어는 1.10 버전 기준이다. python 실행 테스트 python 명령으로 DAG 정의 파일을 실행해본다. python dag_helloworld.py 발생한 예외가 없으면 정상적인 상태이다. 메타데이터 검사 DAG 정의 파일의 유효성을 추가로 검증해본다. DAG 인식 Airflow에서 DAG를 정상적으로 인식하는지 확인한다. airflow dags list Task 인식 Airflow에서 DAG의 Task를 정상적으로 인식하는지 확인한다. airflow list_tasks DAG_ID # 예시 ..

Apache Airflow 2022.07.29

[Airflow] DAG 생성

DAG Airflow의 핵심 개념으로, 여러 Task와 각 Task가 어떻게 실행되어야 하는지를 정의하는 종속성과 관계로 구성된다. DAG는 사진의 Task a, b, c, d를 정의하고 실행 순서, 그리고 의존성 등을 지정한다. 또한 DAG가 얼마나 자주 실행되어야 하는지를 지정해야 한다. Airflow의 Python 스크립트는 이러한 DAG의 구조를 코드로 지정하는 설정 파일이다. DAG 정의 파일은 실제 데이터 처리를 수행하지 않으며, 단순히 DAG 객체만을 정의한다. 이 글에서는 DAG 정의 방법을 정리해둔다. DAG 관리 디렉터리 생성 Airflow는 DAG를 관리하는 디렉터리를 지정하여 관리한다. 해당 설정은 airflow.cfg에서 설정할 수 있다. 해당 경로에 맞게 Airflow 홈 디렉터..

Apache Airflow 2022.07.25

[Apache Airflow] 디렉터리 구조

디렉터리 구조 Airflow를 설치하면 Airflow 홈 디렉터리에 아래와 같은 기본 파일이 생성된다. 1.10과 2.3 모두 존재하는 공통 파일부터 서로 존재하지 않는 파일 모두 간단히 정리한다. airflow.cfg Airflow 설정 파일 Airflow DAG 경로, 로그 수준 등 실행에 필요한 정보를 저장한다. airflow.db SQLite DB file 어떤 DAG가 존재하고 어떤 태스크로 구성되는지, 어떤 태스크가 실행 중이고 실행 가능한 상태인지 등의 메타데이터가 저장되는 데이터베이스 파일 logs Airflow의 로그 디렉터리 다만 웹 서버나 스케쥴러를 백그라운드로 실행했을 때는 airflow 홈 디렉터리에 바로 로그가 쌓이는 것 같다. unittest.cfg (1.10 버전) 실제 ai..

Apache Airflow 2022.07.24

[Apache Airflow] 설치 / 실행 (버전 1.10)

Apache Airflow 설치 2022.07.22 - [Apache Airflow] 설치/실행 (2. 3 버전) 에서 2.3 버전 설치 / 실행 방법을 정리했는데, 생각해보니 1 버전이 필요할 것 같다. 따라서 1버전 설치 / 실행 방법도 정리한다. 방법 자체가 크게 달라지진 않는다. 테스트 목적이므로 마찬가지로 단일 로컬 서버에 설치해보고자 한다. 사용한 OS는 Ubuntu 20.04 arm64이며, Python 버전은 3.8이다. 또한 Apache Airflow 1.10.15 버전을 설치할 것이다. 로컬에서 설치 Airflow 공식 사이트의 로컬에서 Airflow 설치 문서를 따른다. Airflow 설치에는 Python 3이 필요하고, pip를 통한 설치를 지원하고 있다. 1. (선택) python..

Apache Airflow 2022.07.23

[Apache Airflow] 설치 / 실행 (버전 2.3)

Apache Airflow 설치 2022.07.18 - Apache Airflow란? 에서 Apache Airflow에 대해서 정리해보았으므로, 이 글에서는 설치 방법을 정리한다. 테스트 목적이므로 단일 로컬 서버에 설치해보고자 한다. 사용한 OS는 Ubuntu 20.04 arm64이며, Python 버전은 3.8이다. 또한 Apache Airflow 2.3 버전을 설치할 것이다. 로컬에서 설치 Airflow 공식 사이트의 로컬에서 Airflow 설치 문서를 따른다. Airflow 설치에는 Python 3이 필요하고, pip를 통한 설치를 지원하고 있다. 1. (선택) python 가상 환경 생성 기존 python 환경에 영향이 없도록 별도의 가상 환경을 생성하여 사용할 것이다. 이 부분은 선택 사항이다..

Apache Airflow 2022.07.22
1 ··· 4 5 6 7 8