airflow 65

[Airflow] TaskGroup

TaskGroups TaskGroups은 Graph view에서 Task를 계층적인 그룹으로 구성할 때 사용한다. 반복적인 패턴을 만들고 시각적인 혼란을 줄이는데 유용하다. Task 실행 순서 등에는 영향을 미치지 않으면 단순 UI 그룹 개념이다. 이 글에서는 사용 방법을 간단히 정리한다. TaskFlow API를 사용하지 않는 방법으로 소개한다. 예시 dag 먼저 TaskGroup을 사용하지 않은 dag를 살펴본다. import datetime from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator dag = DAG( dag_id="test_..

Apache Airflow 2023.05.20

[Airflow] @task를 이용한 Task 반복 생성

개요 2023.04.12 - [Airflow] Task 반복 생성에서 Operator 생성자를 이용해 task를 반복 생성하는 방법을 알아보고, 2023.04.23 - [Airflow] @task를 이용한 Task 선언에서 TaskFlow API를 이용해 task을 생성하는 방법을 알아보았다. 그렇다면 TaskFlow API를 이용해 task를 반복 생성해야 할 때는 어떻게 해야 할까? 방법을 정리한다. 예시) 생성자 사용 DAG 아래의 생성자를 사용한 dag를 TaskFlow API를 사용해 재정의해본다. from datetime import datetime, timedelta from time import sleep from airflow import DAG from airflow.operators...

Apache Airflow 2023.05.19

[Airflow] retries 설정 무시하고 task 실패 처리

개요 Airflow dag 또는 task 정의 시 retries를 0 이상으로 설정하면 task 실행 중 실패가 발생해도 재실행을 시도한다. 하지만 재시도가 의미 없는 경우가 있을 수 있는데, 이럴 때는 retries 횟수가 남아있더라도 실패로 처리하고 싶다. Airflow Exception을 이용해 처리해 보자. 관련글 : 2023.05.06 - [Airflow] Exception을 이용한 task 스킵 처리 AirflowFailException 재시도 없이 task를 실패해야 할 때 사용하는 예외이다. 아래와 같이 import 하여 사용한다. from airflow.exceptions import AirflowFailException 예시 예시로 사용할 함수는 0부터 2까지의 수 중에서 무작위로 선택..

Apache Airflow 2023.05.07

[Airflow] Exception을 이용한 task 스킵 처리

개요 그동안 Airflow에서 건너뛰어야 하는 Task가 있으면 Task가 실행하기 전에 skip 필요 여부 등을 확인했다. 참고 : 2023.01.03 - [Airflow] ShortCircuitOperator - 조건부 Task 실행 이 글에서는 Airflow Exception을 이용해 실행 중인 task를 skip할 수 있도록 구성해 본다. AirflowSkipException 작업을 건너뛰어야 할 때 사용하는 예외이다. 아래와 같이 import 하여 사용한다. from airflow.exceptions import AirflowSkipException 예시 예시로 사용할 함수는 0부터 2까지의 수 중에서 무작위로 선택한 값이 1일 때 task를 건너뛴다. 만약 조건을 충족하지 않으면 task를 정..

Apache Airflow 2023.05.06

[Airflow 2.x] context 변수 사용하기

개요 기존 작성한 2022.08.19 - [Airflow] XComs 2022.08.11 - [Airflow] PythonOperator 매개변수 전달 에서 PythonOperator를 이용한 task에서 현재 Dag에 대한 정보를 접근할 때 provide_context라는 매개변수에 True 값을 전달했다. t1 = PythonOperator( task_id='print_string', python_callable=print_statement, params = {"name": "passwd", "job": "user"}, provide_context=True, dag=dag, ) 하지만 Airflow 2에서는 더 이상 provide_context를 사용하지 않는다. 참고 : https://airflow..

Apache Airflow 2023.04.28

[Airflow] @task 의존성 설정

