개요
Airflow에서 많은 Dag, 많은 Task를 실행할 때 서로 다른 Dag 간의 Task 간의 실행 순서를 결정하기 위해 우선순위를 가질 수 있다. 이번 글에서는 Airflow가 Task의 실행 순서를 결정하는 규칙에 대해서 먼저 알아본다.
priority_weight
Airflow는 Executor에서의 task 실행 순서를 결정하기 위해 priority_weight과 weight_rule이라는 두 가지 개념을 사용한다.
그중 priority_weight은 Executor의 큐에서의 우선순위를 정의한다. 기본값은 1이며, 각 task는 weight_rule에 의해 계산된 유효한 priority_weight에 따라 실행 순서, 우선순위가 결정된다.
Task의 priority_weight 값이 높을 수록 먼저 실행된다.
weight_rule
Airflow는 Task의 유효한 priority_weight를 계산하기 위해 3가지의 weight_rule을 사용한다.
downstream
task의 priority_weight는 다운스트림 task의 총개수에 의해 결정된다. 즉, 업스트림 task일수록 더 높은 priority_weight을 가진다. 여러 dag_run이 실행될 때 각 Dag의 다운스트림 task보다 업스트림 task를 먼저 완료해야 할 때 유용하다.
upstream
task의 priority_weight는 업스트림 task의 총 개수에 의해 결정된다. 즉, 다운스트림 task일수록 더 높은 priority_weight을 가진다. 여러 dag_run이 실행될 때 각 Dag의 업스트림 task를 실행하는 것보다 Dag를 완료하는 것이 더 중요할 때 유용하다.
absolute
task에 지정한 priority_weight가 실제 priority_weight가 된다. 각 작업의 우선순위를 정확하게 알고 있을 때 사용해야 한다.
Airflow는 기본적으로 downstream 규칙을 사용한다.
테스트
다음과 같이 2개의 Task를 가진 Dag가 있다고 하자.
전체 코드는 접은글로 적어둔다.
from airflow import DAG
from airflow.operators.python import PythonOperator
def print_hello(**kwargs):
print(kwargs['ti'].priority_weight)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': '2023-01-01',
'retries': 1,
'retry_delay': 30,
}
dag = DAG(
'priority_test',
default_args=default_args,
schedule='0 0 * * *',
catchup=False
)
task_1 = PythonOperator(
task_id='task_1',
python_callable=print_hello,
dag=dag
)
task_2 = PythonOperator(
task_id='task_2',
python_callable=print_hello,
dag=dag
)
task_1 >> task_2
Airflow Meta DB를 통해 각 task의 priority_weight를 확인해 보면 다음과 같다.
SELECT ti.dag_id
, ti.task_id
, priority_weight
FROM airflow.task_instance ti
WHERE ti.dag_id = 'priority_test'
업스트림의 task가 더 우선순위가 높은 것을 확인할 수 있다. 이제 weight_rule을 upstream으로 바꿔보자.
default_args['weight_rule'] = 'upstream'
이때 priority_weight를 확인해 보면 다음과 같다.
downstream 규칙 과 달리 다운스트림 task의 우선순위가 더 높은 것을 알 수 있다.
이번에는 upstream 규칙을 유지하면서 task에 priority_weight을 5로 지정해 보자.
task_1 = PythonOperator(
task_id='task_1',
python_callable=print_hello,
priority_weight=5,
dag=dag
)
task_2 = PythonOperator(
task_id='task_2',
python_callable=print_hello,
priority_weight=5,
dag=dag
)
이제 각 task의 priority_weight 값이 task 생성했던 priority_weight 값의 배수로 변경된 것을 확인할 수 있다.
참고 문서
https://dodonam.tistory.com/466