Apache Airflow

[Airflow 2.x] context 변수 사용하기

비번변경 2023. 4. 28. 23:51

개요

기존 작성한

2022.08.19 - [Airflow] XComs

2022.08.11 - [Airflow] PythonOperator 매개변수 전달

에서 PythonOperator를 이용한 task에서 현재 Dag에 대한 정보를 접근할 때 provide_context라는 매개변수에 True 값을 전달했다.

t1 = PythonOperator(
    task_id='print_string',
    python_callable=print_statement,
    params = {"name": "passwd", "job": "user"},
    provide_context=True,
    dag=dag,
)

하지만 Airflow 2에서는 더 이상 provide_context를 사용하지 않는다.

참고 : https://airflow.apache.org/docs/apache-airflow/2.5.1/release_notes.html#airflow-operators-python-pythonoperator

대신 context 변수에 접근하는 방법을 확인한다.

 

아래 Dag의 전체 코드는 접은글로 작성한다.

더보기
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.operators.python import PythonOperator

dag_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}


def print_var(execution_date, logical_date, dag_run, ti):
    print(f"execution_date: {execution_date}")
    print(f"logical_date: {logical_date}")
    print(f"dag_id: {dag_run.dag_id}")
    print(f"dag_id: {ti.task_id}")


def print_context(**context):
    print(context)


def print_dag_id():
    context = get_current_context()
    print(context['dag_run'].dag_id)


@dag(start_date=datetime(2022, 1, 20),
     dag_id='print_context_var',
     default_args=dag_args,
     schedule="@once")
def generate_dag():
    t1 = PythonOperator(
        task_id="print_some_var",
        python_callable=print_var
    )
    t2 = PythonOperator(
        task_id="print_context",
        python_callable=print_context
    )
    t3 = PythonOperator(
        task_id="print_dag_id",
        python_callable=print_dag_id
    )


generate_dag()

 

 

context 변수 자동 감지

Airflow 2에서는 task context로부터 변수가 자동으로 감지되므로 별다른 작업 없이 원하는 변수에 접근할 수 있다.

def print_var(execution_date, logical_date, dag_run, ti):
    print(f"execution_date: {execution_date}")
    print(f"logical_date: {logical_date}")
    print(f"dag_id: {dag_run.dag_id}")
    print(f"dag_id: {ti.task_id}")


@dag(start_date=datetime(2022, 1, 20),
     dag_id='print_context_var',
     default_args=dag_args,
     schedule="@once")
def generate_dag():
    t1 = PythonOperator(
        task_id="print_some_var",
        python_callable=print_var
    )

 

실행 결과

 

 

**context 사용

**context를 통해 모든 context 변수에 접근하는 것도 가능하다.

def print_context(**context):
    for k, v in context.items():
        print(f"{k}: {v}")
        
       
@dag(start_date=datetime(2022, 1, 20),
     dag_id='print_context_var',
     default_args=dag_args,
     schedule="@once")
def generate_dag():
    t1 = PythonOperator(
        task_id="print_some_var",
        python_callable=print_var
    )
    t2 = PythonOperator(
        task_id="print_context",
        python_callable=print_context
    )

 

실행 결과

 

 

get_current_context 사용

context 변수를 자동으로 감지하는 방법, **context 변수를 사용하는 방법 모두 사용하기 위해서는 함수에 매개변수를 추가해야 하는 것으로 보인다. 함수의 매개변수 정의를 변경하지 않고 context 변수를 가져오고 싶다면 get_current_context 함수를 사용할 수 있다.

get_current_context를 사용할 때는 아래 import 문이 필요하다.

from airflow.operators.python import get_current_context

그리고 함수 내에서 호출하여 사용할 수 있다.

def print_dag_id():
    context = get_current_context()
    print(context['dag_run'].dag_id)

 

실행 결과

 

 

 

참고 문서

airflow.operators.python.get_current_context()

https://airflow.apache.org/docs/apache-airflow/2.5.1/release_notes.html#airflow-operators-python-pythonoperator