Apache Airflow 82

[Airflow] Dag 직렬화 (Serialization)

직렬화 직렬화(Serialization)란 데이터 구조나 객체 상태를 나중에 재구성할 수 있는 형식으로 변환하는 과정을 말한다. 다르게 말하면 객체를 바이트 스트림으로 인코딩하는 과정으로, 직렬화된 데이터는 동일하거나 다른 환경에 저장될 수 있다. 반대로 직렬화된 데이터를 개체나 데이터 구조로 다시 재구성하는 과정, 인코딩 되어있는 바이트 스트림으로부터 객체를 복원하는 과정을 역직렬화(Deserialization)라고 한다. Airflow 1 DAG 프로세싱 Airflow는 DAG_FOLDER 내 DAG 정의 파일을 읽어 들이고 처리한다. Airflow 1에서는 DAG를 스케쥴링하고 UI로 표시하기 위해 스케쥴러와 웹 서버 모두 DAG 정의 파일에 접근해 처리해야 했다. 웹 서버를 시작/재시작하는 경우에..

Apache Airflow 2023.10.27

[Airflow] celery control - Celery Worker 원격 제어

개요 일반적으로 Airflow Celery Worker는 워커 프로세스를 실행한 서버에서 제어할 수 있다. 하지만 필요에 따라 원격으로 제어해야 하는 경우가 생기는데, 이런 경우에는 Celery CLI를 이용할 수 있다. celery control celery CLI 중 control 명령은 worker를 원격 제어하는 기능을 제공한다. Celery Worker 가이드에서 확인할 수 있는 명령 대부분은 control 명령을 통해 원격으로 제어할 수 있는 것 같다. celery control [OPTIONS] {revoke|revoke_by_stamped_headers|terminate|rate_limit| time_limit|election|enable_events|disable_events|heartb..

Apache Airflow 2023.09.12

[Airflow] OS 업그레이드 작업 기록 - 서비스 이관

개요 2023.08.29 - [Airflow] OS 업그레이드 작업 기록 - 서비스 검증 환경 구성 고민 2023.09.04 - [Airflow] OS 업그레이드 작업 기록 - 서비스 검증 위 두 개 글에서 Airflow OS 업그레이드 작업 중 서비스 계정의 UID:GID 변경을 위한 검증 환경을 어떻게 구성할지, 그리고 서비스 검증 방법에 대해 고민하고 진행한 내용을 정리했다. 이 글은 서비스 검증까지 완료한 상태에서 서비스 이관을 위해 고민했던 점을 적어둔다. 이슈 사항 현재 구성도는 대략 아래 사진과 같다. 인스턴스에 _NEW가 붙은 리소스가 신규 OS 서버이다. 그리고 작업하고 있는 Airflow는 CeleryExecutor를 사용하는 만큼 Worker를 scalable 하게 사용한다. 보통 A..

[Airflow] Celery worker의 queue 삭제 시 프로세스 종료 여부

개요 서비스 운영을 하다 보면 예기치 않은 이유로 프로세스가 종료되는 경우가 있다. 그럴 때를 대비해서 프로세스의 실행 상태를 확인하고, 실행 상태가 아니면 프로세스를 시작하도록 동작하는 스크립트를 crontab에 등록해 두는데, 한 가지 확인해 볼 게 생겼다. Celery worker를 관리하고 모니터링할 수 있는 Airflow Celery Flower를 이용하면 Worker의 Task Queue를 삭제하거나 추가할 수 있다. 이 때 Worker의 Task Queue를 삭제하면 worker 프로세스가 종료되지는 않을까? 직접 실험해봤다. 실험 방법 1. Celery Flower Web UI 접속 Task Queue를 삭제할 Worker 서버를 선택한다. 2. Queues 탭으로 이동 3. 비활성화할 Q..

[Airflow] OS 업그레이드 작업 기록 - 서비스 검증

개요 2023.08.29 - [Airflow] OS 업그레이드 작업 기록 - 서비스 검증 환경 구성 고민에서는 OS 업그레이드 작업 중 서비스 계정의 UID:GID 변경을 위한 검증 환경을 어떻게 구성할 지에 대해 고민했었다. 이 글에서는 구성은 끝났고, 어떻게 서비스를 검증했는지, 검증 작업 수행 순서 등에 대해 적어둔다. 현재 구성도는 대략 아래 사진과 같다. 인스턴스에 _NEW가 붙은 리소스가 신규 OS 서버이다. 서비스 검증 방법 신규 OS를 사용하는 서버에 Airflow 설치를 완료하고 서비스가 동작하기 위한 조건을 모두 갖췄다는 전제하에 서비스 검증을 진행한다. 가급적 서비스 중단 없이 진행하고자 블루-그린 업데이트를 수동으로 수행하는 모습으로 진행을 해보았다. WebServer / Celer..

[Airflow] OS 업그레이드 작업 기록 - 서비스 검증 환경 구성 고민

개요 이 글은 업무 중 만난 Airflow 서버 OS 업그레이드 작업에 대해 기록하기 위한 글이다. 먼저 업무에서 사용하는 Airflow 환경은 다음과 같다. 1. 서버 - AWS EC2 - Master 2대 (Webserver, Scheduler, Celery Flower 동작) - Worker 2대 (Celery Worker 동작) - Ubuntu 18.04 LTS (amd) - 각 서버에 AWS EFS mount (DAG_FOLDER 동기화 및 airflow log/config 등 공유) 2. Airflow 설정 - Airflow HOME : /opt/airflow Airflow home 경로 자체는 root 영역인 /opt/airflow이고, 그 아래에 airflow.cfg, logs, dags ..

[Airflow] CeleryExecutor 사용

개요 2023.08.25 - [Python] Celery 란 이란 글에서 Celery의 개념에 대해서 조금 살펴보았다. 내 경우에는 Celery를 직접 다루는 건 아니지만 Airflow CeleryExecutor를 통해 사용하고 있다. 이 글에서는 Airflow에서 CeleryExecutor를 사용하기 위한 설정이나 사용 명령어 등을 간단히 살펴보려고 한다. CeleryExecutor를 사용하기 위한 RDB, Broker 등은 이미 준비되어 있다고 가정한다. 이 글에서는 MySQL, Redis를 사용하는 것으로 가정할 것이다. ❗ 직접 구성한 결과를 정리한 게 아니기 때문에 정확하지 않은 정보가 포함되어 있을 수 있다. 업무 중에는 Airflow 설치/설정/시작 모두 스크립트를 이용해 직접 명령어를 실행..

Apache Airflow 2023.08.28

[Airflow] 스케쥴 설정이 있어도 스케쥴러에 의해 Dag가 실행되지 않는 경우

현상 3개월에 한 번, 한 달에 한 번 등 스케쥴 주기가 긴 Dag를 생성했다. 기본 Dag 매개변수는 아래와 같아서 스케쥴 시간이 되도 자동으로 실행되지 않아 처음에는 트리거하여 실행했었다. DEFAULT_DAG_ARGS = { 'depends_on_past': False, 'retries': 0, 'retry_delay': timedelta(seconds=20), 'provide_context': True, 'start_date': (datetime.now() - timedelta(days=2)), 'on_failure_callback': webhook.airflow_failed_callback, 'queue': 'default' } 이후에 스케쥴러에 의해 자동으로 실행될 거라고 기대했는데, 매번 실..

[Airflow] DB Clean

개요 Airflow는 서비스 동작에 필요한 데이터를 MySQL이나 PostgreSQL과 같은 RDB에 저장한다. 서비스 시간이 길어질수록 dag_run, task_instance 등의 정보가 누적되면서 용량을 차지하는데 이러한 부분은 어떻게 관리해야 할까? 일반적으로 특정 보관주기를 정해두고 주기적으로 오래된 데이터를 삭제할 것이다. Airflow에서는 어떻게 관리하는지 적어둔다. 참고로 GCP에서는 30일 정도의 보관주기를 권하고 있고, 대개 log, task_instance, dag_run, xcom 테이블이 용량을 많이 차지한다고 한다. (참고 : Airflow 데이터베이스 삭제 ) teamclairvoyant - db-cleanup 보통 teamclairvoyant의 airflow-maintena..

Apache Airflow 2023.08.08

[Airflow] Concurrency 설정

개요 Airflow로 많은 Dag와 Task를 동시에 병렬로 처리할 때는 Concurrency와 같은 설정은 서버 스펙이나 다른 요소를 고려하여 조정할 필요가 있다. Airflow는 처리량을 조정하기 위해 몇 가지 설정을 제공하는데 이 글에서는 관련 설정에 대해 정리해두려고 한다. 시스템 수준 설정 parallelism Airflow 스케쥴러 당 동시에 실행할 수 있는 최대 Task Instance 수. 기본값은 32이다. 즉, Airflow 스케쥴러 하나당 최대 32개의 Task 실행을 관리할 수 있다. max_active_runs_per_dag 각 Dag 당 실행할 수 있는 최대 Dag_run 수. 기본값은 16이다. max_active_runs_per_dag 값이 Dag 수준 설정인 max_acti..

Apache Airflow 2023.07.19
1 2 3 4 5 6 ··· 9