개요
Airflow에서는 오퍼레이터를 통해 여러 기능을 제공하고 있다. 만약 원하는 기능을 제공하는 오퍼레이터가 없다면 개발자가 직접 구현하여 사용할 수 있다.
이 글에서는 사용자 정의 오퍼레이터를 구현하여 테스트해보려고 한다. 구현할 기능은 문자열을 입력받고, 문자열을 반환하는 정도로 한다.
사용자 정의 오퍼레이터
사용자 정의 오퍼레이터를 만들기 위해서는 아래 조건을 만족해야 한다.
- BaseOperator 상속
- 생성자 작성 : 오퍼레이터에 필요한 매개변수를 정의한다.
- execute 함수 작성 : Executor가 오퍼레이터를 호출할 때 실행할 코드를 작성한다.
순서대로 차근차근 작성해 보도록 하겠다.
BaseOperator 상속
상속받을 BaseOperator가 정의된 라이브러리를 임포트 한다.
from airflow.models.baseoperator import BaseOperator
그리고 MyOperator에 BaseOperator를 상속받도록 한다.
class MyOperator(BaseOperator):
pass
생성자 작성
Dag가 파싱 될 때 실행되는 생성자 __init__ 함수를 정의한다. __init__ 함수에서는 사용자 정의 오퍼레이터에서 필요한 매개변수를 정의하고, BaseOperator의 생성자가 동작해야 한다.
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = name
execute 함수 작성
이제 오퍼레이터로 Task를 만들어 실행했을 때 동작하는 execute 함수를 작성한다. execute 함수에는 Airflow의 context가 항상 전달되어야 한다. 때문에 매개변수에 context라는 이름의 변수가 항상 포함되어 있어야 한다.
def execute(self, context):
# Airflow task 로깅
self.log.info(f'name: {self.name}')
return f'Hello, {self.name}'
기본적으로 execute 함수의 반환값은 XCom으로 push 된다.
기능 구현을 완료한 오퍼레이터는 airflow dags_folder 아래에 위치시킨다. 필요시 사용자 정의 오퍼레이터를 모아두기 위한 경로를 생성하여 저장해도 된다. 이 글에서는 dags_folder에 사용자 정의 오퍼레이터를 모아둘 custom_operator라는 경로를 생성하여 저장해 두었다.
전체 코드는 접은 글로 작성해 둔다.
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = name
def execute(self, context):
self.log.info(f'name: {self.name}')
return f'Hello, {self.name}'
사용자 정의 오퍼레이터 사용
이제 위에서 구현한 HelloOperator를 사용하여 Task를 생성한다.
from airflow import DAG
from custom_operator.hello_operator import HelloOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': '2023-01-01',
'retries': 1,
'retry_delay': 30,
'catchup': False
}
dag = DAG(
'dag_test_custom_operator',
default_args=default_args,
schedule='0 0 * * *',
catchup=False
)
task_sensor = HelloOperator(
task_id='hello_task',
name='Passwd',
dag=dag
)
Airflow 상에서도 직접 구현한 HelloOperator를 사용한 Task임을 확인할 수 있다. Dag를 켜서 실행시켜 보면 정상적으로 잘 동작한다.
Task 로그에서 지정한 형식으로 로그가 출력된 모습을 확인할 수 있고, 반환값은 XCom 목록에서 확인할 수 있다.
참고 문서
https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html#
https://amazelimi.tistory.com/entry/Airflow-CustomOperator-%EC%83%9D%EC%84%B1
https://docs.astronomer.io/learn/airflow-importing-custom-hooks-operators