개요
2023.07.11 - [Airflow] DockerOperator 에서 살펴본 DockerOperator는 스케쥴링된 작업을 수행할 때 미리 구성해 둔 도커 이미지로 컨테이너를 생성하고, 컨테이너에서 작업을 수행한다. 각 작업이 동작하는데 필요한 환경이 다른 경우 의존성 문제를 해소하기 위해 많이 사용하는 것 같다.
그렇다면 DockerOperator로 실행한 컨테이너에서 작업이 동작하기 위한 소스 코드 등은 어떻게 전달해야 할까?
Volume VS Bind Mount
2023.07.13 - [Docker] Bind Mount
위 두 개 글에서 컨테이너 내에서 생성한 데이터를 호스트에 유지하거나 호스트의 데이터를 컨테이너에 전달하는 방식인 Volume과 Bind Mount에 대해서 살펴보았다. 두 가지 방법은 크게 호스트와 컨테이너가 공유하는 공간이 호스트의 특정 경로(Volume)인지 임의 경로(Bind Mount)인지에 대해 차이점을 가지고 있는데, 프로젝트 유지 보수를 위한 코드 최신화 등의 이유로 Bind Mount 방식이 적절해 보인다.
DockerOperator - mounts 관련 내용
DockerOperator는 기본적으로 호스트에 생성된 임시 디렉터리가 컨테이너에 마운트되어 기본 디스크 사이즈인 10GB를 초과하는 파일을 컨테이너에 저장할 수 있게 한다. 이때 마운트된 디렉터리는 환경 변수 AIRFLOW_TMP_DIR를 통해 접근할 수 있다. 만약 컨테이너 생성 시 볼륨 마운트가 불가능한 상태라면 경고 메시지를 출력하고 임시 폴더를 마운트 하지 않고 도커 명령을 수행하려고 시도한다.
또는 mounts 매개변수를 이용해 Volume을 마운트하거나 Bind Mount 방식으로 컨테이너를 실행시킬 수 있다. mounts 매개변수에는 docker.types.Mount 객체로 이루어진 리스트를 전달해야 한다.
Mount 객체 생성
Mount 객체는 생성자에 마운트할 target 경로, source 경로, 마운트 유형과 read_only 여부 등을 지정하여 생성할 수 있다.
참고 : https://docker-py.readthedocs.io/en/stable/api.html?highlight=mount#docker.types.Mount
from docker.types import Mount
mounts = [
Mount(target='</MOUNT/PATH>', source='</PATH/OF/HOST>', type='bind')
]
Bind Mount 방식의 DockerOperator 사용
Bimd Mount 방식으로 컨테이너를 생성하는 DockerOperator를 생성해 본다.
1. 컨테이너에 전달하여 실행시킬 소스 파일 작성
## $USER_HOME/app_logic/test.py:/app_logic/test.py
print('Hello, world!')
간단히 hello, world를 출력하는 소스를 실행시켜보려고 한다. 호스트의 적당한 위치에 실행시킬 파일을 생성한다. 이 글에서는 사용자 홈 디렉터리/app_logic에 생성했다.
잘 동작한다.
2. Mount 객체 생성
컨테이너에서 실행시킬 소스 파일과 컨테이너 내 경로, mount 유형을 지정하여 객체를 선언한다.
from docker.types import Mount
mounts = [
Mount(target='/app_logic/test.py', source='/home/airflow/app_logic/test.py', type='bind', read_only=True)
]
3. Airflow Task 정의
DockerOperator를 이용해 Task를 정의한다. mounts 매개변수에 선언한 mount 객체를 전달해주어야 한다.
t2 = DockerOperator(
task_id='hello_container',
image='python:3.8.17-slim',
container_name='hello_container',
auto_remove='force',
command=command,
mounts=mounts,
hostname='docker_test',
dag=dag
)
전체 소스는 접은글로 작성한다.
from datetime import timedelta
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from docker.types import Mount
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'retries': 1,
'retry_delay': 30,
}
mounts = [
Mount(target='/app_logic/test.py', source='/home/airflow/app_logic/test.py', type='bind', read_only=True)
]
dag = DAG(
'docker_mount_test',
default_args=default_args,
description='echo "hello, world!"',
schedule=timedelta(days=1),
)
t2 = DockerOperator(
task_id='hello_container',
image='python:3.8.17-slim',
container_name='hello_container',
auto_remove='force',
command='python /app_logic/test.py',
mounts=mounts,
hostname='docker_test',
dag=dag
)
동작 테스트
생성한 작업을 테스트해 본다.
로그
정상적으로 잘 동작해 Host에서 실행했을 때와 동일하게 Hello, world! 를 출력하는 모습을 확인할 수 있다.
💡 만약 CeleryExecutor를 사용한다면
도커 컨테이너를 통해 Celery Worker를 실행 -> Dag에서 시작한 도커 컨테이너 Worker에서 작업 수행 -> 컨테이너에서 실행 중인 Worker를 종료하는 흐름으로 Dag를 구성할 수도 있다.
참고 문서
https://h-devnote.tistory.com/10
https://docker-py.readthedocs.io/en/stable/api.html?highlight=mount#docker.types.Mount