TaskFlow 2023.04.22 - [Airflow] with, @dag를 이용한 DAG 선언 2023.04.23 - [Airflow] @task를 이용한 Task 선언 위 두 개 글에서 데코레이터를 이용해 Dag와 Task를 선언하는 방법을 정리했는데, 데코레이터를 이용한 선언 방법은 TaskFlow API를 이용한 방법이다. TaskFlow는 종속성을 자동으로 계산하고 XCom을 이용해 task 간 입출력 이동을 처리한다. 이 글에서는 @task로 생성한 Task의 의존성을 설정하는 방법을 정리한다. 함수 정의 Task로 생성할 함수를 먼저 정의한다. 간단히 데이터를 읽고, 읽은 데이터에서 나이 데이터만 추출해서 총합을 출력하고자 한다. def data_load(): data = [ {"name"..

Apache Airflow 2023.04.26

[Airflow] @task를 이용한 Task 선언

Task 선언 2023.04.22 - [Airflow] with, @dag를 이용한 DAG 선언에서 데코레이터를 이용해 DAG를 생성하는 방법을 살펴봤는데, Task도 데코레이터를 이용해 선언할 수 있다. 다만 지금은 Python, SQL 함수에만 사용할 수 있는 것 같다. 이 글에서는 표준 생성자를 이용해 task가 선언된 DAG를 데코레이터를 이용해 task를 선언해 본다. 기존 선언 방식 기존에는 Task도 표준 생성자를 이용해 선언했다. from datetime import datetime, timedelta from airflow.decorators import dag from airflow.operators.python import PythonOperator from airflow.operat..

Apache Airflow 2023.04.23

[Airflow] with, @dag를 이용한 DAG 선언

DAG 선언 DAG를 선언하는 방법은 세 가지가 있다. 이 블로그에서는 주로 표준 생성자를 사용하여 생성하는데 from datetime import datetime, timedelta from airflow import DAG from airflow.operators.empty import EmptyOperator dag_args = { "owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=1), } dag = DAG( dag_id="dag_create_test_with", default_args=dag_args, start_date=datetime(2022, 1, 20), schedule_interval="@once", ) t1 = Em..

Apache Airflow 2023.04.22

[Airflow] Variables를 이용한 동적 DAG 생성

개요 2023.04.15 - [Airflow] Variables - 설정/사용에서 Airflow에서 사용할 전역 변수를 정의하고 사용했다. 이 글에서는 설정한 전역변수를 이용해 동적으로 DAG를 생성한다. 즉, Variables로 설정한 값 목록을 매개변수로 사용해 하나의 DAG 정의 파일로 여러 DAG를 생성해 본다. Variables 설정 2023.04.15 - [Airflow] Variables - 설정/사용을 참조하여 설정한다. 이 글에서는 list_dags라는 키로 dag_id가 될 값들을 개행문자로 구분하여 정의했다. DAG 정의 파일 작성 간단히 dag_id를 출력하는 DAG를 예시로 작성한다. DAG 정의부 Variables에 정의된 list_dags를 이용하기 좋게 DAG와 TASK 정의..

Apache Airflow 2023.04.19

[Airflow] Variables - 설정/사용

Variables Airflow에서 전역적으로 사용할 값을 미리 정의해 두고 공통적으로 사용할 변수를 뜻한다. Airflow Web UI를 통해 정의하거나 JSON 파일을 이용해 대량으로 업로드할 수 있다. 이 글에서는 Variables를 정의하고 사용하는 방법을 정리한다. Variables 설정 1. 상단 메뉴에서 Admin > Variables로 접근 2. +(Add a new record) 버튼 클릭 3. Variable의 키와 값 지정 후 Save 버튼 클릭 값은 텍스트, JSON 등 관계없이 지정할 수 있는 것 같다. 4. 설정값 확인 값이 잘 설정된 것을 확인할 수 있다. 접근 1. airflow.models.Variable import Airflow에 설정된 Variables에 접근하기 위해..

Apache Airflow 2023.04.15
1 2 3 4 5 6 7