airflow 72

[Airflow] Web의 Auto-refresh 기능

개요Airflow WebServer는 Dag의 상태를 일정 주기마다 새로 고침하는 auto-refesh 기능을 제공한다.다만 이 기능은 Dag가 너무 많거나, Dag의 Task가 많으면 너무 많은 데이터를 너무 잦게 데이터베이스에서 조회하게 되어 시스템 부하를 줄 수도 있는 것 같다. 때문에 기본적으로 비활성화 상태를 기본으로 둘 수 있는 방법을 찾았는데, 그런 기능은 지원하지 않는 것으로 보인다.이번 글에서는 Auto-refesh 관련 설정에 대해서만 간단히 짚고 넘어가려고 한다.  관련 설정auto_refresh_intervalAuto-refesh 기능이 켜져있을 때 Dag 데이터가 자동으로 새로고침되는 빈도에 해당된다.airflow.cfg나 환경변수로 설정할 수 있는데 Webserver 관련 설정값..

Apache Airflow 2024.10.16

[Airflow] weight_rule - Task 우선순위 결정

개요Airflow에서 많은 Dag, 많은 Task를 실행할 때 서로 다른 Dag 간의 Task 간의 실행 순서를 결정하기 위해 우선순위를 가질 수 있다. 이번 글에서는 Airflow가 Task의 실행 순서를 결정하는 규칙에 대해서 먼저 알아본다.  priority_weightAirflow는 Executor에서의 task 실행 순서를 결정하기 위해 priority_weight과 weight_rule이라는 두 가지 개념을 사용한다.그중 priority_weight은 Executor의 큐에서의 우선순위를 정의한다. 기본값은 1이며, 각 task는 weight_rule에 의해 계산된 유효한 priority_weight에 따라 실행 순서, 우선순위가 결정된다.Task의 priority_weight 값이 높을 수록..

Apache Airflow 2024.08.23

[Airflow] 버전 다운그레이드

개요업무 환경에서 Airflow 2.5.1 버전을 사용하고 있었는데 보안적인 문제로 인해 2.8.2 버전으로 업그레이드를 진행하고 있다 그런데 막상 테스트로 업그레이드를 해보니 이슈가 조금 있어서…… 다시 Airflow 2.5.1 버전으로 다운그레이드하고 싶다.2023.03.28-[Airflow] 버전 업그레이드에서는 업그레이드 방법을 정리해 두었으니, 이번 글에서는 다운그레이드 진행 방법을 적어둔다.  방법1. Airflow 서비스 중지 작업을 수행하기 전에 Airflow 서비스를 전부 중지한다. Webserver, Scheduler와 CeleryExecutor를 사용하고 있다면 Flower와 Celery도 중지한다. 추가로 필요하다면 데이터베이스나 설정 파일도 백업해 두도록 한다. 2. Airflow..

Apache Airflow 2024.08.14

[Flower] FLOWER_UNAUTHENTICATED_API environment variable is required to enable API without authentication

현상최근 CeleryExecutor를 사용하는 Airflow 버전을 2.5.1에서 2.8.2로 업그레이드를 진행하면서 영향도를 확인하고 있는데, Flower의 업그레이드된 버전에 변경점이 있었는지(확인해 보니 1.2에서 2.0으로 업그레이드되어있었다) Flower를 통한 worker 조작 시 다음과 같은 에러가 발생하면서 설정 변경 등이 적용되지 않았다.문제를 해결해보자.   원인Flower 1.0과 다르게 2.0부터는 보안 상의 이유로 인증, 즉 로그인이 활성화되지 않은 경우에는 기본적으로 API의 사용이 비활성화된다. 따라서 가급적이면 로그인 인증 설정을 활성화하거나 별도의 설정을 통해 로그인 인증 없이 API 사용을 허용해야 한다.   해결이 글에서는 로그인 인증 없이도 API를 사용할 수 있도록 ..

Apache Airflow 2024.08.12

[Airflow] TriggerDagRunOperator - 다른 Dag 트리거 하기

개요운영 중인 Airflow 환경에 아래와 같은 2개의 Dag가 동작 중이다.log_git_importer : 서버의 Git Working Directory 내 로그 파일을 원격 저장소로 Push 한다. 매 시 7분에 동작한다.inf_monitor : 운영 시스템 내 환경에서 모니터링이 필요한 지표를 파일로 생성한다. 매 시 10분에 동작한다.그런데 어느 순간부터 inf_monitor가 [Errno 116] Stale file handle 오류와 함께 간헐적으로 실패하는 경우가 생기기 시작했다. 확인해 보니 log_git_importer가 동작하는 도중에 inf_monitor가 동작하는 것이 오류가 발생하는 원인으로 보였다. 동시에 두 Dag가 동작하는 상황을 피하기 위해 실행 주기를 조정해보려고 했는데..

Apache Airflow 2024.05.22

