Apache Airflow 71

[Airflow] @task를 이용한 Task 선언

Task 선언 2023.04.22 - [Airflow] with, @dag를 이용한 DAG 선언에서 데코레이터를 이용해 DAG를 생성하는 방법을 살펴봤는데, Task도 데코레이터를 이용해 선언할 수 있다. 다만 지금은 Python, SQL 함수에만 사용할 수 있는 것 같다. 이 글에서는 표준 생성자를 이용해 task가 선언된 DAG를 데코레이터를 이용해 task를 선언해 본다. 기존 선언 방식 기존에는 Task도 표준 생성자를 이용해 선언했다. from datetime import datetime, timedelta from airflow.decorators import dag from airflow.operators.python import PythonOperator from airflow.operat..

Apache Airflow 2023.04.23

[Airflow] with, @dag를 이용한 DAG 선언

DAG 선언 DAG를 선언하는 방법은 세 가지가 있다. 이 블로그에서는 주로 표준 생성자를 사용하여 생성하는데 from datetime import datetime, timedelta from airflow import DAG from airflow.operators.empty import EmptyOperator dag_args = { "owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1), } dag = DAG( dag_id="dag_create_test_with", default_args=dag_args, start_date=datetime(2022, 1, 20), schedule_interval="@once", ) t1 = Em..

Apache Airflow 2023.04.22

[Airflow] Variables를 이용한 동적 DAG 생성

개요 2023.04.15 - [Airflow] Variables - 설정/사용에서 Airflow에서 사용할 전역 변수를 정의하고 사용했다. 이 글에서는 설정한 전역변수를 이용해 동적으로 DAG를 생성한다. 즉, Variables로 설정한 값 목록을 매개변수로 사용해 하나의 DAG 정의 파일로 여러 DAG를 생성해 본다. Variables 설정 2023.04.15 - [Airflow] Variables - 설정/사용을 참조하여 설정한다. 이 글에서는 list_dags라는 키로 dag_id가 될 값들을 개행문자로 구분하여 정의했다. DAG 정의 파일 작성 간단히 dag_id를 출력하는 DAG를 예시로 작성한다. DAG 정의부 Variables에 정의된 list_dags를 이용하기 좋게 DAG와 TASK 정의..

Apache Airflow 2023.04.19

[Airflow] Variables - 설정/사용

Variables Airflow에서 전역적으로 사용할 값을 미리 정의해 두고 공통적으로 사용할 변수를 뜻한다. Airflow Web UI를 통해 정의하거나 JSON 파일을 이용해 대량으로 업로드할 수 있다. 이 글에서는 Variables를 정의하고 사용하는 방법을 정리한다. Variables 설정 1. 상단 메뉴에서 Admin > Variables로 접근 2. +(Add a new record) 버튼 클릭 3. Variable의 키와 값 지정 후 Save 버튼 클릭 값은 텍스트, JSON 등 관계없이 지정할 수 있는 것 같다. 4. 설정값 확인 값이 잘 설정된 것을 확인할 수 있다. 접근 1. airflow.models.Variable import Airflow에 설정된 Variables에 접근하기 위해..

Apache Airflow 2023.04.15

[Airflow] 버전 업그레이드

개요 현재 테스트 서버에 설치되어 있는 Airflow 버전은 2.3.2이고, 2023년 3월 28일 기준 Airflow 최신 버전은 2.5.2이다. 사용하고 있는 버전도 비교적 최신 버전이지만, 아예 최신 안정 버전으로 업그레이드를 해보려고 한다. 방법을 정리한다. 환경 정보 AWS EC2 단일 노드 python 3.8 pip를 통한 설치 META DB : postgreSQL (Docker) LocalExecutor ❗ 운영 환경에서 작업하지 않은 글이므로 참고만 할 것을 당부한다. ❗ 데이터 백업 문제 상황을 대비해 Airflow 관련 데이터를 백업한다. 백업할 사항은 아래 부분 정도가 될 것 같다. Python 패키지 버전 정보 pip freeze > airflow_requirements.txt Ai..

Apache Airflow 2023.04.14

[Airflow] chain - Task 간 의존성 설정

개요 2022.08.03 - [Airflow] Task 간 의존성 설정에서 시프트 연산자와 set_downstream, set_upstream 함수를 이용한 방법을 정리했다. 추가로 chain 함수를 이용한 의존성 설정에 대해 정리한다. chain 주어진 여러 개의 task에 대한 의존성 체인을 만든다. chain(*tasks) chain 함수는 tasks, Labels, XComArg, TaskGroups, 그리고 이러한 유형을 원소로 하는 리스트를 매개변수로 받을 수 있다. 모듈 임포트 아래와 같이 임포트 하여 사용할 수 있다. from airflow.models.baseoperator import chain 사용 예 테스트 DAG 코드는 접은 글로 작성해 둔다. 더보기 Code from dateti..

Apache Airflow 2023.04.13

[Airflow] Task 반복 생성

개요 Airflow DAG는 Python 코드로 정의되기 때문에 반복문을 이용해 task를 생성할 수 있다. 이를 이용하면 매개변수나 설정값만 다른 같은 작업을 Task 내에서 실행하지 않고, 각 Task로 정의하여 실행할 수 있다. 예제로 정리한다. 예제 DAG 아래 코드는 반복문을 이용해 task를 정의한다. from datetime import datetime, timedelta from time import sleep from airflow import DAG from airflow.operators.python import PythonOperator def dump(): sleep(3) dag_args = { "owner": "airflow", "retries": 1, "retry_delay":..

Apache Airflow 2023.04.12

[Airflow] LocalExecutor - Task 병렬 실행

개요 2022.12.13 - [Airflow] Executor Airflow는 기본적으로 작업을 순차적으로 실행하는 SequentialExecutor를 사용한다. 하지만 task 병렬 실행을 지원하지 않아 운영 환경에서는 적합하지 않아 Airflow 공식 문서에서도 변경하여 사용하도록 권장하고 있다. 이 글에서는 task를 병렬 실행할 수 있는 LocalExecutor를 사용한다. 기본값인 SequentialExecutor를 사용할 때와 어떻게 동작이 다른 지도 살펴본다. 테스트 DAG 먼저 task가 병렬로 나열된 DAG를 작성한다. from datetime import datetime, timedelta from time import sleep from airflow import DAG from ai..

Apache Airflow 2023.04.09

[Airflow] Meta DB로 PostgreSQL 연결

개요 Airflow는 기본적으로 Meta DB로 SQLite를 사용한다. 하지만 SQLite는 동시 접속이 불가능해 운영환경에는 적합하지 않아 공식문서에서도 변경하여 사용하기를 권장하고 있다. 보통 PostgreSQL과 MySQL을 사용하는 것 같은데, MySQL보다는 PostgreSQL이 Airflow와 좀 더 잘 맞는다고 한다. 이 글에서는 Airflow Meta DB를 PostgreSQL로 설정해보려고 한다. Airflow는 이미 설치되어 있다고 가정한다. - 2022.07.22 - [Apache Airflow] 설치 / 실행 (버전 2.3) PostgreSQL 설치 2023.04.04 - [Ubuntu] PostgreSQL 설치 및 실행을 작성했었는데…… 해결되지 않는 오류가 발생해서 Docker..

Apache Airflow 2023.04.08

[Airflow] Meta DB로 MySQL 연결

개요 Airflow는 기본적으로 Meta DB로 SQLite를 사용한다. 하지만 SQLite는 동시 접속이 불가능해 운영환경에는 적합하지 않아 공식문서에서도 변경하여 사용하기를 권장하고 있다. 보통 PostgreSQL과 MySQL을 사용하는 것 같은데, 이 글에서는 MySQL을 연결한다. Airflow와 MySQL은 이미 설치되어 있다고 가정한다. MySQL 설정 Airflow가 사용할 데이터베이스와 서비스 계정을 생성한다. 데이터베이스 생성 CREATE DATABASE airflow CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; 서비스 계정 생성 CREATE USER 'airflow' IDENTIFIED BY 'airflow1234'; GRANT ALL PRI..

Apache Airflow 2023.04.06
1 2 3 4 5 6 7 8