from datetime import datetime, timedelta
from time import sleep
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models.baseoperator import chain
def dump():
sleep(3)
dag_args = {
"owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1),
}
dag = DAG(
dag_id="test_for",
default_args=dag_args,
start_date=datetime(2022, 1, 20),
schedule_interval="@once",
)
# TASK 생성
list_task = list()
for i in range(4):
t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
list_task.append(t)
Graph
1. 병렬
chain 함수에 task list를 전달하면 의존성을 병렬로 설정한다.
list_task = list()
for i in range(4):
t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
list_task.append(t)
chain(list_task)
2. 직렬
chain 함수에 task를 쭉 나열하면 직렬로 의존성을 설정한다.
list_task = list()
for i in range(4):
t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
list_task.append(t)
# list_task[0] >> list_task[1] >> list_task[2] >> list_task[3]
chain(list_task[0], list_task[1], list_task[2], list_task[3])
리스트 내에 있는 task를 모두 직렬로 연결할 때는 * 연산자를 사용해도 된다.
list_task = list()
for i in range(4):
t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
list_task.append(t)
# list_task[0] >> list_task[1] >> list_task[2] >> list_task[3]
chain(*list_task)
3. task 리스트 간 종속성 설정
chain 함수는 여러 task 리스트를 전달받아 의존성을 설정할 수 있다. task 리스트 간 의존성을 설정할 때는 같은 인덱스의 task에 대한 downstream/upstream이 설정된다. 따라서 전달하는 리스트의 길이가 동일해야 한다.
list_task = list()
for i in range(4):
t = PythonOperator(task_id=f"task_{i}", python_callable=dump, dag=dag)
list_task.append(t)
# list_task[0] >> list_task[1]
# list_task[2] >> list_task[3]
chain([list_task[i] for i in range(len(list_task)) if i % 2 == 0],
[list_task[i] for i in range(len(list_task)) if i % 2 == 1])