개요
2022.12.13 - [Airflow] Executor
Airflow는 기본적으로 작업을 순차적으로 실행하는 SequentialExecutor를 사용한다. 하지만 task 병렬 실행을 지원하지 않아 운영 환경에서는 적합하지 않아 Airflow 공식 문서에서도 변경하여 사용하도록 권장하고 있다.
이 글에서는 task를 병렬 실행할 수 있는 LocalExecutor를 사용한다. 기본값인 SequentialExecutor를 사용할 때와 어떻게 동작이 다른 지도 살펴본다.
테스트 DAG
먼저 task가 병렬로 나열된 DAG를 작성한다.
from datetime import datetime, timedelta
from time import sleep
from airflow import DAG
from airflow.operators.python import PythonOperator
dag_args = {
"owner": "airflow",
"retries": 1,
"retry_delay": timedelta(minutes=1),
}
dag = DAG(
dag_id="parallel_test",
default_args=dag_args,
start_date=datetime(2022, 1, 20),
schedule_interval="@once",
)
def dump():
sleep(3)
task_1 = PythonOperator(task_id="task_1", python_callable=dump, dag=dag)
task_2 = PythonOperator(task_id="task_2", python_callable=dump, dag=dag)
task_3 = PythonOperator(task_id="task_3", python_callable=dump, dag=dag)
task_4 = PythonOperator(task_id="task_4", python_callable=dump, dag=dag)
task_5 = PythonOperator(task_id="task_5", python_callable=dump, dag=dag)
task_1 >> task_2 >> task_5
task_3 >> task_4 >> task_5
SequentialExecutor 사용 시
먼저 SequentialExecutor를 사용할 때 동작 방식을 살펴본다.
1. 현재 Executor 확인
airflow config get-value core executor
2. DAG 트리거
Dag를 트리거하여 동작시키면 task_1, task_3이 병렬로 나열되어 있음에도 동시에 실행되지 않고 하나의 task가 완료되면 다음 task를 실행되는 모습을 확인할 수 있다.
LocalExecutor 사용
LocalExecutor를 사용하도록 변경하고, 테스트 DAG를 다시 트리거한다.
1. Executor 설정 업데이트
구성 파일인 airflow.cfg 파일 내 core.executor 설정을 LocalExecutor로 변경한다.
2. Airflow 스케쥴러 재시작
# 스케쥴러 종료
ps -ef | grep aiflow | grep scheduler
kill <PID>
# 스케쥴러 시작
airflow scheduler -D
3. DAG 트리거
이제 병렬로 나열된 task_1, task_3가 동시에 실행되는 모습을 확인할 수 있다.