Apache Airflow 71

[Airflow] PythonSensor 사용하기

개요 2024.04.05-[Airflow] ExternalTaskSensor - 다른 dag/task 작업 완료 대기에서 다른 Dag나 Task의 실행을 센서링했는데, 여러 Dag나 Task의 실행을 센서링하기엔 적합하지 않다는 생각이 들었다. 이유는 여러 가지가 있는데, ExternalTaskSensor는 한 번에 하나의 Task나 Dag만을 센터링할 수 있기 때문에 여러 Dag나 Task를 센서링하기 위해서는 여러 개의 Sensor를 생성해서 사용해야 한다. 또, Sensor 간 종속성을 설정하는 경우가 아니라면 여러 Dag나 Task 중 실패가 발생했을 때 센서링을 중단할 방법이 없어 보였다. 이런 이유로 2024.04.14-[Airflow] 센서(Sensor) 란에서 살펴본 여러 Sensor 중 ..

Apache Airflow 2024.05.10

[Airflow] 센서(Sensor) 란

개요 2024.04.05-[Airflow] ExternalTaskSensor - 다른 dag/task 작업 완료 대기에서 다른 Dag나 Task의 실행을 대기하는 Sensor를 살펴보았는데, 확인해 보니 Airflow에서는 다양한 종류의 작업에 대한 Sensor를 구현할 수 있도록 Sensor 라이브러리를 제공하고 있었다. 이번 글에서는 Airflow에서의 Sensor 개념과 지원하고 있는 Sensor에 대해서 간단히 정리해두려고 한다. Sensor Airflow에서 Sensor란 현실 세계에서의 센서와 동일하게 어떠한 일이 발생할 떄까지 기다리도록 설계된 오퍼레이터이다. Sensor가 실행되면 특정 조건이 충족되는지 주기마다 확인하고, 조건이 충족되면 Task를 성공 상태로 표시하고 다음 작업을 실행되..

Apache Airflow 2024.05.06

[Airflow] airflow-code-editor - 웹 서버 내 코드 편집기 사용

개요 Airflow는 Python으로 작성된 Dag 정의 파일을 파싱 하여 TaskFlow를 생성한다. 다만 Dag 정의 파일을 편집하기 위해서는 Airflow 서버에 직접 접속해야 하는데, 이로 인해 dag 테스트하기가 번거로운 부분이 있었다. 그러나 최근 웹 서버에 코드 편집기 기능을 추가해주는 airflow-code-editor라는 플러그인이 존재한다는 것을 확인했다. 설치하고 사용하는 방법을 간단히 적어두려고 한다. 참고로 airflow >=1.10.3 , 그리고 git >=2.0을 요구한다. airflow-code-editor 웹 서버에서 Dag 정의 파일을 편집할 수 있는 Airflow 플러그인이다. 지정된 디렉터리 내 파일 관리 인터페이스를 제공하고, 파일을 편집하거나 업로드, 다운로드할 수..

Apache Airflow 2024.05.01

[Airflow] ExternalTaskSensor - 다른 dag/task 작업 완료 대기

