Apache Airflow 71

[Airflow] DockerOperator

DockerOperator Airflow에서 작업을 도커 컨테이너 내에서 실행시켜야 한다면 DockerOperator 사용을 고려해 볼 수 있다. 스케쥴링될 작업을 수행할 때 미리 구성한 도커 이미지로 컨테이너를 생성한 후, 컨테이너 안에서 작업을 수행한다. 이 글에서는 간단히 DockerOperator를 이용한 작업을 수행해보려고 한다. 패키지 설치 Airflow에서 도커 관련 기능을 사용하기 위해서는 추가로 패키지를 설치해주어야 한다. pip install apache-airflow-providers-docker airflow 사용자에 도커 실행 권한 추가 airflow를 통해 도커 컨테이너를 실행시키기 위해서는 airflow를 실행하는 사용자에게 도커를 실행시킬 수 있는 권한이 부여되어 있어야 한다..

Apache Airflow 2023.07.11

[Airflow] Dynamic Task Mapping

Dynamic Task Mapping 기존에 Airflow는 DAG가 프로세싱되는 시점에만 Task를 동적으로 생성할 수 있었다. (참고 : 2023.04.12 - [Airflow] Task 반복 생성) 하지만 Airflow 2.3부터는 Dynamic Task Mapping 기능이 도입되면 Runtime에 Task를 동적으로 생성할 수 있게 되었다. 이 글에서는 Dynamic Task Mapping 기능에 대해 알아보려고 한다. Dynamic Task Dynamic Task Mapping 기능은 MapReduce 프로그래밍 모델을 기반으로 한다. Map은 데이터를 연산하기 좋은 형태로 변환하는 작업으로 일반적으로 Trasformation이라고 표현한다. Reduce는 Map을 통해 만들어진 데이터로 연산..

Apache Airflow 2023.06.14

[Airflow] Dag Processing 소요 시간 확인

개요 Airflow Scheduler는 주기적으로 DAG_FOLDER를 읽어 DAG 정의 파일을 읽어 들이고, DAG의 메타데이터를 Airflow meta DB에 저장한다. 스케쥴러와 관련된 프로세스를 찾으면 주기적으로 아래와 같은 DagFileProcessor가 동작하는 모습을 확인할 수 있다. ps -ef | grep scheduler 주기적으로 DAG 정의 파일을 읽어들이므로 DAG 정의 파일을 Processing 하는데 소요되는 시간도 Airflow 성능과 관련이 있다. 이 글에서는 Dag Processing 소요 시간 확인 방법을 적어둔다. DAG 프로세싱 로그 확인 DagFIle을 프로세싱하는 스케쥴러는 관련 정보를 로그로 남기는데, 그 위치는 다음과 같다. $AIRFLOW_HOME/logs/..

Apache Airflow 2023.06.09

[Airflow] @task_group으로 TaskGroup 정의

