DAG 선언
DAG를 선언하는 방법은 세 가지가 있다. 이 블로그에서는 주로 표준 생성자를 사용하여 생성하는데
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.empty import EmptyOperator
dag_args = {
"owner": "airflow",
"retries": 1,
"retry_delay": timedelta(minutes=1),
}
dag = DAG(
dag_id="dag_create_test_with",
default_args=dag_args,
start_date=datetime(2022, 1, 20),
schedule_interval="@once",
)
t1 = EmptyOperator(task_id="task", dag=dag)
이 글에서는 다른 생성 방법인 with문을 이용한 방법과 decorator를 이용한 방법을 정리한다.
with문
생성자를 이용하는 방법과 유사하지만 dag를 변수에 할당하지 않아도 된다.
또한 with문 내에 task를 생성하며 task 생성 시 dag 매개변수를 전달하지 않아도 된다.
with DAG(
dag_id="dag_create_test_with",
default_args=dag_args,
start_date=datetime(2022, 1, 20),
schedule_interval="@once", ):
t1 = EmptyOperator(task_id="task")
데코레이터 사용
Airflow 2.0부터는 @dag 데코레이터를 사용해 DAG를 생성할 수도 있다. @dag를 사용하기 위해서는 decorator가 정의된 모듈 import가 필요하다.
from airflow.decorators import dag
그리고 Task를 선언하는 함수 위에 사용한다.
from datetime import datetime, timedelta
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
dag_args = {
"owner": "airflow",
"retries": 1,
"retry_delay": timedelta(minutes=1),
}
@dag(start_date=datetime(2022, 1, 20),
dag_id='create_dag_test_decorator',
default_args=dag_args,
schedule_interval="@once")
def generate_dag():
t1 = EmptyOperator(task_id="task")
dag = generate_dag()
데코레이터 사용 시 dag_id를 지정하지 않으면 데코레이터를 사용한 함수의 이름이 dag_id가 된다.
@dag(start_date=datetime(2022, 1, 20),
default_args=dag_args,
schedule_interval="@once")
def generate_dag():
t1 = EmptyOperator(task_id="task")
dag = generate_dag()
참고 문서
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#declaring-a-dag
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#the-dag-decorator