2024/04 22

[Airflow] ExternalTaskSensor - 다른 dag/task 작업 완료 대기

개요 Airflow에서는 여러 작업 간의 종속성을 설정하여 Task Flow을 구성할 수 있는데, 종속성을 설정할 작업이 서로 다른 Dag에 속해있는 경우가 있을 수 있다. 이런 경우 Airflow에서 제공하는 Sensor Operator인 ExternalTaskSensor의 사용을 고려해 볼 수 있다. 이 글에서는 ExternalTaskSensor의 사용법 정도를 간단히 적어둔다. ExternalTaskSensor ExternalTaskSensor는 특정 logical_date(execution_date)에 대한 다른 dag, task group, task가 완료되기를 기다리는 Operator이다. airflow.sensors.external_task.ExternalTaskSensor(*, extern..

Apache Airflow 2024.04.30

[기록] 데이터 재처리/복구용 Airflow Dag 개발

개요Airflow에는 데이터를 재처리할 수 있는 방법으로 clear, backfill 등의 기능을 제공하고 있다. 다만 단순 clear로 재처리하는 경우에는 이전 시점의 데이터를 처리하기 어렵고, backfill은 forground로 실행되며 로그로 모니터링을 하기 위해서는 리다이렉션 등을 통해 별도로 출력을 저장해야 한다. 또 업무에서 사용하고 있는 airflow 환경에서 backfill 명령이 동작하지 않는다는 치명적인 문제도 있었다…….이런저런 이유로 업무에서 사용하고 있는 airflow 환경에서는 recovery_dag(데이터 재처리를 위한 dag)를 생성하는 dag 정의 파일을 개발하여 사용하고 있었는데 몇 가지 문제점이 발견되었다.어떤 문제점이 있는지, 그리고 어떻게 개선했는지를 정리해 둔다...

[Pandas] update - 다른 데이터프레임 값으로 수정

개요두 개의 다른 데이터프레임이 있을 때, 한 데이터프레임의 컬럼의 데이터프레임의 컬럼 값으로 수정하고 싶다. 확인해 보니 Pandas에서 제공하는 update 함수를 이용할 수 있는 것 같다.사용 방법을 적어둔다.  DataFrame.updateupdate는 데이터프레임의 컬럼을 다른 DataFrame의 컬럼으로 덮어씌운다. 단 결측치가 아닌 값만 사용하며. 반환값 없이 원본이 변경된다.인덱스와 컬럼이 동일한 셀의 값으로 데이터프레임을 수정하기 때문에, 두 데이터프레임 간에는 같은 이름의 인덱스와 컬럼이 존재해야 한다. 만약 시리즈를 사용한다면 name 속성을 사용해 컬럼 이름을 동일하게 맞추어야 한다.DataFrame.update(other, join='left', overwrite=True, fil..

[GitLab] 프로젝트 ID 확인하기

개요GitLab API를 이용해 프로젝트에 대한 작업이 필요한데, 일반적으로 프로젝트에 대한 작업은 프로젝트 ID를 지정하게끔 되어 있는 것 같다. 이 글에서는 GitLab 프로젝트 ID를 확인하는 방법에 대해 적어둔다.  웹에서 확인프로젝트 ID는 웹에서 간단하게 확인할 수 있다. 버전에 따라 위치는 다르지만 GitLab 프로젝트의 메인 페이지에서 확인할 수 있다.또는 프로젝트의 General Settings 페이지에서 확인할 수도 있다.   API로 확인GitLab API를 통해서도 확인할 수 있다.사실 이 글에서 기록해두려고 했던 바가 이 방법이다. 조금 더 정확하게는 프로젝트의 네임스페이스와 이름을 가지고 프로젝트 ID를 찾는 방법을 적어둔다.https:///api/v4/groups//projec..

Git | GitLab 2024.04.25

[작업 기록] Single AZ -> Multi AZ 구성 변경

개요 AWS 환경에서 운영하는 서비스 중 하나가 Single AZ로 구성되어 있는 것을 알게 되었다. 인프라 관련 담당자에 의하면 기존에 IP 대역이 부족해서 단일로 구성했었고, 이후에 IP 대역 추가 작업이 있었다고 한다. 이에 따라 예기치 않은 사고로 서비스 장애가 발생할 가능성을 줄이기 위해 Multi AZ로 구성을 변경하려고 한다. 업무 역할 상 AWS 구성을 직접 관리하지는 못했으나…… 비슷한 작업을 할 때 참고할 수 있도록 어떤 순서로 진행했는지 기록해둔다. + 참고로 동작하고 있던 서비스는 Airflow와 Tomcat(API 서버)이다. 구성도 정확하진 않지만…… 기존 구성도는 대략 다음과 같다. 운영계이므로 Public Subnet과 Database 쪽 Subnet은 HA로 구성되어 있었지..

기타 2024.04.24

[Pandas] apply 시 컬럼의 값에 따라 다른 함수 사용하기

개요 Pandas에서 제공하는 apply 함수는 매개변수로 전달받은 함수를 적용하는 함수이다. 당연히 하나의 함수를 열이나 행에 적용하는데…… 각 데이터의 값에 따라 적용해야 하는 함수가 달라져야 하는 필요가 생겼다. 지금부터 다음의 문제 상황을 맞이했다고 해보자. 업무에서 처리되는 데이터는 S3에 다음과 같은 경로에 떨어진다. 's3://test-bucket/summary/titanic/class=First/date=2024-03-28/27b3ce6245b44ff0950e98419089e66c-0.parquet' 's3://test-bucket/summary/titanic/class=Second/date=2024-03-28/27b3ce6245b44ff0950e98419089e66c-0.parquet' ..

[Pandas] 데이터프레임의 다른 컬럼을 참조하여 문자열 치환

개요 기존에 처리해서 S3에 저장한 데이터를 새로 처리해 새로운 경로에 저장하고 있다. 예로 들어 기존 데이터가 저장된 S3 경로가 아래와 같다면, s3://test-bucket/summary/titanic/class=First/27b3ce6245b44ff0950e98419089e66c-0.parquet' s3://test-bucket/summary/titanic/class=Second/27b3ce6245b44ff0950e98419089e66c-0.parquet' s3://test-bucket/summary/titanic/class=Third/27b3ce6245b44ff0950e98419089e66c-0.parquet' s3://test-bucket/raw/iris/species=setosa/2770df..

[Airflow] airflow_db_cleanup Dag 사용하기

개요 2023.08.08 - [Airflow] DB Clean에서 Airflow Meta DB를 관리하는 두 가지 방법을 소개했었다. airflow_db_cleanup Dag 사용 airflow db clean 명령 사용 기존 글에서는 airflow 명령어를 사용한 방법에 초점을 맞췄는데, 이번 글에서는 airflow_db_cleanup Dag를 사용한 방법에 초점을 맞춰서 정리해보려고 한다. airflow_db_cleanup airflow_db_cleanup Dag는 https://github.com/teamclairvoyant/airflow-maintenance-dags 에서 개발되었다. 하지만 저장소 이력을 보면 전혀 관리가 되어 있지 않은 상태로 보이는데, 좀 더 찾아보니 GCP의 https://..

Apache Airflow 2024.04.19

[Airflow] DagFileProcessor - DAG 파일 처리

개요 Airflow 스케쥴러의 DagFileProcessor는 DAG 정의 파일을 처리해 JSON 형식으로 직렬화한 후, Airflow MetaData DB SerializedDagModel에 저장한다. 이 작업은 일정 주기마다 반복적으로 수행되는 것으로 보이는데, 최근에 관련 설정을 확인하게 되어서 기록할 겸 DAG 파일 처리에 대해서 정리한다. DAG 파일 처리 DAG 파일 처리는 dag_folder에 저장된 Dag 정의 파일을 DAG 객체로 변환하는 작업으로, 아래의 두 개 프로세스가 동작하여 수행한다. - DagFileProcessorManager : 무한 루프로 실행되는 프로세스로, 처리해야 할 파일을 결정한다. - DagFileProcessorProcess : 개별 파일을 DAG 객체로 변환한..

Apache Airflow 2024.04.18

[AWS] Amazon Managed Workflows for Apache Airflow 란

MWAA란? Amazon Managed Workflows for Apache Airflow란 Apache Airflow에 대한 AWS 관리형 오케스트레이션 서비스로 MWAA라고 칭해진다. MWAA는 관리형 서비스이기 때문에 확장성, 가용성 등 인프라 관리에 대한 고민 없이 Airflow를 사용할 수 있다. 즉, 요구 리소스에 맞춰 워크플로우 용량을 확장하므로 AWS 클라우드 환경에서 데이터 파이프라인을 대규모를 설정하고 운영하는데 적합하다. 이 글에서는 Apache Airflow에 대한 기본 지식은 있다고 가정하고 작성했다. 장점 MWAA를 사용하면 다음과 같은 장점을 가질 수 있다. - 자동 Airflow 설정 : MWAA를 사용하면 환경 생성 시 필요한 Airflow 버전을 선택하여 환경을 설정할 수..

AWS 2024.04.17
1 2 3