개요
기존 작성한
2022.08.11 - [Airflow] PythonOperator 매개변수 전달
에서 PythonOperator를 이용한 task에서 현재 Dag에 대한 정보를 접근할 때 provide_context라는 매개변수에 True 값을 전달했다.
t1 = PythonOperator(
task_id='print_string',
python_callable=print_statement,
params = {"name": "passwd", "job": "user"},
provide_context=True,
dag=dag,
)
하지만 Airflow 2에서는 더 이상 provide_context를 사용하지 않는다.
대신 context 변수에 접근하는 방법을 확인한다.
아래 Dag의 전체 코드는 접은글로 작성한다.
더보기
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.operators.python import PythonOperator
dag_args = {
"owner": "airflow",
"retries": 1,
"retry_delay": timedelta(minutes=1),
}
def print_var(execution_date, logical_date, dag_run, ti):
print(f"execution_date: {execution_date}")
print(f"logical_date: {logical_date}")
print(f"dag_id: {dag_run.dag_id}")
print(f"dag_id: {ti.task_id}")
def print_context(**context):
print(context)
def print_dag_id():
context = get_current_context()
print(context['dag_run'].dag_id)
@dag(start_date=datetime(2022, 1, 20),
dag_id='print_context_var',
default_args=dag_args,
schedule="@once")
def generate_dag():
t1 = PythonOperator(
task_id="print_some_var",
python_callable=print_var
)
t2 = PythonOperator(
task_id="print_context",
python_callable=print_context
)
t3 = PythonOperator(
task_id="print_dag_id",
python_callable=print_dag_id
)
generate_dag()
context 변수 자동 감지
Airflow 2에서는 task context로부터 변수가 자동으로 감지되므로 별다른 작업 없이 원하는 변수에 접근할 수 있다.
def print_var(execution_date, logical_date, dag_run, ti):
print(f"execution_date: {execution_date}")
print(f"logical_date: {logical_date}")
print(f"dag_id: {dag_run.dag_id}")
print(f"dag_id: {ti.task_id}")
@dag(start_date=datetime(2022, 1, 20),
dag_id='print_context_var',
default_args=dag_args,
schedule="@once")
def generate_dag():
t1 = PythonOperator(
task_id="print_some_var",
python_callable=print_var
)
실행 결과
**context 사용
**context를 통해 모든 context 변수에 접근하는 것도 가능하다.
def print_context(**context):
for k, v in context.items():
print(f"{k}: {v}")
@dag(start_date=datetime(2022, 1, 20),
dag_id='print_context_var',
default_args=dag_args,
schedule="@once")
def generate_dag():
t1 = PythonOperator(
task_id="print_some_var",
python_callable=print_var
)
t2 = PythonOperator(
task_id="print_context",
python_callable=print_context
)
실행 결과
get_current_context 사용
context 변수를 자동으로 감지하는 방법, **context 변수를 사용하는 방법 모두 사용하기 위해서는 함수에 매개변수를 추가해야 하는 것으로 보인다. 함수의 매개변수 정의를 변경하지 않고 context 변수를 가져오고 싶다면 get_current_context 함수를 사용할 수 있다.
get_current_context를 사용할 때는 아래 import 문이 필요하다.
from airflow.operators.python import get_current_context
그리고 함수 내에서 호출하여 사용할 수 있다.
def print_dag_id():
context = get_current_context()
print(context['dag_run'].dag_id)
실행 결과
참고 문서
airflow.operators.python.get_current_context()