Apache Airflow

[Airflow] DockerOperator - Bind Mount

비번변경 2023. 7. 14. 17:43

개요

2023.07.11 - [Airflow] DockerOperator 에서 살펴본 DockerOperator는 스케쥴링된 작업을 수행할 때 미리 구성해 둔 도커 이미지로 컨테이너를 생성하고, 컨테이너에서 작업을 수행한다. 각 작업이 동작하는데 필요한 환경이 다른 경우 의존성 문제를 해소하기 위해 많이 사용하는 것 같다.

 

그렇다면 DockerOperator로 실행한 컨테이너에서 작업이 동작하기 위한 소스 코드 등은 어떻게 전달해야 할까?

 

 

Volume VS Bind Mount

2023.07.12 - [Docker] Volume

2023.07.13 - [Docker] Bind Mount

위 두 개 글에서 컨테이너 내에서 생성한 데이터를 호스트에 유지하거나 호스트의 데이터를 컨테이너에 전달하는 방식인 Volume과 Bind Mount에 대해서 살펴보았다. 두 가지 방법은 크게 호스트와 컨테이너가 공유하는 공간이 호스트의 특정 경로(Volume)인지 임의 경로(Bind Mount)인지에 대해 차이점을 가지고 있는데, 프로젝트 유지 보수를 위한 코드 최신화 등의 이유로 Bind Mount 방식이 적절해 보인다.

 

 

DockerOperator - mounts 관련 내용

DockerOperator는 기본적으로 호스트에 생성된 임시 디렉터리가 컨테이너에 마운트되어 기본 디스크 사이즈인 10GB를 초과하는 파일을 컨테이너에 저장할 수 있게 한다. 이때 마운트된 디렉터리는 환경 변수 AIRFLOW_TMP_DIR를 통해 접근할 수 있다. 만약 컨테이너 생성 시 볼륨 마운트가 불가능한 상태라면 경고 메시지를 출력하고 임시 폴더를 마운트 하지 않고 도커 명령을 수행하려고 시도한다.

또는 mounts 매개변수를 이용해 Volume을 마운트하거나 Bind Mount 방식으로 컨테이너를 실행시킬 수 있다. mounts 매개변수에는 docker.types.Mount 객체로 이루어진 리스트를 전달해야 한다.

 

 

Mount 객체 생성

Mount 객체는 생성자에 마운트할 target 경로, source 경로, 마운트 유형과 read_only 여부 등을 지정하여 생성할 수 있다.

 

참고 : https://docker-py.readthedocs.io/en/stable/api.html?highlight=mount#docker.types.Mount 

from docker.types import Mount

mounts = [
    Mount(target='</MOUNT/PATH>', source='</PATH/OF/HOST>', type='bind')
]

 

 

Bind Mount 방식의 DockerOperator 사용

Bimd Mount 방식으로 컨테이너를 생성하는 DockerOperator를 생성해 본다.

 

1. 컨테이너에 전달하여 실행시킬 소스 파일 작성

## $USER_HOME/app_logic/test.py:/app_logic/test.py

print('Hello, world!')

간단히 hello, world를 출력하는 소스를 실행시켜보려고 한다. 호스트의 적당한 위치에 실행시킬 파일을 생성한다. 이 글에서는 사용자 홈 디렉터리/app_logic에 생성했다.

잘 동작한다.

 

2. Mount 객체 생성

컨테이너에서 실행시킬 소스 파일과 컨테이너 내 경로, mount 유형을 지정하여 객체를 선언한다.

from docker.types import Mount

mounts = [
    Mount(target='/app_logic/test.py', source='/home/airflow/app_logic/test.py', type='bind', read_only=True)
]

 

3. Airflow Task 정의

DockerOperator를 이용해 Task를 정의한다. mounts 매개변수에 선언한 mount 객체를 전달해주어야 한다.

t2 = DockerOperator(
    task_id='hello_container',
    image='python:3.8.17-slim',
    container_name='hello_container',
    auto_remove='force',
    command=command,
    mounts=mounts,
    hostname='docker_test',
    dag=dag
)

전체 소스는 접은글로 작성한다.

더보기
from datetime import timedelta

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago

from docker.types import Mount

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 1,
    'retry_delay': 30,
}
mounts = [
    Mount(target='/app_logic/test.py', source='/home/airflow/app_logic/test.py', type='bind', read_only=True)
]

dag = DAG(
    'docker_mount_test',
    default_args=default_args,
    description='echo "hello, world!"',
    schedule=timedelta(days=1),
)

t2 = DockerOperator(
    task_id='hello_container',
    image='python:3.8.17-slim',
    container_name='hello_container',
    auto_remove='force',
    command='python /app_logic/test.py',
    mounts=mounts,
    hostname='docker_test',
    dag=dag
)

 

 

동작 테스트

생성한 작업을 테스트해 본다. 

 

로그

정상적으로 잘 동작해 Host에서 실행했을 때와 동일하게 Hello, world! 를 출력하는 모습을 확인할 수 있다.

 

💡 만약 CeleryExecutor를 사용한다면
도커 컨테이너를 통해 Celery Worker를 실행 -> Dag에서 시작한 도커 컨테이너 Worker에서 작업 수행 -> 컨테이너에서 실행 중인 Worker를 종료하는 흐름으로 Dag를 구성할 수도 있다.

 

 

참고 문서

https://h-devnote.tistory.com/10

https://docker-py.readthedocs.io/en/stable/api.html?highlight=mount#docker.types.Mount