Apache Airflow 82

[Airflow] Dag Import Error 확인하기

개요Airflow를 사용하다 보면 Web UI에서 다음과 같은 Import 에러가 발생하는 모습을 확인할 수 있다.Import Errors가 발생하면 Dag 활성화 자체가 되지 않기 때문에 주기적으로 동작 확인이 필요한 경우에는 별도로 모니터링이 필요해 보였다. 관련하여 알람 등을 구성하기 위해 프로그래밍적인 방법으로 확인할 수 있는 방법이 있는지 확인해본다.   import_error 테이블잠깐 Airflow Meta DB 스키마를 확인해 봤더니 바로 import_error 테이블이 존재하는 모습을 확인할 수 있다.-- airflow.import_error definitionCREATE TABLE `import_error` ( `id` int NOT NULL AUTO_INCREMENT, `times..

Apache Airflow 2024.12.13

[Airflow] Web의 Auto-refresh 기능

개요Airflow WebServer는 Dag의 상태를 일정 주기마다 새로 고침하는 auto-refesh 기능을 제공한다.다만 이 기능은 Dag가 너무 많거나, Dag의 Task가 많으면 너무 많은 데이터를 너무 잦게 데이터베이스에서 조회하게 되어 시스템 부하를 줄 수도 있는 것 같다. 때문에 기본적으로 비활성화 상태를 기본으로 둘 수 있는 방법을 찾았는데, 그런 기능은 지원하지 않는 것으로 보인다.이번 글에서는 Auto-refesh 관련 설정에 대해서만 간단히 짚고 넘어가려고 한다.  관련 설정auto_refresh_intervalAuto-refesh 기능이 켜져있을 때 Dag 데이터가 자동으로 새로고침되는 빈도에 해당된다.airflow.cfg나 환경변수로 설정할 수 있는데 Webserver 관련 설정값..

Apache Airflow 2024.10.16

[Celery] inspect - Worker 설정값 확인

개요2024.09.26-[Celery/Flower] 설치 및 실행에서 기본적인 구성으로 Celery를 설치하고 실행해 보았는데, Celery의 현재 설정 등을 확인하는 방법을 알아두어야 할 것 같다.방법을 적어둔다.  CLIcelery cli의 inspect 명령은 Celery worker를 검사하는 기능을 제공한다. celery inspect [OPTIONS] {report|conf|query_task|clock|ping|stats|sched uled|reserved|active|revoked|registered|objgraph|memsamp le|memdump|active_queues} 그중에서도 설정값은 report나 con..

Apache Airflow 2024.10.15

[Celery/Flower] 설치 및 실행

개요업무 상 Airflow에서 CeleryExecutor를 사용하고 있어 이전에  2023.08.09-[Python] Celery 란 글로 간단한 개념을 알아보았었다. 하지만 막상 Celery를 들여다보는 상황이 되면 관련 개념 부족을 느끼곤 했다. 때문에 아주 간단하게 Celery를 직접 설치해 보고 실행해 보는 경험에 대한 필요성을 느껴 적어두려고 한다. 공식 문서의 튜토리얼을 참고하여 작성했고, Message Broker로는 Redis를 사용한다. 단순 테스트이기 때문에 환경은 killercoda를 사용한다.  Message Broker 설치Celery를 사용하기 위해서는 Worker 간 Task를 주고받는데 사용할 Message Broker가 필요하다.이 글에서는 redis를 사용한다고 명시했으므..

Apache Airflow 2024.09.26

[Python] Datetime - isoformat

개요최근 Airflow 2.5.1 버전에서 2.8.2 버전으로 업그레이드 테스트를 진행하고 있는데, Airflow의 Dag run context의 execution_date 값의 문자열 형식이 다음과 같이 바뀐 것 같다. 스케쥴에 의한 트리거매뉴얼 트리거2.5.12024-07-31T00:00:00+00:002023-05-10T01:51:17.686430+00:002.8.22024-07-31 00:00:00+00:002023-05-10 01:51:17.686430+00:00execution_date 값의 문자열 데이터를 파싱 하는 부분이 있어서... 변경된 형식을 기존 형식으로 변경하고 싶다.확인해 보니 기존 형식이 ISO 8601 형식인 것 같다. python에서 datetime 값을 ISO 8601 형..

Apache Airflow 2024.08.30

[Airflow] weight_rule - Task 우선순위 결정

개요Airflow에서 많은 Dag, 많은 Task를 실행할 때 서로 다른 Dag 간의 Task 간의 실행 순서를 결정하기 위해 우선순위를 가질 수 있다. 이번 글에서는 Airflow가 Task의 실행 순서를 결정하는 규칙에 대해서 먼저 알아본다.  priority_weightAirflow는 Executor에서의 task 실행 순서를 결정하기 위해 priority_weight과 weight_rule이라는 두 가지 개념을 사용한다.그중 priority_weight은 Executor의 큐에서의 우선순위를 정의한다. 기본값은 1이며, 각 task는 weight_rule에 의해 계산된 유효한 priority_weight에 따라 실행 순서, 우선순위가 결정된다.Task의 priority_weight 값이 높을 수록..

Apache Airflow 2024.08.23

[Airflow] 버전 다운그레이드

개요업무 환경에서 Airflow 2.5.1 버전을 사용하고 있었는데 보안적인 문제로 인해 2.8.2 버전으로 업그레이드를 진행하고 있다 그런데 막상 테스트로 업그레이드를 해보니 이슈가 조금 있어서…… 다시 Airflow 2.5.1 버전으로 다운그레이드하고 싶다.2023.03.28-[Airflow] 버전 업그레이드에서는 업그레이드 방법을 정리해 두었으니, 이번 글에서는 다운그레이드 진행 방법을 적어둔다.  방법1. Airflow 서비스 중지 작업을 수행하기 전에 Airflow 서비스를 전부 중지한다. Webserver, Scheduler와 CeleryExecutor를 사용하고 있다면 Flower와 Celery도 중지한다. 추가로 필요하다면 데이터베이스나 설정 파일도 백업해 두도록 한다. 2. Airflow..

Apache Airflow 2024.08.14

[Flower] FLOWER_UNAUTHENTICATED_API environment variable is required to enable API without authentication

현상최근 CeleryExecutor를 사용하는 Airflow 버전을 2.5.1에서 2.8.2로 업그레이드를 진행하면서 영향도를 확인하고 있는데, Flower의 업그레이드된 버전에 변경점이 있었는지(확인해 보니 1.2에서 2.0으로 업그레이드되어있었다) Flower를 통한 worker 조작 시 다음과 같은 에러가 발생하면서 설정 변경 등이 적용되지 않았다.문제를 해결해보자.   원인Flower 1.0과 다르게 2.0부터는 보안 상의 이유로 인증, 즉 로그인이 활성화되지 않은 경우에는 기본적으로 API의 사용이 비활성화된다. 따라서 가급적이면 로그인 인증 설정을 활성화하거나 별도의 설정을 통해 로그인 인증 없이 API 사용을 허용해야 한다.   해결이 글에서는 로그인 인증 없이도 API를 사용할 수 있도록 ..

Apache Airflow 2024.08.12

[Airflow] TriggerDagRunOperator - 다른 Dag 트리거 하기

개요운영 중인 Airflow 환경에 아래와 같은 2개의 Dag가 동작 중이다.log_git_importer : 서버의 Git Working Directory 내 로그 파일을 원격 저장소로 Push 한다. 매 시 7분에 동작한다.inf_monitor : 운영 시스템 내 환경에서 모니터링이 필요한 지표를 파일로 생성한다. 매 시 10분에 동작한다.그런데 어느 순간부터 inf_monitor가 [Errno 116] Stale file handle 오류와 함께 간헐적으로 실패하는 경우가 생기기 시작했다. 확인해 보니 log_git_importer가 동작하는 도중에 inf_monitor가 동작하는 것이 오류가 발생하는 원인으로 보였다. 동시에 두 Dag가 동작하는 상황을 피하기 위해 실행 주기를 조정해보려고 했는데..

Apache Airflow 2024.05.22

[Airflow] 사용자 정의 오퍼레이터 (Custom Operator)

개요Airflow에서는 오퍼레이터를 통해 여러 기능을 제공하고 있다. 만약 원하는 기능을 제공하는 오퍼레이터가 없다면 개발자가 직접 구현하여 사용할 수 있다.이 글에서는 사용자 정의 오퍼레이터를 구현하여 테스트해보려고 한다. 구현할 기능은 문자열을 입력받고, 문자열을 반환하는 정도로 한다.   사용자 정의 오퍼레이터사용자 정의 오퍼레이터를 만들기 위해서는 아래 조건을 만족해야 한다.BaseOperator 상속생성자 작성 : 오퍼레이터에 필요한 매개변수를 정의한다. execute 함수 작성 : Executor가 오퍼레이터를 호출할 때 실행할 코드를 작성한다.순서대로 차근차근 작성해 보도록 하겠다.  BaseOperator 상속상속받을 BaseOperator가 정의된 라..

Apache Airflow 2024.05.17