airflow 86

[Airflow] TaskFlow - XCOM 값 반환 시 이름 붙이기

개요2023.04.09-[Airflow] @task 의존성 설정에서 TaskFlow는 태스크 간 종속성을 자동으로 계산하고, XCom을 통해 입출력을 처리한다고 정리했다. 기본적으로 TaskFlow를 사용하면 XCom 키는 return_value로 고정되며, 여러 값을 반환하더라도 튜플 형태로 묶여 return_value 하나에 저장된다.그런데 기존의 수동 XCom Push/Pull 방식과 호환되는 코드를 작성해야 하는 상황이 생겼다. 특정 키 이름으로 값을 push 해야 하위 태스크가 그 이름으로 pull 할 수 있기 때문에, return_value 대신 이름 있는 키로 값을 반환하는 방법이 필요했다. 이 글에서는 TaskFlow DAG에서 multiple_outputs을 활용해 키 이름을 포함한 값을..

Apache Airflow 2026.04.14

[Airflow] Connections에 옵션 추가

개요2025.08.25-[Airflow] Connections이란에서 Airflow와 외부 시스템 간의 연동을 위한 연결정보를 저장하는 Connection에 대해서 알아보았다. 평소에는 연결에 필요한 필수 정보 정도만 지정하고 사용하지만, 필요시에는 기타 추가적인 옵션도 지정할 필요가 있을 수 있다. 이번 글에서는 그런 부분에 대해 추가로 작성한다. extra이전 글에서 extra가 하는 역할이 무엇인가 싶었는데, 이 extra가 바로 추가적인 구성 정보를 지정하는 속성이었다. extra는 json 형식으로 지정하려는 속성의 키와 값을 넣어주면 된다.이 연결 정보는 URI 형식으로 확인할 수도 있는데 아래와 같은 형식이다.[ { "id": "5", "conn_id": "test", ..

Apache Airflow 2026.01.27

[Airflow] git-sync - fatal: Unable to create '/git/.git/worktrees//index.lock': File exists.

개요출근하고 운영 중인 Airflow을 점검했더니 스케쥴러가 동작하지 않고 있었다. 이 Airflow는 AWS EKS 내에서 동작하고 있는데, 확인해 보니 스케쥴러 포드가 Init:CrashLoopBackOff 상태를 지속하고 있었다.원인을 확인하고 문제를 해결한 방법을 적어둔다. 원인 파악- Pods Event 확인먼저 포드의 이벤트를 확인하고자 describe를 했다.kubectl describe pods airflow-schedulergit-sync-init가 재시작한다는 정보를 얻을 수 있었다. git-sync-init 컨테이너가 원인일 가능성이 높아졌다. - 컨테이너 로그 확인git-sync-init의 동작 현황을 보고자 로그를 확인한다.kubectl logs airflow-scheduler..

Apache Airflow 2026.01.22

[Airflow] Hook - 외부 연동 인터페이스

개요2025.08.25-[Airflow] Connections이란에서 Airflow 상에서 외부와 연동할 때 사용하는 연결 정보를 저장하는 개념인 Connections를 살펴보았었다. 그리고 저장한 연결 정보를 사용하는 방법은 따로 정리를 안 해두었었는데…… 보통 Hook으로 많이 사용하는 것 같다.이번 글에서는 Connections를 활용하는 Hook에 대해서 적어두려고 한다. HookAirflow에서 Hook이란 외부 플랫폼, 데이터베이스에 대한 고수준 인터페이스다. API를 호출하거나 특정한 라이브러리를 사용하는 저수준 코드를 작성할 필요 없이 외부 시스템과 연동할 수 있도록 도와준다.Hook은 Connections으로부터 자격 정보를 얻어서 외부 시스템과 연동한다. 모든 Hook은 BaseHo..

Apache Airflow 2026.01.06

[Airflow] k8s Git-Sync란

개요여러 EC2 인스턴스에 걸쳐서 동작하던 Airflow 환경에서는 Dag 폴더를 EFS에 두는 방법으로 서버 간 Dag 정의 파일을 동기화했다. 그리고 최근 마주한 쿠버네티스 환경에서 동작하는 Airflow 환경에선 git-sync라는 프로세스로 포드 간 Dag 정의 파일을 동기화하고 있었는데, 관련해서 정리해보려고 한다. Git-Sync일반적으로 Airflow on Kubernetes에서 DAG 파일을 어떻게 배포하느냐는 사실상 아래 선택지 중 하나를 택하게 된다. 이미지에 DAG 포함 (immutable image)공유 스토리지 (NFS, PVC, S3/GCS + sync)Git 기반 pull 모델 (git-sync)CI/CD에서 kubectl apply로 ConfigMap 생성여기서 git-sy..

Apache Airflow 2025.12.24

[Airflow] PythonDecoratorOperator 적용 함수 직접 실행

개요Airflow 2부터는 TaskFlow API가 도입되어 데코레이터를 사용하면 Python 함수가 Airflow Task로 기능하게 된다. 다만 단점이라면 기존에는 테스트를 위해 Task가 호출하는 함수를 직접 실행할 수 있던 반면, 테코레이터를 사용하면 함수를 호출해도 동작을 확인할 수 없다.이번 글에서는 데코레이터를 적용한 함수를 직접 실행하는 방법을 적어둔다. 현상예로 들어 다음과 같이 데코레이터를 활용한 Dag 정의 파일이 있다고 하자.from datetime import datetime, timedeltafrom airflow.decorators import dag, taskdag_args = { "owner": "airflow", "retries": 1, "retry_de..

Apache Airflow 2025.09.19

[Airflow] Connection 테스트 활성화

개요보통 소프트웨어에서 다른 소프트웨어와 연동할 때, 대표적으로 데이터베이스와 연동하는 경우 접속 테스트 기능을 통해 테스트를 하곤 한다. 그런데 Airflow는 연결 테스트 기능은 제공하지만 기본적으로 비활성화 상태로 제공하는 것 같다.연결 테스트 기능을 활성화하는 방법을 적어둔다. TEST_CONNECTION공식 문서를 찾아보니 보안 상의 이유로 테스트 연결 기능은 비활성화한 상태로 제공된다고 한다. 연결 테스트 기능을 활성화하고 싶다면 airflow.cfg 파일을 수정하거나 환경 변수를 수정하는 방법이 있다. 제어하는 변수의 이름은 test_connection이며 아래의 값을 허용한다.- Disable : (기본값) 연결 테스트 기능을 비활성화하고 UI에서 연결 테스트 버튼을 비활성화한다. - E..

Apache Airflow 2025.09.16

[Airflow] Connections이란

개요새로운 Airflow 환경을 사용하게 되었는데, 이번 환경에서는 Connections라는 기능을 사용하고 있어 어떤 기능인지 정리해보려고 한다. ConnectionsAirflow는 다른 시스템과 통신하기 위한 자격 증명 등을 저장하는 Connections라는 개념을 사용한다. 다른 시스템에서 데이터를 가져오거나 내보내기 위해 사용하며, 기본적으로 사용자 이름, 비밀번호, 호스트, 연결 시스템 유형, conn_id 등의 속성으로 구성되어 있다.Connection은 Web UI 또는 CLI를 통해 관리할 수 있고, Hooks나 jinja templetes을 통해 사용할 수 있다. Connections 생성이번 글에서는 Web UI를 통해 신규 Connection을 생성해본다. 연결할 시스템은 Post..

Apache Airflow 2025.08.25

[Airflow] dag_id 중복 확인하기

개요Airflow에서 dag_id는 유일해야 하기 때문에 dag_id에 중복이 발생하면 의도하지 않게 동작할 수 있다. 하지만 Dag의 오류 사항을 확인할 수 있는 Web UI에서는 dag_id에 중복 발생 여부를 확인할 수 없어 디버깅하기 조금 힘든데, cli를 활용하면 바로 확인할 수 있는 것을 알게 되어 적어둔다. 확인 방법dag_id 중복 시 발생하는 예외는 AirflowDagDuplicatedIdException이다. 즉, 2024.11.30-[Airflow] Dag Import Error 확인하기에서 확인한 airflow cli의 list-import-errors 명령어를 사용하면 dag_id의 중복 발생 여부를 확인할 수 있다. 참고로 Github에서 명령어의 동작 방식을 확인해 보니 DA..

Apache Airflow 2025.07.16

[Airflow] DagFileProcessorManager (PID=) last sent a heartbeat 51.22 seconds ago! Restarting it

현상업무를 하던 중, Dag 파싱이 정상적으로 이루어지지 않는 현상을 확인했다. Airflow Web UI에서 Dag의 코드를 확인하면 언제 파싱이 이루어졌는지 확인할 수 있는데, 확인 당시 기준 2일 전에 마지막으로 Dag 파싱이 이뤄지고 더 이상 파싱이 이뤄지지 않는 상태였다.원인을 파악하고 문제를 해결한 이력을 적어둔다. 원인먼저 Dag 파싱을 담당하는 DagFileProcessor의 로그가 존재하는지 확인했는데, 각 Dag 정의 파일의 파싱 이력 정도만 찾고 별도의 로그 파일은 확인하지 못했다.하지만 스케쥴러 로그 파일에서 아래와 같은 로그가 반복적으로 발생하고 있는 것을 확인했다.DagFileProcessorManager (PID=) last sent a heartbeat 51.22 seco..

Apache Airflow 2025.06.16