airflow 73

[Airflow] Task 반복 생성

개요 Airflow DAG는 Python 코드로 정의되기 때문에 반복문을 이용해 task를 생성할 수 있다. 이를 이용하면 매개변수나 설정값만 다른 같은 작업을 Task 내에서 실행하지 않고, 각 Task로 정의하여 실행할 수 있다. 예제로 정리한다. 예제 DAG 아래 코드는 반복문을 이용해 task를 정의한다. from datetime import datetime, timedelta from time import sleep from airflow import DAG from airflow.operators.python import PythonOperator def dump(): sleep(3) dag_args = { "owner": "airflow", "retries": 1, "retry_delay":..

Apache Airflow 2023.04.12

[Airflow] LocalExecutor - Task 병렬 실행

개요 2022.12.13 - [Airflow] Executor Airflow는 기본적으로 작업을 순차적으로 실행하는 SequentialExecutor를 사용한다. 하지만 task 병렬 실행을 지원하지 않아 운영 환경에서는 적합하지 않아 Airflow 공식 문서에서도 변경하여 사용하도록 권장하고 있다. 이 글에서는 task를 병렬 실행할 수 있는 LocalExecutor를 사용한다. 기본값인 SequentialExecutor를 사용할 때와 어떻게 동작이 다른 지도 살펴본다. 테스트 DAG 먼저 task가 병렬로 나열된 DAG를 작성한다. from datetime import datetime, timedelta from time import sleep from airflow import DAG from ai..

Apache Airflow 2023.04.09

[Airflow] Meta DB로 PostgreSQL 연결

개요 Airflow는 기본적으로 Meta DB로 SQLite를 사용한다. 하지만 SQLite는 동시 접속이 불가능해 운영환경에는 적합하지 않아 공식문서에서도 변경하여 사용하기를 권장하고 있다. 보통 PostgreSQL과 MySQL을 사용하는 것 같은데, MySQL보다는 PostgreSQL이 Airflow와 좀 더 잘 맞는다고 한다. 이 글에서는 Airflow Meta DB를 PostgreSQL로 설정해보려고 한다. Airflow는 이미 설치되어 있다고 가정한다. - 2022.07.22 - [Apache Airflow] 설치 / 실행 (버전 2.3) PostgreSQL 설치 2023.04.04 - [Ubuntu] PostgreSQL 설치 및 실행을 작성했었는데…… 해결되지 않는 오류가 발생해서 Docker..

Apache Airflow 2023.04.08

[Airflow] Meta DB로 MySQL 연결

개요 Airflow는 기본적으로 Meta DB로 SQLite를 사용한다. 하지만 SQLite는 동시 접속이 불가능해 운영환경에는 적합하지 않아 공식문서에서도 변경하여 사용하기를 권장하고 있다. 보통 PostgreSQL과 MySQL을 사용하는 것 같은데, 이 글에서는 MySQL을 연결한다. Airflow와 MySQL은 이미 설치되어 있다고 가정한다. MySQL 설정 Airflow가 사용할 데이터베이스와 서비스 계정을 생성한다. 데이터베이스 생성 CREATE DATABASE airflow CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; 서비스 계정 생성 CREATE USER 'airflow' IDENTIFIED BY 'airflow1234'; GRANT ALL PRI..

Apache Airflow 2023.04.06

[Airflow] 기본 View 형식 변경

개요 2022.07.22 - [Apache Airflow] 설치 / 실행 (버전 2.3)을 통해 Airflow를 설치하고 DAG 화면을 보면, View 형식이 아래와 같다. Airflow 2.3부터 tree 형식이 grid 형식으로 대체되었기 때문인데, 기본 View 형식 지정 방법을 적어둔다. dag_default_view Airflow Webserver 설정 중 하나로, 기본 DAG 뷰를 지정한다. 유효한 값은 grid, graph, duration, gantt, landing_times이며, 기본값은 grid이다. Airflow 2.2.5까지는 tree, graph, duration, gantt, landing_times 값이 유효했다. view 형식 변경 방법 Airflow 구성 파일인 airfl..

Apache Airflow 2023.03.31

[Airflow] is_pause와 is_active

개요 Airflow META DB에서 dag 테이블을 살펴보면 is_paused와 is_active 컬럼을 확인할 수 있다. 두 컬럼 모두 DAG의 동작 여부와 관련된 컬럼인 것 같은데 두 컬럼의 차이는 무엇일까? is_paused DAG가 On 상태인지 Off 상태인지를 나타내는 속성이다. is_paused가 1이면 Off 상태이고, 0이면 On 상태이다. is_active 기본값은 1이다. 하지만 DAG 정의 파일이 삭제되거나 import에 실패하면 0으로 설정된다. is_active가 0으로 설정되면 UI 또는 스케쥴링 작업에서 필터링되며, DAG 소스 파일이 삭제되어도 Meta DB에서 관련 데이터가 삭제되지 않도록 한다. 참고 문서 https://forum.astronomer.io/t/is-pa..

Apache Airflow 2023.01.20

[Airflow] 자주 쓰는 CLI

개요 업무 중 Airflow를 많이 다루는데, 웹 서버에 접속하지 못하는 상태이다. 그렇다고 업무를 놓을 수는 없으니 Airflow 사용에 필요한 CLI 명령을 정리한다. DAG 목록 확인 어떤 파일로 DAG가 생성되었는지, 소유자와 ON/OFF(pause) 상태를 확인할 수 있다. airflow dags list pause가 True이면 OFF, False이면 ON 상태이다. DAG 구조 확인 DAG가 어떤 Task로 구성되어 있고, Task 간 의존성은 어떤지 확인한다. airflow dags show schedule_test Dag의 Task 목록 확인 DAG에 정의된 Task 목록을 확인한다. 트리 형태로 확인하고 싶을 때는 -t 옵션을 주어 실행한다. airflow tasks list # -t,..

Apache Airflow 2023.01.18

[Airflow] n번째 특정 요일마다 스케쥴하기

개요 매 달 또는 분기별 첫 번째 주 월요일에 DAG 스케쥴링 설정을 하고자 한다. 즉, 2021.06.24 - crontab - 매달 N번째 특정 요일에 실행시키기와 동일한 스케쥴링을 Airflow에서 설정해보려고 한다. Airflow DAG를 스케쥴링할 때는 cron 표현식을 사용하므로 속성과 값의 범위, 특수 문자를 이용해 스케쥴링할 것이다. cron 표현식 Airflow DAG를 스케쥴링할 때는 cron 표현식을 사용할 수 있다. 속성 필수 유효 값 유효 특수 문자 비고 분 O 0–59 * , - 시간 O 0–23 * , - 날짜 O 1–31 * , - ? L W 일부 구현에서만 '?', 'L', 'W' 허용 월 O 1–12 또는 JAN–DEC * , - 요일 O 0–6 또는 SUN–SAT * ,..

Apache Airflow 2023.01.15

[Airflow] 트리거 규칙 (Trigger Rule)

트리거 규칙 기본적으로 Airflow는 모든 상위 Task이 성공해야 다음 Task를 실행한다. 다만 필요한 경우 task 정의 시 trigger_rule 변수를 이용해 Task 실행 규칙을 지정할 수 있다. 옵션 trigger_rule로 지정할 수 있는 값은 다음과 같다. 값 동작 방식 all_success 모든 상위 Task 실행 성공 all_failed 모든 상위 Task가 실행 실패, 또는 upstream_failed 상태 all_done 모든 상위 Task 실행 완료 one_failed 하나 이상의 상위 Task 실패. 모든 상위 Task의 실행 완료를 대기하지 않는다. one_success 하나 이상의 상위 Task 성공. 모든 상위 Task의 실행 완료를 대기하지 않는다. none_faile..

Apache Airflow 2023.01.10

[Airflow] BranchDayOfWeekOperator - 실행 요일에 따른 분기

BranchDayOfWeekOperator 2023.01.03 - [Airflow] ShortCircuitOperator - 조건부 Task 실행에서 간단히 소개한 작업을 분기하는 오퍼레이터 중 하나이다. 실행한 날짜의 요일과 지정한 요일의 일치 여부에 따라 실행할 Task를 결정한다. import BranchDayOfWeekOperator는 아래와 같이 import 하여 사용할 수 있다. from airflow.operators.weekday import BranchDayOfWeekOperator from airflow.utils.weekday import WeekDay WeekDay 모듈은 요일 지정 시 사용한다. Task 정의 BranchDayOfWeekOperator로 Task를 정의할 때는 조건..

Apache Airflow 2023.01.04
1 2 3 4 5 6 7 8