Apache Airflow/삽질 9

[기록] 데이터 재처리/복구용 Airflow Dag 개발

개요Airflow에는 데이터를 재처리할 수 있는 방법으로 clear, backfill 등의 기능을 제공하고 있다. 다만 단순 clear로 재처리하는 경우에는 이전 시점의 데이터를 처리하기 어렵고, backfill은 forground로 실행되며 로그로 모니터링을 하기 위해서는 리다이렉션 등을 통해 별도로 출력을 저장해야 한다. 또 업무에서 사용하고 있는 airflow 환경에서 backfill 명령이 동작하지 않는다는 치명적인 문제도 있었다…….이런저런 이유로 업무에서 사용하고 있는 airflow 환경에서는 recovery_dag(데이터 재처리를 위한 dag)를 생성하는 dag 정의 파일을 개발하여 사용하고 있었는데 몇 가지 문제점이 발견되었다.어떤 문제점이 있는지, 그리고 어떻게 개선했는지를 정리해 둔다...

[Airflow] OS 업그레이드 작업 기록 - 서비스 이관

개요 2023.08.29 - [Airflow] OS 업그레이드 작업 기록 - 서비스 검증 환경 구성 고민 2023.09.04 - [Airflow] OS 업그레이드 작업 기록 - 서비스 검증 위 두 개 글에서 Airflow OS 업그레이드 작업 중 서비스 계정의 UID:GID 변경을 위한 검증 환경을 어떻게 구성할지, 그리고 서비스 검증 방법에 대해 고민하고 진행한 내용을 정리했다. 이 글은 서비스 검증까지 완료한 상태에서 서비스 이관을 위해 고민했던 점을 적어둔다. 이슈 사항 현재 구성도는 대략 아래 사진과 같다. 인스턴스에 _NEW가 붙은 리소스가 신규 OS 서버이다. 그리고 작업하고 있는 Airflow는 CeleryExecutor를 사용하는 만큼 Worker를 scalable 하게 사용한다. 보통 A..

[Airflow] Celery worker의 queue 삭제 시 프로세스 종료 여부

개요 서비스 운영을 하다 보면 예기치 않은 이유로 프로세스가 종료되는 경우가 있다. 그럴 때를 대비해서 프로세스의 실행 상태를 확인하고, 실행 상태가 아니면 프로세스를 시작하도록 동작하는 스크립트를 crontab에 등록해 두는데, 한 가지 확인해 볼 게 생겼다. Celery worker를 관리하고 모니터링할 수 있는 Airflow Celery Flower를 이용하면 Worker의 Task Queue를 삭제하거나 추가할 수 있다. 이 때 Worker의 Task Queue를 삭제하면 worker 프로세스가 종료되지는 않을까? 직접 실험해봤다. 실험 방법 1. Celery Flower Web UI 접속 Task Queue를 삭제할 Worker 서버를 선택한다. 2. Queues 탭으로 이동 3. 비활성화할 Q..

[Airflow] OS 업그레이드 작업 기록 - 서비스 검증

개요 2023.08.29 - [Airflow] OS 업그레이드 작업 기록 - 서비스 검증 환경 구성 고민에서는 OS 업그레이드 작업 중 서비스 계정의 UID:GID 변경을 위한 검증 환경을 어떻게 구성할 지에 대해 고민했었다. 이 글에서는 구성은 끝났고, 어떻게 서비스를 검증했는지, 검증 작업 수행 순서 등에 대해 적어둔다. 현재 구성도는 대략 아래 사진과 같다. 인스턴스에 _NEW가 붙은 리소스가 신규 OS 서버이다. 서비스 검증 방법 신규 OS를 사용하는 서버에 Airflow 설치를 완료하고 서비스가 동작하기 위한 조건을 모두 갖췄다는 전제하에 서비스 검증을 진행한다. 가급적 서비스 중단 없이 진행하고자 블루-그린 업데이트를 수동으로 수행하는 모습으로 진행을 해보았다. WebServer / Celer..

[Airflow] OS 업그레이드 작업 기록 - 서비스 검증 환경 구성 고민

개요 이 글은 업무 중 만난 Airflow 서버 OS 업그레이드 작업에 대해 기록하기 위한 글이다. 먼저 업무에서 사용하는 Airflow 환경은 다음과 같다. 1. 서버 - AWS EC2 - Master 2대 (Webserver, Scheduler, Celery Flower 동작) - Worker 2대 (Celery Worker 동작) - Ubuntu 18.04 LTS (amd) - 각 서버에 AWS EFS mount (DAG_FOLDER 동기화 및 airflow log/config 등 공유) 2. Airflow 설정 - Airflow HOME : /opt/airflow Airflow home 경로 자체는 root 영역인 /opt/airflow이고, 그 아래에 airflow.cfg, logs, dags ..

[Airflow] 스케쥴 설정이 있어도 스케쥴러에 의해 Dag가 실행되지 않는 경우

현상 3개월에 한 번, 한 달에 한 번 등 스케쥴 주기가 긴 Dag를 생성했다. 기본 Dag 매개변수는 아래와 같아서 스케쥴 시간이 되도 자동으로 실행되지 않아 처음에는 트리거하여 실행했었다. DEFAULT_DAG_ARGS = { 'depends_on_past': False, 'retries': 0, 'retry_delay': timedelta(seconds=20), 'provide_context': True, 'start_date': (datetime.now() - timedelta(days=2)), 'on_failure_callback': webhook.airflow_failed_callback, 'queue': 'default' } 이후에 스케쥴러에 의해 자동으로 실행될 거라고 기대했는데, 매번 실..

[Airflow] trigger 시도 시 DagRunAlreadyExists: A Dag Run already exists for dag id at {execution_date} with run id

개요 모든 Task에 대해 clear 처리한 DAG를 다시 실행시키고자 trigger 명령을 사용했다. airflow dags trigger \ -r scheduled__2022-11-12T00:00:00+00:00 \ test_dag 하지만 아래와 같은 DagRunAlreadyExists 에러가 발생하면서 트리거가 되지 않았다. Traceback (most recent call last): File "/opt/airflow/conda/bin/airflow", line 8, in sys.exit(main()) File "/opt/airflow/conda/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main args.func(args) Fi..

[Airflow] backfill - KeyError: TaskInstanceKey 에러 발생 시

현상 이전 시점의 데이터를 다시 처리해야 하는 일이 발생해서 2022.11.14 - [Airflow] backfill - 스케쥴 시점이 지나간 DAG 실행하기에서 알아봤던 backfill을 시도했다. 하지만 예상과 다르게 아래와 같이 KeyError: TaskInstanceKey 에러가 발생하며 backfill에 실패했다. Traceback (most recent call last): File "/opt/conda/bin/airflow", line 8, in sys.exit(main()) File "/opt/conda/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main args.func(args) File "/opt/conda/lib/p..

[Airflow] lockfile.AlreadyLocked: /home/airflow/airflow/airflow-scheduler.pid is already locked

현상 에어플로우 웹 서버나 스케쥴러를 백그라운드로 실행했다. airflow scheduler -D airflow webserver --port 8081 -D 하지만 웹에서 접속이 되지 않아 확인했더니 스케쥴러의 프로세스가 존재하지 않았다. 원인 웹 서버와 스케쥴러의 에러 로그를 확인해보니 두 요소 모두 아래와 같은 FileExistsError, lockfile.AlreadyLocked가 발생하고 있었다. $ view airflow/airflow-webserver.err Traceback (most recent call last): File "/home/airflow1/venv/airflow/lib/python3.8/site-packages/lockfile/pidlockfile.py", line 77, i..

1