개요 2023.05.20 - [Airflow] TaskGroup에서 TaskGroup의 개념을 알아보고, 사용방법을 정리했다. 이 글에서는 TaskFlow API로, 즉 @task_group을 이용해 TaskGroup을 정의하는 방법을 정리한다. 예시 dag @task_group를 사용하지 않고 정의한 아래 dag를 데코레이터를 이용해 다시 작성한다. import datetime from airflow import DAG from airflow.utils.task_group import TaskGroup from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator dag = DAG( dag_..

Apache Airflow 2023.05.21

[Airflow] TaskGroup

TaskGroups TaskGroups은 Graph view에서 Task를 계층적인 그룹으로 구성할 때 사용한다. 반복적인 패턴을 만들고 시각적인 혼란을 줄이는데 유용하다. Task 실행 순서 등에는 영향을 미치지 않으면 단순 UI 그룹 개념이다. 이 글에서는 사용 방법을 간단히 정리한다. TaskFlow API를 사용하지 않는 방법으로 소개한다. 예시 dag 먼저 TaskGroup을 사용하지 않은 dag를 살펴본다. import datetime from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator dag = DAG( dag_id="test_..

Apache Airflow 2023.05.20

[Airflow] @task를 이용한 Task 반복 생성

개요 2023.04.12 - [Airflow] Task 반복 생성에서 Operator 생성자를 이용해 task를 반복 생성하는 방법을 알아보고, 2023.04.23 - [Airflow] @task를 이용한 Task 선언에서 TaskFlow API를 이용해 task을 생성하는 방법을 알아보았다. 그렇다면 TaskFlow API를 이용해 task를 반복 생성해야 할 때는 어떻게 해야 할까? 방법을 정리한다. 예시) 생성자 사용 DAG 아래의 생성자를 사용한 dag를 TaskFlow API를 사용해 재정의해본다. from datetime import datetime, timedelta from time import sleep from airflow import DAG from airflow.operators...

Apache Airflow 2023.05.19

[Airflow] retries 설정 무시하고 task 실패 처리

개요 Airflow dag 또는 task 정의 시 retries를 0 이상으로 설정하면 task 실행 중 실패가 발생해도 재실행을 시도한다. 하지만 재시도가 의미 없는 경우가 있을 수 있는데, 이럴 때는 retries 횟수가 남아있더라도 실패로 처리하고 싶다. Airflow Exception을 이용해 처리해 보자. 관련글 : 2023.05.06 - [Airflow] Exception을 이용한 task 스킵 처리 AirflowFailException 재시도 없이 task를 실패해야 할 때 사용하는 예외이다. 아래와 같이 import 하여 사용한다. from airflow.exceptions import AirflowFailException 예시 예시로 사용할 함수는 0부터 2까지의 수 중에서 무작위로 선택..

Apache Airflow 2023.05.07

[Airflow] Exception을 이용한 task 스킵 처리

개요 그동안 Airflow에서 건너뛰어야 하는 Task가 있으면 Task가 실행하기 전에 skip 필요 여부 등을 확인했다. 참고 : 2023.01.03 - [Airflow] ShortCircuitOperator - 조건부 Task 실행 이 글에서는 Airflow Exception을 이용해 실행 중인 task를 skip할 수 있도록 구성해 본다. AirflowSkipException 작업을 건너뛰어야 할 때 사용하는 예외이다. 아래와 같이 import 하여 사용한다. from airflow.exceptions import AirflowSkipException 예시 예시로 사용할 함수는 0부터 2까지의 수 중에서 무작위로 선택한 값이 1일 때 task를 건너뛴다. 만약 조건을 충족하지 않으면 task를 정..

Apache Airflow 2023.05.06

[Airflow 2.x] context 변수 사용하기

개요 기존 작성한 2022.08.19 - [Airflow] XComs 2022.08.11 - [Airflow] PythonOperator 매개변수 전달 에서 PythonOperator를 이용한 task에서 현재 Dag에 대한 정보를 접근할 때 provide_context라는 매개변수에 True 값을 전달했다. t1 = PythonOperator( task_id='print_string', python_callable=print_statement, params = {"name": "passwd", "job": "user"}, provide_context=True, dag=dag, ) 하지만 Airflow 2에서는 더 이상 provide_context를 사용하지 않는다. 참고 : https://airflow..

Apache Airflow 2023.04.28

[Airflow] @task 의존성 설정

TaskFlow 2023.04.22 - [Airflow] with, @dag를 이용한 DAG 선언 2023.04.23 - [Airflow] @task를 이용한 Task 선언 위 두 개 글에서 데코레이터를 이용해 Dag와 Task를 선언하는 방법을 정리했는데, 데코레이터를 이용한 선언 방법은 TaskFlow API를 이용한 방법이다. TaskFlow는 종속성을 자동으로 계산하고 XCom을 이용해 task 간 입출력 이동을 처리한다. 이 글에서는 @task로 생성한 Task의 의존성을 설정하는 방법을 정리한다. 함수 정의 Task로 생성할 함수를 먼저 정의한다. 간단히 데이터를 읽고, 읽은 데이터에서 나이 데이터만 추출해서 총합을 출력하고자 한다. def data_load(): data = [ {"name"..

Apache Airflow 2023.04.26
1 2 3 4 5 6 ··· 8