Apache Airflow

[Airflow] weight_rule - Task 우선순위 결정

비번변경 2024. 8. 23. 18:44

개요

Airflow에서 많은 Dag, 많은 Task를 실행할 때 서로 다른 Dag 간의 Task 간의 실행 순서를 결정하기 위해 우선순위를 가질 수 있다. 이번 글에서는 Airflow가 Task의 실행 순서를 결정하는 규칙에 대해서 먼저 알아본다.

 

 

priority_weight

Airflow는 Executor에서의 task 실행 순서를 결정하기 위해 priority_weight과 weight_rule이라는 두 가지 개념을 사용한다.

그중 priority_weight은 Executor의 큐에서의 우선순위를 정의한다. 기본값은 1이며, 각 task는 weight_rule에 의해 계산된 유효한 priority_weight에 따라 실행 순서, 우선순위가 결정된다.

Task의 priority_weight 값이 높을 수록 먼저 실행된다.

 

 

weight_rule

Airflow는 Task의 유효한 priority_weight를 계산하기 위해 3가지의 weight_rule을 사용한다.

 

downstream

task의 priority_weight는 다운스트림 task의 총개수에 의해 결정된다. 즉, 업스트림 task일수록 더 높은 priority_weight을 가진다. 여러 dag_run이 실행될 때 각 Dag의 다운스트림 task보다 업스트림 task를 먼저 완료해야 할 때 유용하다.

 

upstream

task의 priority_weight는 업스트림 task의 총 개수에 의해 결정된다. 즉, 다운스트림 task일수록 더 높은 priority_weight을 가진다. 여러 dag_run이 실행될 때 각 Dag의 업스트림 task를 실행하는 것보다 Dag를 완료하는 것이 더 중요할 때 유용하다.

 

absolute

task에 지정한 priority_weight가 실제 priority_weight가 된다. 각 작업의 우선순위를 정확하게 알고 있을 때 사용해야 한다.

 

Airflow는 기본적으로 downstream 규칙을 사용한다.

 

 

테스트

다음과 같이 2개의 Task를 가진 Dag가 있다고 하자.

전체 코드는 접은글로 적어둔다.

더보기
from airflow import DAG
from airflow.operators.python import PythonOperator


def print_hello(**kwargs):
    print(kwargs['ti'].priority_weight)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': '2023-01-01',
    'retries': 1,
    'retry_delay': 30,
}

dag = DAG(
    'priority_test',
    default_args=default_args,
    schedule='0 0 * * *',
    catchup=False
)

task_1 = PythonOperator(
    task_id='task_1',
    python_callable=print_hello,
    dag=dag
)

task_2 = PythonOperator(
    task_id='task_2',
    python_callable=print_hello,
    dag=dag
)
task_1 >> task_2

Airflow Meta DB를 통해 각 task의 priority_weight를 확인해 보면 다음과 같다.

SELECT ti.dag_id 
	, ti.task_id
	, priority_weight 
FROM airflow.task_instance ti 
WHERE ti.dag_id = 'priority_test'

 

업스트림의 task가 더 우선순위가 높은 것을 확인할 수 있다. 이제 weight_rule을 upstream으로 바꿔보자.

default_args['weight_rule'] = 'upstream'

이때 priority_weight를 확인해 보면 다음과 같다.

downstream 규칙 과 달리 다운스트림 task의 우선순위가 더 높은 것을 알 수 있다.

이번에는 upstream 규칙을 유지하면서 task에 priority_weight을 5로 지정해 보자.

task_1 = PythonOperator(
    task_id='task_1',
    python_callable=print_hello,
    priority_weight=5,
    dag=dag
)

task_2 = PythonOperator(
    task_id='task_2',
    python_callable=print_hello,
    priority_weight=5,
    dag=dag
)

이제 각 task의 priority_weight 값이 task 생성했던 priority_weight 값의 배수로 변경된 것을 확인할 수 있다.

 

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/priority-weight.html

https://dodonam.tistory.com/466