개요
DAG 실행 시 실행할 PythonOperator가 호출할 함수에 매개변수를 전달하고 싶다. 테스트로 아래와 같은 코드를 작성해 실행해보았으나, python_callable param must be callable 에러가 발생했다.
... 생략 ...
t1 = PythonOperator(
task_id='print_string',
python_callable=print("hello, world!"),
dag=dag,
)
Airflow에서 python_callable로 실행할 함수에 인자값은 어떻게 전달할 수 있을까?
방법을 정리해둔다.
op_args
Airflow PythonOperator API 문서에 따르면 PythonOperator는 아래와 같은 매개변수를 가진다.
그 중 op_args는 함수를 호출할 때 전달되는 인수의 List 형 데이터에 해당한다. 즉, 다음과 같이 인자값을 리스트로 전달할 수 있다.
... 생략 ...
t1 = PythonOperator(
task_id='print_string',
python_callable=print,
op_args=["hello", "world"],
dag=dag,
)
op_kwargs
또는 함수를 호출할 때 전달되는 인수의 Dictionary 형 데이터인 op_kwargs를 이용할 수도 있다.
... 생략 ...
def print_statement(**kwargs):
print(f"{kwargs['name']} is {kwargs['job']}.")
t1 = PythonOperator(
task_id='print_string',
python_callable=print_statement,
op_kwargs={"name": "passwd", "job": "blogger"},
dag=dag,
)
Dictionary
PythonOperator는 **kwargs를 통해 딕셔너리를 매개변수로 받고 있으므로, 아래와 같이 op_kwargs가 아닌 다른 이름으로 딕셔너리 데이터를 전달할 수도 있다. 이 때 provide_context는 True로 설정해야 한다. (provide_context 속성은 Airflow 1.x에서만 유효하다.)
... 생략 ...
def print_statement(**context):
context = context["params"]
name = context.get("name")
job = context.get("job")
print(f"{name} is {job}.")
t1 = PythonOperator(
task_id='print_string',
python_callable=print_statement,
params = {"name": "passwd", "job": "user"},
provide_context=True,
dag=dag,
)
Trigger DAG 시 인수 전달
또한 웹에서 Trigger DAG 시 JSON 형식으로 인자값을 전달받아 사용할 수도 있다.
웹에서 인자값을 전달받기 위해서는 Task 인스턴스 생성 시 provide_context를 True로 설정해야 한다. 전달받은 인자값은 context['dag_run'].conf로 접근하여 사용할 수 있다.
... 생략 ...
def print_statement(**context):
context = context['dag_run'].conf
name = context.get("name")
job = context.get("job")
print(f"{name} is {job}.")
t1 = PythonOperator(
task_id='print_string',
python_callable=print_statement,
provide_context=True,
dag=dag,
)
참고 문서