XComs
cross-communications의 줄임말
기본적으로 Task는 독립적이다. 그리고 물리적으로 다른 서버에서 동작할 수 있다. 그러나 종종 Task 간에 값을 주고받는 등의 동작이 필요한데, XComs가 이를 가능케 한다.
Task는 xcom_push 함수를 이용해 다른 Task가 사용할 데이터를 내보내고, xcom 사용이 필요한 Task는 xcom_pull 함수를 이용해 데이터에 접근할 수 있다. 다만 특정 오퍼레이터는 do_xcom_push: True인 경우 반환 값을 자동으로 XComs에 push 하도록 할 수 있다.
XComs은 Key(name)와 task_id, dag_id, execution_date 등으로 식별한다. Task 간의 데이터 전달을 위해 설계되었기 때문에 큰 값을 전달하기에는 적합하지 않다. 최대 48KB의 데이터 전달이 가능하다.
💡 Airflow 1에서 XComs를 사용하기 위해서는 provide_context : True여야 한다.
PythonOperator - return
python_callable로 호출한 함수에서 return하는 값이 있을 경우 자동으로 xcom_push()가 실행된다.
아래와 같이 task_id를 반환하는 Task를 실행해보자.
dag = DAG(
'push_xcoms',
default_args=default_args,
description='print and push xcoms'
)
def return_tid(**context):
tid = context['task'].task_id
return tid
t1 = PythonOperator(
task_id='return_xcoms',
python_callable=return_tid,
provide_context=True,
dag=dag,
)
실행 후 Task Instance Details를 클릭하고,
Xcom을 클릭하면 Task가 내보낸 XCom을 확인할 수 있다. Key는 return_value가 기본값인 듯 하다.
그리고 Admin > Xcoms에서 전체 Xcom 목록을 확인할 수 있다.
PythonOperator - xcom_push
xcom_push 함수를 사용할 때는 아래와 같은 방식으로 사용할 수 있다.
def return_tid(**context):
tid = context['task'].task_id
context['task_instance'].xcom_push(key='return_tid', value=tid)
PythonOperator - xcom_pull
Task가 내보낸 XCom은 pull 해서 사용할 수 있다. 아래와 같이 print_tid를 실행하는 Task를 하나 더 추가해 실행해본다.
dag = DAG(
'push_xcoms',
default_args=default_args,
description='print and push xcoms',
# schedule_interval=timedelta(days=1),
)
def return_tid(**context):
tid = context['task'].task_id
return tid
def print_tid(**context):
tid = context['task_instance'].xcom_pull(key='return_value', task_ids='return_xcoms')
print(tid)
t1 = PythonOperator(
task_id='return_xcoms',
python_callable=return_tid,
provide_context=True,
dag=dag,
)
t2 = PythonOperator(
task_id='print_xcoms',
python_callable=print_tid,
provide_context=True,
dag=dag,
)
t1 >> t2
정상적으로 값에 접근한 것을 확인할 수 있다.
XCom은 내보낸 DAG와 사용하는 DAG가 다를 수도 있다. 다만 기본적으로는 push 한 Task의 execution_date와 pull 하는 Task의 execution_date가 동일해야 하는 것 같다.
참고 문서
https://dydwnsekd.tistory.com/107
https://moons08.github.io/programming/airflow-xcom/
https://it-sunny-333.tistory.com/160