Apache Airflow

[Airflow] Executor

비번변경 2022. 12. 13. 21:23

Executor

Task Instance를 실행하는 메커니즘으로, Executor를 어떻게 설정하느냐에 따라 Task가 순차적으로 실행되는지 병렬로 실행되는지, Scale up이 가능한지 등의 실행 방식이 달라진다.

한 번에 하나의 executor만 사용 가능하지만 필요에 따라 변경할 수 있다.

 

 

구성 방법

Executor는 airflow 구성 파일인 airflow.cfg [core] 섹션 executor 옵션을 이용해 설정할 수 있다. 기본값은 Sequential Executor이다.

[core]
executor = KubernetesExecutor

사용자 정의 Executor를 사용할 수도 있다.

 

 

구성 확인

현재 사용 중인 Executor는 아래 명령으로 확인할 수 있다.

airflow config get-value core executor

 

 

Executor 종류

Executor는 크게 스케쥴러 프로세스 내에서 local 하게 실행하는 유형과 worker pool을 통해 원격으로 실행하는 유형으로 나눌 수 있다. 소규모 단일 시스템인 경우에는 Local Executor를, 다중 시스템/클라우드 환경에서는 원격 executor를 사용하는 것이 좋다.

 

Local Executors

Sequential Executor

Airflow 기본 실행기로, 다중 연결을 지원하지 않은 sqlite를 사용하는 유일한 executor이다. 한 번에 하나의 Task instance만 실행할 수 있어 운영 환경에는 적합하지 않다.

 

Local Executor

sequential executor와 다르게 병렬성을 제공하며, 병렬 수준을 결정하는 self.parallelism 값에 따라 프로세스를 생성하여 Task를 실행시키는 executor이다. 

self.parallelism이 0인 경우, execute_async가 호출될 때마다 프로세스를 생성하여 처리한다. task가 처리되면 result_queue에 저장되고 프로세스를 종료한다.

self.parallelism > 0인 경우, task_queue를 사용해 self.parallelism 값만큼의 프로세스를 생성하여 task를 처리한다.

 

 

Remote Executors

Celery Executor

worker의 수를 동적으로 늘리고 줄이며 task를 병렬 실행할 수 있어 Local Executor보다 운영 환경에 적합하다. Celery는 실행할 명령을 저장하는 브로커(RabbitMQ, Redis 등)와 완료된 명령의 상태를 저장하는 결과 백엔드(airflow meta DB)로 구성되며, 브로커에 저장된 task를 실행하는 방식으로 동작한다.

Celery Executor를 사용하기 위해서는 클러스터 전반에 걸쳐 구성이 동일해야 하며, DAGS_FOLDER 또한 모든 worker에 배포되어야 한다. 즉, 파일 시스템 동기화가 필요하다.

 

CeleryKubernetes Executor

Celery Executor와 Kubernetes Executor를 동시에 실행할 수 있게 한다.

피크 때 스케줄 되어야 하는 task의 수가 쿠버네티스 클러스터에서 수월하게 처리할 수 있는 규모를 초과할 때, 그리고 Celery worker에서 처리할 수 있는 작은 작업도 많으면서 많은 리소스가 필요한 task도 있는 경우 사용하는 것이 좋다.


Dask Executor

단일 시스템 또는 원격 네트워크에서 동작하는 Dask 분산 클러스터에서 task를 실행할 수 있게 한다. airflow.cfg에서 다음과 같은 부분이 설정되어야 한다.

[core]
executor = DaskExecutor

[dask]
# This section only applies if you are using the DaskExecutor in
# [core] section above
# The IP address and port of the Dask cluster's scheduler.
cluster_address = 127.0.0.1:8786

# This section only applies if you are using the DaskExecutor in
# [core] section above

 

Kubernetes Executor

쿠버네티스 클러스터 포드에서 task instance를 실행하게 한다. Kubernetes Executor는 airflow 스케쥴러 프로세스로 실행되며 쿠버네티스 클러스터에 대한 접근이 필요하다. Kubernetes Executor가 쿠버네티스에서 동작할 필요는 없다. DAG가 task를 실행시키면 Kubernetes Executor가 worker pod를 요청하고, 생성된 pod가 task를 실행한 뒤 종료된다.

 

LocalKubernetes Executor

Local Executor와 Kubernetes Executor를 동시에 실행할 수 있게 한다.

 

 

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/stable/executor/index.html

https://brandenpleines.medium.com/apache-airflow-dask-executor-17eea5d26a8b

https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html

https://dydwnsekd.tistory.com/98