airflow 75

[Airflow] Params - 런타임 매개 변수

개요2025.01.23-[Airflow] 2.5.1 -> 2.8.2 업그레이드 후 Trigger DAG w/ config 미노출 현상에서 Dag의 매개변수를 전달하는 새로운 방법이 있다는 것을 알게 되었다. 해당 글에서는 기존 매개변수 전달 방식을 유지하기 위한 방법에 초점을 맞춘 글을 작성했는데, 이번에는 새로운 매개변수 전달 및 사용 방법에 대해서 알아보려고 한다.  ParamsAirflow Params는 Dag에 런타임 구성을 제공하여, Dag와 Task에 값을 전달하는 방법이다. Dag 정의 코드에서 구성하면 Dag를 트리거할 때 추가로 매개변수를 전달하거나 기존에 설정되어 있던 값을 덮어쓴다. 또한 Trigger UI Form을 렌더링 하는 데 사용한다. Dag 수준 매개변수Task에 전달되는 ..

Apache Airflow 2025.02.11

[Airflow] 2.5.1 -> 2.8.2 업그레이드 후 Trigger DAG w/ config 미노출 현상

개요Airflow 버전 2.5.1에서 취약점이 발견되어 2.8.2 버전으로 업그레이드를 진행했다. 진행 후에 Airflow WebServer에서 Dag Trigger 시 Trigger DAG w/ config 버튼이 노출되지 않는 것을 확인했다. 기존 (2.5.1) 현재 (2.8.2) 원인을 파악하고 수정해 보자.  원인 - Trigger UI Form이 현상의 원인은 Airflow 2.6.0에서 도입된 Trigger UI Form 기능과 관련이 있다.Airflow 2.6.0에서는 Dag 수준에서 params 매개변수가 정의되어 있으면 사용자 친화적인 트리거 양식을 렌더링 해준다. 트리거 양식은 Trigger DAG 버튼을 누르면 제공된다.렌더링 되는 트리거 양식은 DAG 객체의 params로 전달된 값..

Apache Airflow 2025.02.10

[Airflow] Dag Import Error 확인하기

