airflow 65

[Airflow] CeleryExecutor 사용

개요 2023.08.25 - [Python] Celery 란 이란 글에서 Celery의 개념에 대해서 조금 살펴보았다. 내 경우에는 Celery를 직접 다루는 건 아니지만 Airflow CeleryExecutor를 통해 사용하고 있다. 이 글에서는 Airflow에서 CeleryExecutor를 사용하기 위한 설정이나 사용 명령어 등을 간단히 살펴보려고 한다. CeleryExecutor를 사용하기 위한 RDB, Broker 등은 이미 준비되어 있다고 가정한다. 이 글에서는 MySQL, Redis를 사용하는 것으로 가정할 것이다. ❗ 직접 구성한 결과를 정리한 게 아니기 때문에 정확하지 않은 정보가 포함되어 있을 수 있다. 업무 중에는 Airflow 설치/설정/시작 모두 스크립트를 이용해 직접 명령어를 실행..

Apache Airflow 2023.08.28

[Python] Celery 란

Celery 방대한 양의 메시지를 처리할 수 있는 분산 작업 큐(Distributed Task Queue). 분산 메시지 전달을 기반으로 하는 비동기 작업 큐라고 정의하기도 한다. Celery는 간단하고(simple), 유연하고(flexible), 안정적이며(reliable), Task 스케쥴링을 지원하되 실시간 처리에 중점을 두고 있다. Python 동시성 프로그래밍에서 많이 사용한다. Celery 자체는 Python으로 작성되어 있지만 프로토콜은 모든 언어로 구현할 수 있다. Task Queue 개념 스레드 또는 machine 간에 Task를 분산하는 메커니즘으로 사용된다. Task Queue는 Task라고 불리는 작업 단위를 입력으로 받는다. Celery의 클라이언트가 큐에 추가한 메시지는 brok..

Python 2023.08.25

[Airflow] 스케쥴 설정이 있어도 스케쥴러에 의해 Dag가 실행되지 않는 경우

현상 3개월에 한 번, 한 달에 한 번 등 스케쥴 주기가 긴 Dag를 생성했다. 기본 Dag 매개변수는 아래와 같아서 스케쥴 시간이 되도 자동으로 실행되지 않아 처음에는 트리거하여 실행했었다. DEFAULT_DAG_ARGS = { 'depends_on_past': False, 'retries': 0, 'retry_delay': timedelta(seconds=20), 'provide_context': True, 'start_date': (datetime.now() - timedelta(days=2)), 'on_failure_callback': webhook.airflow_failed_callback, 'queue': 'default' } 이후에 스케쥴러에 의해 자동으로 실행될 거라고 기대했는데, 매번 실..

[Airflow] DB Clean

개요 Airflow는 서비스 동작에 필요한 데이터를 MySQL이나 PostgreSQL과 같은 RDB에 저장한다. 서비스 시간이 길어질수록 dag_run, task_instance 등의 정보가 누적되면서 용량을 차지하는데 이러한 부분은 어떻게 관리해야 할까? 일반적으로 특정 보관주기를 정해두고 주기적으로 오래된 데이터를 삭제할 것이다. Airflow에서는 어떻게 관리하는지 적어둔다. 참고로 GCP에서는 30일 정도의 보관주기를 권하고 있고, 대개 log, task_instance, dag_run, xcom 테이블이 용량을 많이 차지한다고 한다. (참고 : Airflow 데이터베이스 삭제 ) teamclairvoyant - db-cleanup 보통 teamclairvoyant의 airflow-maintena..

Apache Airflow 2023.08.08

[Airflow] Concurrency 설정

개요 Airflow로 많은 Dag와 Task를 동시에 병렬로 처리할 때는 Concurrency와 같은 설정은 서버 스펙이나 다른 요소를 고려하여 조정할 필요가 있다. Airflow는 처리량을 조정하기 위해 몇 가지 설정을 제공하는데 이 글에서는 관련 설정에 대해 정리해두려고 한다. 시스템 수준 설정 parallelism Airflow 스케쥴러 당 동시에 실행할 수 있는 최대 Task Instance 수. 기본값은 32이다. 즉, Airflow 스케쥴러 하나당 최대 32개의 Task 실행을 관리할 수 있다. max_active_runs_per_dag 각 Dag 당 실행할 수 있는 최대 Dag_run 수. 기본값은 16이다. max_active_runs_per_dag 값이 Dag 수준 설정인 max_acti..

