Apache Airflow

[Airflow] 사용자 정의 오퍼레이터 (Custom Operator)

비번변경 2024. 5. 17. 21:20

개요

Airflow에서는 오퍼레이터를 통해 여러 기능을 제공하고 있다. 만약 원하는 기능을 제공하는 오퍼레이터가 없다면 개발자가 직접 구현하여 사용할 수 있다.

이 글에서는 사용자 정의 오퍼레이터를 구현하여 테스트해보려고 한다. 구현할 기능은 문자열을 입력받고, 문자열을 반환하는 정도로 한다.

 

 

사용자 정의 오퍼레이터

사용자 정의 오퍼레이터를 만들기 위해서는 아래 조건을 만족해야 한다.

  1. BaseOperator 상속
  2. 생성자 작성 : 오퍼레이터에 필요한 매개변수를 정의한다. 
  3. execute 함수 작성 : Executor가 오퍼레이터를 호출할 때 실행할 코드를 작성한다.

순서대로 차근차근 작성해 보도록 하겠다.

 

 

BaseOperator 상속

상속받을 BaseOperator가 정의된 라이브러리를 임포트 한다.

from airflow.models.baseoperator import BaseOperator

 

그리고 MyOperator에 BaseOperator를 상속받도록 한다.

class MyOperator(BaseOperator):
    pass

 

 

생성자 작성

Dag가 파싱 될 때 실행되는 생성자 __init__ 함수를 정의한다. __init__ 함수에서는 사용자 정의 오퍼레이터에서 필요한 매개변수를 정의하고, BaseOperator의 생성자가 동작해야 한다.

from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    def __init__(self, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name

 

 

execute 함수 작성

이제 오퍼레이터로 Task를 만들어 실행했을 때 동작하는 execute 함수를 작성한다. execute 함수에는 Airflow의 context가 항상 전달되어야 한다. 때문에 매개변수에 context라는 이름의 변수가 항상 포함되어 있어야 한다.

    def execute(self, context):
        # Airflow task 로깅
        self.log.info(f'name: {self.name}')
        return f'Hello, {self.name}'

기본적으로 execute 함수의 반환값은 XCom으로 push 된다.

 

기능 구현을 완료한 오퍼레이터는 airflow dags_folder 아래에 위치시킨다. 필요시 사용자 정의 오퍼레이터를 모아두기 위한 경로를 생성하여 저장해도 된다. 이 글에서는 dags_folder에 사용자 정의 오퍼레이터를 모아둘 custom_operator라는 경로를 생성하여 저장해 두었다.

전체 코드는 접은 글로 작성해 둔다.

더보기
from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    def __init__(self, name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        self.log.info(f'name: {self.name}')
        return f'Hello, {self.name}'

 

 

사용자 정의 오퍼레이터 사용

 

이제 위에서 구현한 HelloOperator를 사용하여 Task를 생성한다.

from airflow import DAG
from custom_operator.hello_operator import HelloOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': '2023-01-01',
    'retries': 1,
    'retry_delay': 30,
    'catchup': False
}

dag = DAG(
    'dag_test_custom_operator',
    default_args=default_args,
    schedule='0 0 * * *',
    catchup=False
)

task_sensor = HelloOperator(
    task_id='hello_task',
    name='Passwd',
    dag=dag
)

Airflow 상에서도 직접 구현한 HelloOperator를 사용한 Task임을 확인할 수 있다. Dag를 켜서 실행시켜 보면 정상적으로 잘 동작한다.

Task 로그에서 지정한 형식으로 로그가 출력된 모습을 확인할 수 있고, 반환값은 XCom 목록에서 확인할 수 있다.

 

 

참고 문서

https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html#

https://medium.com/@savleenkr92/unleashing-the-power-of-custom-operators-in-apache-airflow-2f2a44ee53e2

https://amazelimi.tistory.com/entry/Airflow-CustomOperator-%EC%83%9D%EC%84%B1

https://docs.astronomer.io/learn/airflow-importing-custom-hooks-operators