airflow 73

[Airflow] airflow-code-editor - 웹 서버 내 코드 편집기 사용

개요 Airflow는 Python으로 작성된 Dag 정의 파일을 파싱 하여 TaskFlow를 생성한다. 다만 Dag 정의 파일을 편집하기 위해서는 Airflow 서버에 직접 접속해야 하는데, 이로 인해 dag 테스트하기가 번거로운 부분이 있었다. 그러나 최근 웹 서버에 코드 편집기 기능을 추가해주는 airflow-code-editor라는 플러그인이 존재한다는 것을 확인했다. 설치하고 사용하는 방법을 간단히 적어두려고 한다. 참고로 airflow >=1.10.3 , 그리고 git >=2.0을 요구한다. airflow-code-editor 웹 서버에서 Dag 정의 파일을 편집할 수 있는 Airflow 플러그인이다. 지정된 디렉터리 내 파일 관리 인터페이스를 제공하고, 파일을 편집하거나 업로드, 다운로드할 수..

Apache Airflow 2024.05.01

[Airflow] ExternalTaskSensor - 다른 dag/task 작업 완료 대기

개요Airflow에서는 여러 작업 간의 종속성을 설정하여 Task Flow을 구성할 수 있는데, 종속성을 설정할 작업이 서로 다른 Dag에 속해있는 경우가 있을 수 있다.이런 경우 Airflow에서 제공하는 Sensor Operator인 ExternalTaskSensor의 사용을 고려해 볼 수 있다.이 글에서는 ExternalTaskSensor의 사용법 정도를 간단히 적어둔다.  ExternalTaskSensorExternalTaskSensor는 특정 logical_date(execution_date)에 대한 다른 dag, task group, task가 완료되기를 기다리는 Operator이다.airflow.sensors.external_task.ExternalTaskSensor(*, external_d..

Apache Airflow 2024.04.30

[Airflow] DagFileProcessor - DAG 파일 처리

개요 Airflow 스케쥴러의 DagFileProcessor는 DAG 정의 파일을 처리해 JSON 형식으로 직렬화한 후, Airflow MetaData DB SerializedDagModel에 저장한다. 이 작업은 일정 주기마다 반복적으로 수행되는 것으로 보이는데, 최근에 관련 설정을 확인하게 되어서 기록할 겸 DAG 파일 처리에 대해서 정리한다. DAG 파일 처리 DAG 파일 처리는 dag_folder에 저장된 Dag 정의 파일을 DAG 객체로 변환하는 작업으로, 아래의 두 개 프로세스가 동작하여 수행한다. - DagFileProcessorManager : 무한 루프로 실행되는 프로세스로, 처리해야 할 파일을 결정한다. - DagFileProcessorProcess : 개별 파일을 DAG 객체로 변환한..

Apache Airflow 2024.04.18

[Airflow] 웹에서 여러 Task를 한 번에 클리어하기

개요 Airflow를 사용하다보면 여러 Task 인스턴스를 클리어하거나 상태를 변경하는 등의 작업을 할 때가 있다. 이 때 Task 인스턴스가 적으면 하나씩 처리하면 되지만, 많은 경우에는 한 번에 처리하고 싶기 마련이다. 물론 Airflow 서버에서 CLI로 처리하면 되는데 (참고 : 2022.11.15 - [Airflow] Clear - Task 재실행), 최근에 웹에서도 처리할 수 있다는 걸 새로 알게 되어 적어둔다. 방법 1. 상단 메뉴 > Browse > Task Instances 이동 2. 처리할 Task 인스턴스 검색 및 선택 검색 기능을 이용해 처리할 Task를 찾은 뒤, 처리할 Task 인스턴스를 선택한다. 3. Actions에서 원하는 작업 선택 Task 인스턴스를 클리어하거나 상태를 ..

Apache Airflow 2024.04.11

[Airflow/Celery Flower] worker autoscale 조정

개요 2023.07.19 - [Airflow] Concurrency 설정에서 Airflow, Celery Worker의 동시성에 대한 설정들을 확인해 보았다. 이 글에서는 Celery Flower에서 관련 설정을 확인하고 조정하는 방법을 적어둔다. 설정 확인 Celery Worker의 Concurrency, Autoscale과 같은 정보는 Pool 탭에서 확인할 수 있다. worker autoscale 설정 조정 Pool 탭 오른편에서 볼 수 있는 Pool size control의 Min/Max autoscale로 worker autoscale 설정을 조정할 수 있다. 원하는 설정을 입력 후 Apply 버튼을 누르면, 상단에 반영 성공이라는 메세지를 확인할 수 있다. 그리고 화면을 새로고침 하면 변경된 ..

Apache Airflow 2024.01.12

[Airflow] celery control - Celery Worker 원격 제어

개요 일반적으로 Airflow Celery Worker는 워커 프로세스를 실행한 서버에서 제어할 수 있다. 하지만 필요에 따라 원격으로 제어해야 하는 경우가 생기는데, 이런 경우에는 Celery CLI를 이용할 수 있다. celery control celery CLI 중 control 명령은 worker를 원격 제어하는 기능을 제공한다. Celery Worker 가이드에서 확인할 수 있는 명령 대부분은 control 명령을 통해 원격으로 제어할 수 있는 것 같다. celery control [OPTIONS] {revoke|revoke_by_stamped_headers|terminate|rate_limit| time_limit|election|enable_events|disable_events|heartb..

Apache Airflow 2023.09.12

[Airflow] Celery worker의 queue 삭제 시 프로세스 종료 여부

개요 서비스 운영을 하다 보면 예기치 않은 이유로 프로세스가 종료되는 경우가 있다. 그럴 때를 대비해서 프로세스의 실행 상태를 확인하고, 실행 상태가 아니면 프로세스를 시작하도록 동작하는 스크립트를 crontab에 등록해 두는데, 한 가지 확인해 볼 게 생겼다. Celery worker를 관리하고 모니터링할 수 있는 Airflow Celery Flower를 이용하면 Worker의 Task Queue를 삭제하거나 추가할 수 있다. 이 때 Worker의 Task Queue를 삭제하면 worker 프로세스가 종료되지는 않을까? 직접 실험해봤다. 실험 방법 1. Celery Flower Web UI 접속 Task Queue를 삭제할 Worker 서버를 선택한다. 2. Queues 탭으로 이동 3. 비활성화할 Q..

[Airflow] OS 업그레이드 작업 기록 - 서비스 검증 환경 구성 고민

개요 이 글은 업무 중 만난 Airflow 서버 OS 업그레이드 작업에 대해 기록하기 위한 글이다. 먼저 업무에서 사용하는 Airflow 환경은 다음과 같다. 1. 서버 - AWS EC2 - Master 2대 (Webserver, Scheduler, Celery Flower 동작) - Worker 2대 (Celery Worker 동작) - Ubuntu 18.04 LTS (amd) - 각 서버에 AWS EFS mount (DAG_FOLDER 동기화 및 airflow log/config 등 공유) 2. Airflow 설정 - Airflow HOME : /opt/airflow Airflow home 경로 자체는 root 영역인 /opt/airflow이고, 그 아래에 airflow.cfg, logs, dags ..

[Airflow] CeleryExecutor 사용

개요 2023.08.25 - [Python] Celery 란 이란 글에서 Celery의 개념에 대해서 조금 살펴보았다. 내 경우에는 Celery를 직접 다루는 건 아니지만 Airflow CeleryExecutor를 통해 사용하고 있다. 이 글에서는 Airflow에서 CeleryExecutor를 사용하기 위한 설정이나 사용 명령어 등을 간단히 살펴보려고 한다. CeleryExecutor를 사용하기 위한 RDB, Broker 등은 이미 준비되어 있다고 가정한다. 이 글에서는 MySQL, Redis를 사용하는 것으로 가정할 것이다. ❗ 직접 구성한 결과를 정리한 게 아니기 때문에 정확하지 않은 정보가 포함되어 있을 수 있다. 업무 중에는 Airflow 설치/설정/시작 모두 스크립트를 이용해 직접 명령어를 실행..

Apache Airflow 2023.08.28

[Python] Celery 란

Celery 방대한 양의 메시지를 처리할 수 있는 분산 작업 큐(Distributed Task Queue). 분산 메시지 전달을 기반으로 하는 비동기 작업 큐라고 정의하기도 한다. Celery는 간단하고(simple), 유연하고(flexible), 안정적이며(reliable), Task 스케쥴링을 지원하되 실시간 처리에 중점을 두고 있다. Python 동시성 프로그래밍에서 많이 사용한다. Celery 자체는 Python으로 작성되어 있지만 프로토콜은 모든 언어로 구현할 수 있다. Task Queue 개념 스레드 또는 machine 간에 Task를 분산하는 메커니즘으로 사용된다. Task Queue는 Task라고 불리는 작업 단위를 입력으로 받는다. Celery의 클라이언트가 큐에 추가한 메시지는 brok..

Python 2023.08.25
1 2 3 4 5 ··· 8