[Airflow] 사용자 정의 오퍼레이터 (Custom Operator)

개요Airflow에서는 오퍼레이터를 통해 여러 기능을 제공하고 있다. 만약 원하는 기능을 제공하는 오퍼레이터가 없다면 개발자가 직접 구현하여 사용할 수 있다.이 글에서는 사용자 정의 오퍼레이터를 구현하여 테스트해보려고 한다. 구현할 기능은 문자열을 입력받고, 문자열을 반환하는 정도로 한다.   사용자 정의 오퍼레이터사용자 정의 오퍼레이터를 만들기 위해서는 아래 조건을 만족해야 한다.BaseOperator 상속생성자 작성 : 오퍼레이터에 필요한 매개변수를 정의한다. execute 함수 작성 : Executor가 오퍼레이터를 호출할 때 실행할 코드를 작성한다.순서대로 차근차근 작성해 보도록 하겠다.  BaseOperator 상속상속받을 BaseOperator가 정의된 라..

Apache Airflow 2024.05.17

[Airflow] 함수 내에서 다른 Operator 실행하기(?)

개요Airflow Dag 정의 파일에서 정의된 Dag 인스턴스는 전역 변수여야 하는 것으로 인지하고 있었다. 그런데 최근에 업무에서 사용하고 있는 Airflow Dag 정의 파일을 살펴보다가, PythonOperator로 실행하는 함수 내에서 Dag와 Task를 생성하는 부분을 발견했다.확인해 보니 기능 동작은 하고 있는데…… 함수 내에서 정의한 Dag와 Task 모두 Airflow 시스템 내에서 확인이 되지 않는 것 같다. 개발자에게 의도를 물어보고 싶지만 퇴사를 하셨으므로, 무엇을 의도한 결과인지 추측해보려고 한다.  예시 코드아래 코드는 운영 중인 코드를 간단한 형태로 정리한 것이다.from airflow import DAGfrom airflow.operators.python i..

Apache Airflow 2024.05.15

[Airflow] PythonSensor 사용하기

개요2024.04.05-[Airflow] ExternalTaskSensor - 다른 dag/task 작업 완료 대기에서 다른 Dag나 Task의 실행을 센싱 했는데, 여러 Dag나 Task의 실행을 센싱 하기엔 적합하지 않다는 생각이 들었다.이유는 여러 가지가 있는데, ExternalTaskSensor는 한 번에 하나의 Task나 Dag만을 센싱 할 수 있기 때문에 여러 Dag나 Task를 센싱 하기 위해서는 여러 개의 Sensor를 생성해서 사용해야 한다. 또, Sensor 간 종속성을 설정하는 경우가 아니라면 여러 Dag나 Task 중 실패가 발생했을 때 센싱을 중단할 방법이 없어 보였다. 이런 이유로 2024.04.14-[Airflow] 센서(Sensor) 란에서 살펴본 여러 Sensor 중 Pyt..

Apache Airflow 2024.05.10

[Airflow] 센서(Sensor) 란

개요 2024.04.05-[Airflow] ExternalTaskSensor - 다른 dag/task 작업 완료 대기에서 다른 Dag나 Task의 실행을 대기하는 Sensor를 살펴보았는데, 확인해 보니 Airflow에서는 다양한 종류의 작업에 대한 Sensor를 구현할 수 있도록 Sensor 라이브러리를 제공하고 있었다. 이번 글에서는 Airflow에서의 Sensor 개념과 지원하고 있는 Sensor에 대해서 간단히 정리해두려고 한다. Sensor Airflow에서 Sensor란 현실 세계에서의 센서와 동일하게 어떠한 일이 발생할 떄까지 기다리도록 설계된 오퍼레이터이다. Sensor가 실행되면 특정 조건이 충족되는지 주기마다 확인하고, 조건이 충족되면 Task를 성공 상태로 표시하고 다음 작업을 실행되..

Apache Airflow 2024.05.06

[Airflow] airflow-code-editor - 웹 서버 내 코드 편집기 사용

개요 Airflow는 Python으로 작성된 Dag 정의 파일을 파싱 하여 TaskFlow를 생성한다. 다만 Dag 정의 파일을 편집하기 위해서는 Airflow 서버에 직접 접속해야 하는데, 이로 인해 dag 테스트하기가 번거로운 부분이 있었다. 그러나 최근 웹 서버에 코드 편집기 기능을 추가해주는 airflow-code-editor라는 플러그인이 존재한다는 것을 확인했다. 설치하고 사용하는 방법을 간단히 적어두려고 한다. 참고로 airflow >=1.10.3 , 그리고 git >=2.0을 요구한다. airflow-code-editor 웹 서버에서 Dag 정의 파일을 편집할 수 있는 Airflow 플러그인이다. 지정된 디렉터리 내 파일 관리 인터페이스를 제공하고, 파일을 편집하거나 업로드, 다운로드할 수..

Apache Airflow 2024.05.01