개요Airflow를 사용하다 보면 Web UI에서 다음과 같은 Import 에러가 발생하는 모습을 확인할 수 있다.Import Errors가 발생하면 Dag 활성화 자체가 되지 않기 때문에 주기적으로 동작 확인이 필요한 경우에는 별도로 모니터링이 필요해 보였다. 관련하여 알람 등을 구성하기 위해 프로그래밍적인 방법으로 확인할 수 있는 방법이 있는지 확인해본다.   import_error 테이블잠깐 Airflow Meta DB 스키마를 확인해 봤더니 바로 import_error 테이블이 존재하는 모습을 확인할 수 있다.-- airflow.import_error definitionCREATE TABLE `import_error` ( `id` int NOT NULL AUTO_INCREMENT, `times..

Apache Airflow 2024.12.13

[Airflow] Web의 Auto-refresh 기능

개요Airflow WebServer는 Dag의 상태를 일정 주기마다 새로 고침하는 auto-refesh 기능을 제공한다.다만 이 기능은 Dag가 너무 많거나, Dag의 Task가 많으면 너무 많은 데이터를 너무 잦게 데이터베이스에서 조회하게 되어 시스템 부하를 줄 수도 있는 것 같다. 때문에 기본적으로 비활성화 상태를 기본으로 둘 수 있는 방법을 찾았는데, 그런 기능은 지원하지 않는 것으로 보인다.이번 글에서는 Auto-refesh 관련 설정에 대해서만 간단히 짚고 넘어가려고 한다.  관련 설정auto_refresh_intervalAuto-refesh 기능이 켜져있을 때 Dag 데이터가 자동으로 새로고침되는 빈도에 해당된다.airflow.cfg나 환경변수로 설정할 수 있는데 Webserver 관련 설정값..

Apache Airflow 2024.10.16

[Airflow] weight_rule - Task 우선순위 결정

개요Airflow에서 많은 Dag, 많은 Task를 실행할 때 서로 다른 Dag 간의 Task 간의 실행 순서를 결정하기 위해 우선순위를 가질 수 있다. 이번 글에서는 Airflow가 Task의 실행 순서를 결정하는 규칙에 대해서 먼저 알아본다.  priority_weightAirflow는 Executor에서의 task 실행 순서를 결정하기 위해 priority_weight과 weight_rule이라는 두 가지 개념을 사용한다.그중 priority_weight은 Executor의 큐에서의 우선순위를 정의한다. 기본값은 1이며, 각 task는 weight_rule에 의해 계산된 유효한 priority_weight에 따라 실행 순서, 우선순위가 결정된다.Task의 priority_weight 값이 높을 수록..

Apache Airflow 2024.08.23

[Airflow] 버전 다운그레이드

개요업무 환경에서 Airflow 2.5.1 버전을 사용하고 있었는데 보안적인 문제로 인해 2.8.2 버전으로 업그레이드를 진행하고 있다 그런데 막상 테스트로 업그레이드를 해보니 이슈가 조금 있어서…… 다시 Airflow 2.5.1 버전으로 다운그레이드하고 싶다.2023.03.28-[Airflow] 버전 업그레이드에서는 업그레이드 방법을 정리해 두었으니, 이번 글에서는 다운그레이드 진행 방법을 적어둔다.  방법1. Airflow 서비스 중지 작업을 수행하기 전에 Airflow 서비스를 전부 중지한다. Webserver, Scheduler와 CeleryExecutor를 사용하고 있다면 Flower와 Celery도 중지한다. 추가로 필요하다면 데이터베이스나 설정 파일도 백업해 두도록 한다. 2. Airflow..

Apache Airflow 2024.08.14

[Flower] FLOWER_UNAUTHENTICATED_API environment variable is required to enable API without authentication

현상최근 CeleryExecutor를 사용하는 Airflow 버전을 2.5.1에서 2.8.2로 업그레이드를 진행하면서 영향도를 확인하고 있는데, Flower의 업그레이드된 버전에 변경점이 있었는지(확인해 보니 1.2에서 2.0으로 업그레이드되어있었다) Flower를 통한 worker 조작 시 다음과 같은 에러가 발생하면서 설정 변경 등이 적용되지 않았다.문제를 해결해보자.   원인Flower 1.0과 다르게 2.0부터는 보안 상의 이유로 인증, 즉 로그인이 활성화되지 않은 경우에는 기본적으로 API의 사용이 비활성화된다. 따라서 가급적이면 로그인 인증 설정을 활성화하거나 별도의 설정을 통해 로그인 인증 없이 API 사용을 허용해야 한다.   해결이 글에서는 로그인 인증 없이도 API를 사용할 수 있도록 ..

Apache Airflow 2024.08.12

[Airflow] TriggerDagRunOperator - 다른 Dag 트리거 하기

개요운영 중인 Airflow 환경에 아래와 같은 2개의 Dag가 동작 중이다.log_git_importer : 서버의 Git Working Directory 내 로그 파일을 원격 저장소로 Push 한다. 매 시 7분에 동작한다.inf_monitor : 운영 시스템 내 환경에서 모니터링이 필요한 지표를 파일로 생성한다. 매 시 10분에 동작한다.그런데 어느 순간부터 inf_monitor가 [Errno 116] Stale file handle 오류와 함께 간헐적으로 실패하는 경우가 생기기 시작했다. 확인해 보니 log_git_importer가 동작하는 도중에 inf_monitor가 동작하는 것이 오류가 발생하는 원인으로 보였다. 동시에 두 Dag가 동작하는 상황을 피하기 위해 실행 주기를 조정해보려고 했는데..

Apache Airflow 2024.05.22

[Airflow] 사용자 정의 오퍼레이터 (Custom Operator)

개요Airflow에서는 오퍼레이터를 통해 여러 기능을 제공하고 있다. 만약 원하는 기능을 제공하는 오퍼레이터가 없다면 개발자가 직접 구현하여 사용할 수 있다.이 글에서는 사용자 정의 오퍼레이터를 구현하여 테스트해보려고 한다. 구현할 기능은 문자열을 입력받고, 문자열을 반환하는 정도로 한다.   사용자 정의 오퍼레이터사용자 정의 오퍼레이터를 만들기 위해서는 아래 조건을 만족해야 한다.BaseOperator 상속생성자 작성 : 오퍼레이터에 필요한 매개변수를 정의한다. execute 함수 작성 : Executor가 오퍼레이터를 호출할 때 실행할 코드를 작성한다.순서대로 차근차근 작성해 보도록 하겠다.  BaseOperator 상속상속받을 BaseOperator가 정의된 라..

Apache Airflow 2024.05.17

[Airflow] 함수 내에서 다른 Operator 실행하기(?)

개요Airflow Dag 정의 파일에서 정의된 Dag 인스턴스는 전역 변수여야 하는 것으로 인지하고 있었다. 그런데 최근에 업무에서 사용하고 있는 Airflow Dag 정의 파일을 살펴보다가, PythonOperator로 실행하는 함수 내에서 Dag와 Task를 생성하는 부분을 발견했다.확인해 보니 기능 동작은 하고 있는데…… 함수 내에서 정의한 Dag와 Task 모두 Airflow 시스템 내에서 확인이 되지 않는 것 같다. 개발자에게 의도를 물어보고 싶지만 퇴사를 하셨으므로, 무엇을 의도한 결과인지 추측해보려고 한다.  예시 코드아래 코드는 운영 중인 코드를 간단한 형태로 정리한 것이다.from airflow import DAGfrom airflow.operators.python i..

Apache Airflow 2024.05.15