개요
2025.01.23-[Airflow] 2.5.1 -> 2.8.2 업그레이드 후 Trigger DAG w/ config 미노출 현상에서 Dag의 매개변수를 전달하는 새로운 방법이 있다는 것을 알게 되었다. 해당 글에서는 기존 매개변수 전달 방식을 유지하기 위한 방법에 초점을 맞춘 글을 작성했는데, 이번에는 새로운 매개변수 전달 및 사용 방법에 대해서 알아보려고 한다.
Params
Airflow Params는 Dag에 런타임 구성을 제공하여, Dag와 Task에 값을 전달하는 방법이다. Dag 정의 코드에서 구성하면 Dag를 트리거할 때 추가로 매개변수를 전달하거나 기존에 설정되어 있던 값을 덮어쓴다. 또한 Trigger UI Form을 렌더링 하는 데 사용한다.
Dag 수준 매개변수
Task에 전달되는 기본값에 해당되며 Dag 객체에 params 매개변수에 Params 객체 또는 값으로 이루어진 Dictionary를 전달하면 된다.
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
with DAG(
"the_dag",
params={
"x": Param(5, type="integer", minimum=3),
"my_int_param": 6
},
) as dag:
Params 객체는 기본값, 매개변수의 유형을 나타내는 type, 값을 제한하기 위한 number, integer 등으로 구성된다. Params 객체의 속성은 Trigger UI From을 렌더링하는데 사용되며, Form 필드가 표시되는 방식을 정의한다.
Task 수준 매개변수
Params는 Dag 수준 뿐만 아니라 Task 수준으로도 전달할 수 있다. 전달 방식은 Dag 객체에 전달하는 것과 동일하게 Operator 객체에 params 매개변수로 전달하면 된다.
Task 수준 매개변수는 Dag 수준 매개변수보다 높은 우선순위를 가지되, Trigger 시의 값보다는 낮은 우선순위를 가진다.
PythonOperator(
task_id="print_my_int_param",
params={"my_int_param": 10},
python_callable=print_my_int_param,
)
예시
위에서 살펴본 예시 코드로 매개변수를 전달하고 사용하는 방법을 알아보자.
아래 코드는 Params 객체와 값으로 Dag 수준 매개변수를 전달하고 PythonOperator 내에서 참조하여 값을 출력하는 Dag이다.
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
with DAG(
"params_test_dag",
params={
"x": Param(5, type="integer", minimum=3),
"my_int_param": 6
},
) as dag:
@task.python()
def my_task(params, **context):
print(f'params : {params}')
print(f'dag.params : {dag.params}')
print(f'context["dag_run"].conf : {context["dag_run"].conf}')
my_task()
이 Dag 정의 파일은 다음과 같은 Trigger Dag UI를 제공한다.
두 개 값을 전부 10, 10으로 변경하여 실행한 결과는 아래와 같다.
params, dag_run.conf를 참조한 결과와 dag.params를 참조한 결과가 서로 다른 모습을 확인할 수 있다. params, dag_run.conf는 트리거 시 전달받은 값을 참조하는 반면, dag.params은 dag 객체의 params 속성값, 즉 기본값을 참조하기 때문에 다른 결과를 보이고 있다.
아래 코드는 Task 수준 매개변수를 전달하고 사용하는 예시이다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task
from airflow.models.param import Param
def print_my_int_param(params, **context):
print(f'params : {params}')
print(f'dag.params : {dag.params}')
print(f'context["dag_run"].conf : {context["dag_run"].conf}')
with DAG(
"params_test_dag",
params={
"x": Param(5, type="integer", minimum=3),
"my_int_param": 6
},
) as dag:
test_2 = PythonOperator(
task_id="print_my_int_param",
params={"my_int_param": 10},
python_callable=print_my_int_param)
다만 Dag 수준 params, Task 수준 params 모두에서 같은 이름의 매개변수를 구성한 경우, 기본적으로는 Trigger DAG UI가 항상 노출되기 때문에 task 수준의 매개변수, Task 수준에서의 기본값을 사용하는 경우는 없는 것 같다.
참고 문서
https://airflow.apache.org/docs/apache-airflow/2.8.2/core-concepts/params.html