Apache Airflow

[Airflow] CeleryExecutor 사용

비번변경 2023. 8. 28. 19:19

개요

2023.08.25 - [Python] Celery 란 이란 글에서 Celery의 개념에 대해서 조금 살펴보았다. 내 경우에는 Celery를 직접 다루는 건 아니지만 Airflow CeleryExecutor를 통해 사용하고 있다.

이 글에서는 Airflow에서 CeleryExecutor를 사용하기 위한 설정이나 사용 명령어 등을 간단히 살펴보려고 한다.

CeleryExecutor를 사용하기 위한 RDB, Broker 등은 이미 준비되어 있다고 가정한다. 이 글에서는 MySQL, Redis를 사용하는 것으로 가정할 것이다.

 

❗ 직접 구성한 결과를 정리한 게 아니기 때문에 정확하지 않은 정보가 포함되어 있을 수 있다.
업무 중에는 Airflow 설치/설정/시작 모두 스크립트를 이용해 직접 명령어를 실행할 일이 없기 때문이다.
이 글은 관련 스크립트나 서버 환경 설정 등을 살펴보고 정리한 내용이다.

 

 

설치

Airflow에서 CeleryExecutor를 사용하기 위해서는 Celery와 Celery 모니터링 도구인 Flower 설치가 필요하다.

pip 명령어로 직접 설치하거나 Airflow Celery 번들로 설치해도 된다.

pip install celery flower

# 또는
pip install 'apache-airflow[celery]' --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

 

 

airflow.cfg 설정

CeleryExecutor를 사용하기 위해서는 airflow 설정 변경이 필요하다. 기본적인 설정은 아래와 같은 부분을 살펴보면 될 것 같다.

 

core.executor

CeleryExecutor로 변경한다.

[core]
# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = CeleryExecutor

 

celery

[celery]
# Celery broker URL
broker_url = redis://redis:6379/0

# Celery가 작업을 완료하고 작업의 메타데이터를 업데이트하기 위해 사용하는 백엔드.
# 스케쥴러가 Task의 상태를 업데이트하는데 사용된다.
# Example: result_backend = db+postgresql://postgres:airflow@postgres/airflow
# result_backend =

# Celery Flower 실행 IP 정의
flower_host = 0.0.0.0

# Flower Root URL
flower_url_prefix =

# Celery Flower 실행 포트
flower_port = 5555

 

worker의 성능, 병렬성 등을 조정할 필요가 있다면 아래와 같은 부분을 봐야 한다.

# worker에서 사용할 동시성
worker_concurrency = 16

# worekr가 사용할 최대 및 최소 동시성
# 기본적으로 최소 프로세스 수를 유지하지만 필요한 경우 최대로 증가한다.
# 예시: worker_autoscale = 16,12
worker_autoscale = max_concurrency,min_concurrency

 

 

worker 실행

celery worker 명령으로 worker를 실행할 수 있다. task_queue의 기본값은 default이다.

airflow celery worker -q <TASK_QUEUE>

 

만약 Airflow Dag 정의 파일에서 task의 queue 값이 test라면 

t1 = BashOperator(
    task_id='echo_hello_world',
    bash_command='echo "Hello, World!"',
    queue="test",
    dag=dag,
)

task_queue가 default로 지정된 worker에는 스케쥴링되지 않는다. task_queue가 test로 지정된 worker에서만 실행된다.

 

 

worker 종료

worker를 종료할 때는 celery stop 명령을 사용할 수 있다.

airflow celery stop

하지만 한 번도 사용해 본 적 없다. worker를 종료할 때는 큐를 비활성화하고 프로세스를 kill 하는 방법을 더 많이 사용했다.

참고 : 2022.12.09 - [Airflow] CeleryExecutor - Worker 종료

 

 

Flower 실행

celery flower 명령으로 flower를 실행할 수 있다.

airflow celery flower -p <PORT>

 

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/celery.html

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#celery

728x90