Apache Airflow

[Airflow] XComs

비번변경 2022. 8. 19. 21:45

XComs

cross-communications의 줄임말

기본적으로 Task는 독립적이다. 그리고 물리적으로 다른 서버에서 동작할 수 있다. 그러나 종종 Task 간에 값을 주고받는 등의 동작이 필요한데, XComs가 이를 가능케 한다.

Task는 xcom_push 함수를 이용해 다른 Task가 사용할 데이터를 내보내고, xcom 사용이 필요한 Task는 xcom_pull 함수를 이용해 데이터에 접근할 수 있다. 다만 특정 오퍼레이터는 do_xcom_push: True인 경우 반환 값을 자동으로 XComs에 push 하도록 할 수 있다. 

 

XComs은 Key(name)와 task_id, dag_id, execution_date 등으로 식별한다. Task 간의 데이터 전달을 위해 설계되었기 때문에 큰 값을 전달하기에는 적합하지 않다. 최대 48KB의 데이터 전달이 가능하다.

 

💡 Airflow 1에서 XComs를 사용하기 위해서는 provide_context : True여야 한다.

 

PythonOperator - return

python_callable로 호출한 함수에서 return하는 값이 있을 경우 자동으로 xcom_push()가 실행된다.

아래와 같이 task_id를 반환하는 Task를 실행해보자.

dag = DAG(
    'push_xcoms',
    default_args=default_args,
    description='print and push xcoms'
)

def return_tid(**context):
    tid = context['task'].task_id

    return tid

t1 = PythonOperator(
    task_id='return_xcoms',
    python_callable=return_tid,
    provide_context=True,
    dag=dag,
)

실행 후 Task Instance Details를 클릭하고,

Task Instance Details
Task가 내보낸 xcom 확인

Xcom을 클릭하면 Task가 내보낸 XCom을 확인할 수 있다. Key는 return_value가 기본값인 듯 하다.

그리고 Admin > Xcoms에서 전체 Xcom 목록을 확인할 수 있다.

Admin > Xcoms

 

 

PythonOperator - xcom_push

xcom_push 함수를 사용할 때는 아래와 같은 방식으로 사용할 수 있다.

def return_tid(**context):
    tid = context['task'].task_id
    context['task_instance'].xcom_push(key='return_tid', value=tid)

xcom_push로 내보낸 XCom

 

 

PythonOperator - xcom_pull

Task가 내보낸 XCom은 pull 해서 사용할 수 있다. 아래와 같이 print_tid를 실행하는 Task를 하나 더 추가해 실행해본다.

dag = DAG(
    'push_xcoms',
    default_args=default_args,
    description='print and push xcoms',
    # schedule_interval=timedelta(days=1),
)

def return_tid(**context):
    tid = context['task'].task_id

    return tid

def print_tid(**context):
    tid = context['task_instance'].xcom_pull(key='return_value', task_ids='return_xcoms')

    print(tid)

t1 = PythonOperator(
    task_id='return_xcoms',
    python_callable=return_tid,
    provide_context=True,
    dag=dag,
)

t2 = PythonOperator(
    task_id='print_xcoms',
    python_callable=print_tid,
    provide_context=True,
    dag=dag,
)

t1 >> t2

같은 DAG 내에서 pull

정상적으로 값에 접근한 것을 확인할 수 있다.

XCom은 내보낸 DAG와 사용하는 DAG가 다를 수도 있다. 다만 기본적으로는 push 한 Task의 execution_date와 pull 하는 Task의 execution_date가 동일해야 하는 것 같다.

다른 DAG에서 pull

 

 

참고 문서

https://dydwnsekd.tistory.com/107

https://moons08.github.io/programming/airflow-xcom/

https://it-sunny-333.tistory.com/160