Apache Airflow

[Airflow] PythonOperator 매개변수 전달

비번변경 2022. 8. 11. 19:00

개요

DAG 실행 시 실행할 PythonOperator가 호출할 함수에 매개변수를 전달하고 싶다. 테스트로 아래와 같은 코드를 작성해 실행해보았으나, python_callable param must be callable 에러가 발생했다.

... 생략 ...

t1 = PythonOperator(
    task_id='print_string',
    python_callable=print("hello, world!"),
    dag=dag,
)

python_callable param must be callable 에러

Airflow에서 python_callable로 실행할 함수에 인자값은 어떻게 전달할 수 있을까?

방법을 정리해둔다.

 

 

op_args

Airflow PythonOperator API 문서에 따르면 PythonOperator는 아래와 같은 매개변수를 가진다.

PythonOperator API 문서

그 중 op_args는 함수를 호출할 때 전달되는 인수의 List 형 데이터에 해당한다. 즉, 다음과 같이 인자값을 리스트로 전달할 수 있다.

... 생략 ...

t1 = PythonOperator(
    task_id='print_string',
    python_callable=print,
    op_args=["hello", "world"],
    dag=dag,
)

op_args

 

 

op_kwargs

또는 함수를 호출할 때 전달되는 인수의 Dictionary 형 데이터인 op_kwargs를 이용할 수도 있다.

... 생략 ...

def print_statement(**kwargs):
    print(f"{kwargs['name']} is {kwargs['job']}.")

t1 = PythonOperator(
    task_id='print_string',
    python_callable=print_statement,
    op_kwargs={"name": "passwd", "job": "blogger"},
    dag=dag,
)

op_kwargs

 

 

Dictionary

PythonOperator는 **kwargs를 통해 딕셔너리를 매개변수로 받고 있으므로, 아래와 같이 op_kwargs가 아닌 다른 이름으로 딕셔너리 데이터를 전달할 수도 있다. 이 때 provide_context는 True로 설정해야 한다. (provide_context 속성은 Airflow 1.x에서만 유효하다.)

... 생략 ...

def print_statement(**context):
    context = context["params"]
    name = context.get("name")
    job = context.get("job")
    print(f"{name} is {job}.")


t1 = PythonOperator(
    task_id='print_string',
    python_callable=print_statement,
    params = {"name": "passwd", "job": "user"},
    provide_context=True,
    dag=dag,
)

Dictionary

 

 

Trigger DAG 시 인수 전달

또한 웹에서 Trigger DAG 시 JSON 형식으로 인자값을 전달받아 사용할 수도 있다.

Trigger DAG with Config

웹에서 인자값을 전달받기 위해서는 Task 인스턴스 생성 시 provide_context를 True로 설정해야 한다. 전달받은 인자값은 context['dag_run'].conf로 접근하여 사용할 수 있다.

... 생략 ...

def print_statement(**context):
    context = context['dag_run'].conf
    name = context.get("name")
    job = context.get("job")
    print(f"{name} is {job}.")

t1 = PythonOperator(
    task_id='print_string',
    python_callable=print_statement,
    provide_context=True,
    dag=dag,
)

dag_run.conf

 

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/1.10.2/_modules/airflow/operators/python_operator.html#PythonOperator

https://airflow.apache.org/docs/apache-airflow/1.10.2/code.html#airflow.operators.python_operator.PythonOperator