Apache Airflow

[Airflow] DockerOperator

비번변경 2023. 7. 11. 19:11

DockerOperator

Airflow에서 작업을 도커 컨테이너 내에서 실행시켜야 한다면 DockerOperator 사용을 고려해 볼 수 있다. 스케쥴링될 작업을 수행할 때 미리 구성한 도커 이미지로 컨테이너를 생성한 후, 컨테이너 안에서 작업을 수행한다. 

 

이 글에서는 간단히 DockerOperator를 이용한 작업을 수행해보려고 한다.

 

 

패키지 설치

Airflow에서 도커 관련 기능을 사용하기 위해서는 추가로 패키지를 설치해주어야 한다.

pip install apache-airflow-providers-docker

 

 

airflow 사용자에 도커 실행 권한 추가

airflow를 통해 도커 컨테이너를 실행시키기 위해서는 airflow를 실행하는 사용자에게 도커를 실행시킬 수 있는 권한이 부여되어 있어야 한다. 만약 부여되어 있지 않다면 permission denied 에러가 발생한다.

 

권한 부여

usermod -aG docker <airflow_user>

참고 : 2021.06.16 - 일반 사용자에 docker 실행 권한 부여

권한 부여 후에도 docker ps 등의 명령어가 동작하지 않으면 shell을 재접속해본다. 권한 부여가 확인되었어도 airflow 프로세스를 통해서는 권한 관련 에러가 발생할 수 있다. 이 때는 스케쥴러만 재시작해주면 된다.

 

 

모듈 임포트

설치한 패키지에서 제공하는 클래스를 사용할 때는 airflow.providers.docker 패키지를 임포트 한다. 그중 DockerOperator를 사용할 때는 아래와 같이 임포트 한다.

from airflow.providers.docker.operators.docker import DockerOperator

 

 

Task 정의

간단하게 DockerOperator로 whoami 명령을 실행해 본다. 

t2 = DockerOperator(
    task_id='echo_container',
    image='ubuntu:22.04', # 컨테이너 이미지
    container_name='echo_container', # 컨테이너 이름
    auto_remove='force', # 컨테이너 종료 시 컨테이너 삭제 여부. 기본값은 never
    command="whoami", # 컨테이너 실행 명령어
    dag=dag
)

참고로  container_name을 지정하고 별도로 삭제 작업을 하지 않는 경우, container 이름에서 충돌이 발생할 수 있다.

auto_remove를 항상 수행하도록 지정하던지, 아니면 task가 실행될 때마다 container 이름이 다르게 지정되도록 하는 것이 좋아 보인다.

❗ auto_remove 옵션은 never, success, force를 전달할 수 있다.
기존에는 bool 타입의 데이터를 전달할 수 있어, True/False로 지정을 했었다.

 

전체 Dag 코드는 접은글로 적어둔다.

container가 아니라 host에서 whoami를 실행했을 때의 결과를 비교하고자 BashOperator로도 whoami를 실행해 보았다.

더보기
from datetime import timedelta

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 1,
    'retry_delay': 30,
}

dag = DAG(
    'docker_test',
    default_args=default_args,
    description='echo "hello, world!"',
    schedule=timedelta(days=1),
)

t1 = BashOperator(
    task_id='echo_host',
    bash_command='whoami',
    dag=dag
)

t2 = DockerOperator(
    task_id='echo_container',
    image='ubuntu:22.04',
    container_name='echo_container',
    auto_remove=True,
    command="whoami",
    hostname='docker_test',
    dag=dag
)
t1 >> t2

 

 

테스트

- Graph

 

- BashOperator

 

- DockerOperator

BashOperator는 Airflow를 실행하는 사용자의 이름이, DockerOperator는 컨테이너 내 사용자의 이름이 출력된 모습을 확인할 수 있다.

 

 

 

참고 문서

https://h-devnote.tistory.com/10

https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/index.html

airflow.providers.docker.operators.docker