개요 Airflow에서는 여러 작업 간의 종속성을 설정하여 Task Flow을 구성할 수 있는데, 종속성을 설정할 작업이 서로 다른 Dag에 속해있는 경우가 있을 수 있다. 이런 경우 Airflow에서 제공하는 Sensor Operator인 ExternalTaskSensor의 사용을 고려해 볼 수 있다. 이 글에서는 ExternalTaskSensor의 사용법 정도를 간단히 적어둔다. ExternalTaskSensor ExternalTaskSensor는 특정 logical_date(execution_date)에 대한 다른 dag, task group, task가 완료되기를 기다리는 Operator이다. airflow.sensors.external_task.ExternalTaskSensor(*, extern..

Apache Airflow 2024.04.30

[기록] 데이터 재처리/복구용 Airflow Dag 개발

개요Airflow에는 데이터를 재처리할 수 있는 방법으로 clear, backfill 등의 기능을 제공하고 있다. 다만 단순 clear로 재처리하는 경우에는 이전 시점의 데이터를 처리하기 어렵고, backfill은 forground로 실행되며 로그로 모니터링을 하기 위해서는 리다이렉션 등을 통해 별도로 출력을 저장해야 한다. 또 업무에서 사용하고 있는 airflow 환경에서 backfill 명령이 동작하지 않는다는 치명적인 문제도 있었다…….이런저런 이유로 업무에서 사용하고 있는 airflow 환경에서는 recovery_dag(데이터 재처리를 위한 dag)를 생성하는 dag 정의 파일을 개발하여 사용하고 있었는데 몇 가지 문제점이 발견되었다.어떤 문제점이 있는지, 그리고 어떻게 개선했는지를 정리해 둔다...

[Airflow] airflow_db_cleanup Dag 사용하기

개요 2023.08.08 - [Airflow] DB Clean에서 Airflow Meta DB를 관리하는 두 가지 방법을 소개했었다. airflow_db_cleanup Dag 사용 airflow db clean 명령 사용 기존 글에서는 airflow 명령어를 사용한 방법에 초점을 맞췄는데, 이번 글에서는 airflow_db_cleanup Dag를 사용한 방법에 초점을 맞춰서 정리해보려고 한다. airflow_db_cleanup airflow_db_cleanup Dag는 https://github.com/teamclairvoyant/airflow-maintenance-dags 에서 개발되었다. 하지만 저장소 이력을 보면 전혀 관리가 되어 있지 않은 상태로 보이는데, 좀 더 찾아보니 GCP의 https://..

Apache Airflow 2024.04.19

[Airflow] DagFileProcessor - DAG 파일 처리

개요 Airflow 스케쥴러의 DagFileProcessor는 DAG 정의 파일을 처리해 JSON 형식으로 직렬화한 후, Airflow MetaData DB SerializedDagModel에 저장한다. 이 작업은 일정 주기마다 반복적으로 수행되는 것으로 보이는데, 최근에 관련 설정을 확인하게 되어서 기록할 겸 DAG 파일 처리에 대해서 정리한다. DAG 파일 처리 DAG 파일 처리는 dag_folder에 저장된 Dag 정의 파일을 DAG 객체로 변환하는 작업으로, 아래의 두 개 프로세스가 동작하여 수행한다. - DagFileProcessorManager : 무한 루프로 실행되는 프로세스로, 처리해야 할 파일을 결정한다. - DagFileProcessorProcess : 개별 파일을 DAG 객체로 변환한..

Apache Airflow 2024.04.18

[Airflow] 웹에서 여러 Task를 한 번에 클리어하기

개요 Airflow를 사용하다보면 여러 Task 인스턴스를 클리어하거나 상태를 변경하는 등의 작업을 할 때가 있다. 이 때 Task 인스턴스가 적으면 하나씩 처리하면 되지만, 많은 경우에는 한 번에 처리하고 싶기 마련이다. 물론 Airflow 서버에서 CLI로 처리하면 되는데 (참고 : 2022.11.15 - [Airflow] Clear - Task 재실행), 최근에 웹에서도 처리할 수 있다는 걸 새로 알게 되어 적어둔다. 방법 1. 상단 메뉴 > Browse > Task Instances 이동 2. 처리할 Task 인스턴스 검색 및 선택 검색 기능을 이용해 처리할 Task를 찾은 뒤, 처리할 Task 인스턴스를 선택한다. 3. Actions에서 원하는 작업 선택 Task 인스턴스를 클리어하거나 상태를 ..

Apache Airflow 2024.04.11

[Airflow/Celery Flower] worker autoscale 조정

개요 2023.07.19 - [Airflow] Concurrency 설정에서 Airflow, Celery Worker의 동시성에 대한 설정들을 확인해 보았다. 이 글에서는 Celery Flower에서 관련 설정을 확인하고 조정하는 방법을 적어둔다. 설정 확인 Celery Worker의 Concurrency, Autoscale과 같은 정보는 Pool 탭에서 확인할 수 있다. worker autoscale 설정 조정 Pool 탭 오른편에서 볼 수 있는 Pool size control의 Min/Max autoscale로 worker autoscale 설정을 조정할 수 있다. 원하는 설정을 입력 후 Apply 버튼을 누르면, 상단에 반영 성공이라는 메세지를 확인할 수 있다. 그리고 화면을 새로고침 하면 변경된 ..

Apache Airflow 2024.01.12

[Airflow] Dag 직렬화 (Serialization)

직렬화 직렬화(Serialization)란 데이터 구조나 객체 상태를 나중에 재구성할 수 있는 형식으로 변환하는 과정을 말한다. 다르게 말하면 객체를 바이트 스트림으로 인코딩하는 과정으로, 직렬화된 데이터는 동일하거나 다른 환경에 저장될 수 있다. 반대로 직렬화된 데이터를 개체나 데이터 구조로 다시 재구성하는 과정, 인코딩 되어있는 바이트 스트림으로부터 객체를 복원하는 과정을 역직렬화(Deserialization)라고 한다. Airflow 1 DAG 프로세싱 Airflow는 DAG_FOLDER 내 DAG 정의 파일을 읽어 들이고 처리한다. Airflow 1에서는 DAG를 스케쥴링하고 UI로 표시하기 위해 스케쥴러와 웹 서버 모두 DAG 정의 파일에 접근해 처리해야 했다. 웹 서버를 시작/재시작하는 경우에..

Apache Airflow 2023.10.27