Apache Airflow 2023.07.19

[Airflow] DockerOperator - Bind Mount

개요 2023.07.11 - [Airflow] DockerOperator 에서 살펴본 DockerOperator는 스케쥴링된 작업을 수행할 때 미리 구성해 둔 도커 이미지로 컨테이너를 생성하고, 컨테이너에서 작업을 수행한다. 각 작업이 동작하는데 필요한 환경이 다른 경우 의존성 문제를 해소하기 위해 많이 사용하는 것 같다. 그렇다면 DockerOperator로 실행한 컨테이너에서 작업이 동작하기 위한 소스 코드 등은 어떻게 전달해야 할까? Volume VS Bind Mount 2023.07.12 - [Docker] Volume 2023.07.13 - [Docker] Bind Mount 위 두 개 글에서 컨테이너 내에서 생성한 데이터를 호스트에 유지하거나 호스트의 데이터를 컨테이너에 전달하는 방식인 Vol..

Apache Airflow 2023.07.14

[Airflow] DockerOperator

DockerOperator Airflow에서 작업을 도커 컨테이너 내에서 실행시켜야 한다면 DockerOperator 사용을 고려해 볼 수 있다. 스케쥴링될 작업을 수행할 때 미리 구성한 도커 이미지로 컨테이너를 생성한 후, 컨테이너 안에서 작업을 수행한다. 이 글에서는 간단히 DockerOperator를 이용한 작업을 수행해보려고 한다. 패키지 설치 Airflow에서 도커 관련 기능을 사용하기 위해서는 추가로 패키지를 설치해주어야 한다. pip install apache-airflow-providers-docker airflow 사용자에 도커 실행 권한 추가 airflow를 통해 도커 컨테이너를 실행시키기 위해서는 airflow를 실행하는 사용자에게 도커를 실행시킬 수 있는 권한이 부여되어 있어야 한다..

Apache Airflow 2023.07.11

[Airflow] Dynamic Task Mapping

Dynamic Task Mapping 기존에 Airflow는 DAG가 프로세싱되는 시점에만 Task를 동적으로 생성할 수 있었다. (참고 : 2023.04.12 - [Airflow] Task 반복 생성) 하지만 Airflow 2.3부터는 Dynamic Task Mapping 기능이 도입되면 Runtime에 Task를 동적으로 생성할 수 있게 되었다. 이 글에서는 Dynamic Task Mapping 기능에 대해 알아보려고 한다. Dynamic Task Dynamic Task Mapping 기능은 MapReduce 프로그래밍 모델을 기반으로 한다. Map은 데이터를 연산하기 좋은 형태로 변환하는 작업으로 일반적으로 Trasformation이라고 표현한다. Reduce는 Map을 통해 만들어진 데이터로 연산..

Apache Airflow 2023.06.14

[Airflow] Dag Processing 소요 시간 확인

개요 Airflow Scheduler는 주기적으로 DAG_FOLDER를 읽어 DAG 정의 파일을 읽어 들이고, DAG의 메타데이터를 Airflow meta DB에 저장한다. 스케쥴러와 관련된 프로세스를 찾으면 주기적으로 아래와 같은 DagFileProcessor가 동작하는 모습을 확인할 수 있다. ps -ef | grep scheduler 주기적으로 DAG 정의 파일을 읽어들이므로 DAG 정의 파일을 Processing 하는데 소요되는 시간도 Airflow 성능과 관련이 있다. 이 글에서는 Dag Processing 소요 시간 확인 방법을 적어둔다. DAG 프로세싱 로그 확인 DagFIle을 프로세싱하는 스케쥴러는 관련 정보를 로그로 남기는데, 그 위치는 다음과 같다. $AIRFLOW_HOME/logs/..

Apache Airflow 2023.06.09

[Airflow] @task_group으로 TaskGroup 정의

개요 2023.05.20 - [Airflow] TaskGroup에서 TaskGroup의 개념을 알아보고, 사용방법을 정리했다. 이 글에서는 TaskFlow API로, 즉 @task_group을 이용해 TaskGroup을 정의하는 방법을 정리한다. 예시 dag @task_group를 사용하지 않고 정의한 아래 dag를 데코레이터를 이용해 다시 작성한다. import datetime from airflow import DAG from airflow.utils.task_group import TaskGroup from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator dag = DAG( dag_..

Apache Airflow 2023.05.21
1 2 3 4 5 ··· 7