Apache Airflow

[Airflow] DAG 생성

비번변경 2022. 7. 25. 23:30

DAG

Airflow의 핵심 개념으로, 여러 Task와 각 Task가 어떻게 실행되어야 하는지를 정의하는 종속성과 관계로 구성된다.

DAG

DAG는 사진의 Task a, b, c, d를 정의하고 실행 순서, 그리고 의존성 등을 지정한다. 또한 DAG가 얼마나 자주 실행되어야 하는지를 지정해야 한다.

 

Airflow의 Python 스크립트는 이러한 DAG의 구조를 코드로 지정하는 설정 파일이다. DAG 정의 파일은 실제 데이터 처리를 수행하지 않으며, 단순히 DAG 객체만을 정의한다.

이 글에서는 DAG 정의 방법을 정리해둔다.

 

DAG 관리 디렉터리 생성

Airflow는 DAG를 관리하는 디렉터리를 지정하여 관리한다. 해당 설정은 airflow.cfg에서 설정할 수 있다.

dags_folder

해당 경로에 맞게 Airflow 홈 디렉터리에 dags 디렉터리를 생성하고, 해당 디렉터리에 DAG 정의 파일을 작성하면 된다.

cd AIRFLOW_HOME
mkdir dags

dags 디렉터리

 

 

DAG 정의 파일

DAG 정의 파일을 작성해본다. 이 글에서는 bash를 이용해 hello, world를 출력하는 Task 1개를 가진 DAG를 생성해보고자 한다.

 

모듈 Import

from datetime import timedelta

# DAG 인스턴스화에 사용하는 라이브러리
from airflow import DAG

# Operator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

 

매개변수 정의

Operator에 전달할 매개변수를 정의한다. DAG 내에서 계속 사용되는 매개변수는 별도로 분리하면 코드를 보다 깔끔하게 분리할 수 있다.

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    # 'email': ['airflow@example.com'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

 

DAG 인스턴스 생성

DAG_ID와 default_args, description, schedule_interaval 등을 지정하여 DAG 인스턴스를 생성한다.

# DAG 인스턴스화
dag = DAG(
    'helloworld',
    default_args=default_args,
    description='echo "hello, world!"',
    schedule_interval=timedelta(days=1),
)

DAG_ID는 Airflow 서버 내에서 유일해야 한다.

 

Task 정의

BashOperator를 이용해 Task를 정의한다. bash_command에 실행할 명령문을 전달한다.

t1 = BashOperator(
    task_id='echo_hello_world',
    bash_command='echo "Hello, World!"',
    dag=dag,
)

task_id는 영문자, -, 점, _으로만 구성되어야 한다. 그 외의 문자가 포함되면 아래와 같이 Broken DAG 알람을 만날 수 있다.

task_id 에러

 

여기까지 작성한 전체 코드는 접은글로 확인할 수 있다.

더보기

 

# DAG 인스턴스화에 사용하는 라이브러리
from airflow import DAG

# Operator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# Operator에 매개변수 전달
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    # 'email': ['airflow@example.com'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

# DAG 인스턴스화
dag = DAG(
    'helloworld',
    default_args=default_args,
    description='echo "hello, world!"',
    schedule_interval=timedelta(days=1),
)

# task
t1 = BashOperator(
    task_id='echo_hello_world',
    bash_command='echo "Hello, World!"',
    dag=dag
)

 

 

DAG 실행

작성한 파일에 이상이 없다면 웹 서버에서 생성한 DAG를 확인할 수 있다.

생성한 DAG

DAG 이름을 클릭하면 Task 정보 등을 확인할 수 있다.

DAG 상세

DAG 이름 옆의 Off 버튼을 눌러 On으로 전환할 수 있다. 스케줄이 설정된 DAG는 On으로 전환 시 바로 트리거 되어 실행된다.

DAG 실행

실행된 Task를 클릭하고 View Log를 클릭하면 실행한 Task의 로그를 확인할 수 있다.

Task 작업

로그에서 실행한 명령어와 출력, 그리고 종료 상태 값 등을 확인할 수 있다.

Task 로그

 

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/1.10.13/tutorial.html

728x90