Apache Airflow/삽질

[기록] 데이터 재처리/복구용 Airflow Dag 개발

비번변경 2024. 4. 29. 22:49

개요

Airflow에는 데이터를 재처리할 수 있는 방법으로 clear, backfill 등의 기능을 제공하고 있다. 다만 단순 clear로 재처리하는 경우에는 이전 시점의 데이터를 처리하기 어렵고, backfill은 forground로 실행되며 로그로 모니터링을 하기 위해서는 리다이렉션 등을 통해 별도로 출력을 저장해야 한다. 또 업무에서 사용하고 있는 airflow 환경에서 backfill 명령이 동작하지 않는다는 치명적인 문제도 있었다…….

이런저런 이유로 업무에서 사용하고 있는 airflow 환경에서는 recovery_dag(데이터 재처리를 위한 dag)를 생성하는 dag 정의 파일을 개발하여 사용하고 있었는데 몇 가지 문제점이 발견되었다.

어떤 문제점이 있는지, 그리고 어떻게 개선했는지를 정리해 둔다. 회사 정책 상 소스를 공개할 수가 없어서 글로 나마 기록해 둔다.

 

 

기본 동작 방식

recovery_dag 생성 방식은 2023.04.19 - [Airflow] Variables를 이용한 동적 DAG 생성과 동일하다.

Variables에 재처리할 dag_id, 재처리할 데이터 기간(start_date, end_date) 등의 정보를 저장하고,

ref_hourly, 2024-01-01 00:00:00, 2024-01-31 00:00:00
ref_hourly, 2024-04-01 10:00:00, 2024-04-02 10:00:00
oven_daily, 2024-01-01 00:00:00, 2024-01-31 00:00:00

해당 값을 파싱 하여 recovery_dag를 생성 및 처리하는 방식이다.

재처리한 데이터는 '{dag_id}_{summary_date}.parquet' 형식의 이름으로 저장된다. 재처리한 데이터 파일은 재처리용 output_path에 임시로 저장해 두었다가, 재처리가 전부 완료되면 기존 output_path로 이동시킨다. 

 

 

문제점 / 원인 파악

1. Variables에 지정한 데이터 재처리 기간과 recovery_dag에서의 데이터 재처리 기간에 차이가 존재한다.

예로 들어 Variables에 설정한 start_date, end_date가 2024-04-02 03:00:00, 2024-04-04 03:00:00이라면 recovery_dag의 start_date, end_date는 2024-04-02 00:00:00, 2024-04-04 00:00:00으로 변경되어 있었다.

=> 확인해 보니 내부에서 start_date, end_date를 schedule_interval 단위로 나눌 때, 시간/분을 replace 하는 부분이 존재했다.

 

2. recovery_dag 동작 중 데이터 재처리 기간이 schedule_interval 단위로 나뉘지 않는다.

업무에서 사용하는 Airflow는 Dag의 schedule_interval을 기준으로 summary_date(데이터 처리 시점)을 결정한다. 따라서 데이터 처리 기간을 schedule_interval 간격으로 나뉘어야 하는데, 실제로 나뉜 결과가 기댓값과 다른 문제가 있었다.

=> 확인해 보니 schedule_interval을 기준으로 데이터 재처리 기간을 나누지 않고, Dag의 동작 설정 중 입력 데이터의 윈도 사이즈 설정값을 기준으로 데이터 재처리 기간을 나누고 있었다.

 

3. summary_date(데이터 처리 시점)을 결정하는 연산이 적용되지 않는다.

 

4. 데이터 재처리하는 도중 발생한 오류로 인해 retry 될 때, start_date부터 다시 처리를 시작한다.

recovery_dag가 동작하는 도중 오류가 발생하면 retry로 인해 처리를 재시도하는데, 이때 항상 start_date부터 다시 처리한다. 예로 들어 데이터 재처리 기간이 2024-03-02 00:00:00 ~ 2024-04-02 00:00:00이고 summary_date가 2024-03-15 00:00:00일 때의 데이터를 처리하다가 오류가 발생했다면, summary_date가 2024-03-15 00:00:00일 때부터 다시 처리되는 것이 아니라 start_date인 2024-03-02 00:00:00 시점의 데이터부터 다시 처리하는 문제가 있었다.

즉, 처리 시간과 비용 관점에서 효율적이지 않은 상태였다.

 

5. 결과 데이터가 파일로 저장되는 경우, 기존 데이터를 삭제하고 신규 데이터로 복사하는 과정에서 데이터 재처리 기간 외의 데이터도 삭제된다.

처리한 결과 데이터는 데이터의 생성 시점을 의미하는 CRT_DATE 컬럼을 기준으로 파티셔닝 되어 저장된다. 예로 들어 summary_date가 2024-04-02 00:00:00인데 결과 데이터의 CRT_DATE의 범위가 2024-04-01 00:00:00 ~ 2024-04-02 00:00:00이라면, 결과 데이터는 date=2024-04-01/, date=2024-04-02/ 두 개 경로에 나뉘어서 저장된다.

그런데 recovery_dag 동작 중 기존 데이터를 삭제할 때 date 파티션 내에 있는 데이터를 모두 삭제하도록 구현되어 있어  데이터 재처리 기간의 직전/직후 데이터에 영향을 주고 있었다.

 

7. 기존 데이터를 삭제하고 신규 데이터로 복사하는 과정에서 잘못된 경로로 복사한다.

=> 기존 recovery_dag를 개발한 이후 데이터 저장 관련 신규 기능이 적용되지 않는 상태였다. 기존에는 하나의 Dag에 하나의 output_path만을 허용했는데, 현재는 여러 개의 output_path를 허용한다.

 

8. recovery_dag로 output_path가 DB 테이블인 경우를 처리하지 못한다.

처리 결과를 저장하는 DB 테이블은 Dag 로직의 id(logic_id)와 summary_date를 포함한 몇 개의 컬럼을 PK로 하고 있다. 따라서 데이터 재처리 전에 처리되었던 기존 데이터에 의해 duplicate entry 에러가 발생하는 문제가 있었다.

 

9. 같은 Dag에 대해 병렬 처리 불가

recovery_dag의 dag_id는 Variables에 저장한 dag_id를 기반으로 생성하고, 그 형식은 'recovery_{dag_id}'이다. 그리고 dag_id는 중복을 허용하지 않기 때문에 Dag 하나당 하나의 recovery_dag만을 생성할 수 있다. 즉, 데이터 재처리 기간이 길어도 recovery_dag를 여러 개 생성하여 병렬로 실행시키지 못하는 문제가 있었다.

 

10. 신규로 추가된 DockerOperator를 사용한 처리 미지원

=> 기존 recovery_dag를 개발한 이후 새로 추가된 기능이 적용되지 않는 상태였다.

 

 

> 결론

잘못 구현된 부분이 존재할 뿐만 아니라 현재 운영 환경에 맞춰 관리되고 있지 않아 새로 개발하기로 결정했다!

 

 

구현 목표

열거된 문제점을 개선하는 것 외에 달리 고려할 사항을 생각해보았다.

 

1. 기존 recovery_dag와 동일한 사용법과 동작 방식을 유지한다.

사용자의 혼란을 방지하기 위해 새로 개발하는 recovery_dag도 Airflow Variables에 필요한 설정을 입력하여 사용할 수 있도록 한다. Varibles에 입력하는 설정값 종류와 형식도 동일하게 유지한다.

 

2. 공통으로 사용하는 기존 dag 생성 함수(generate_dag)를 사용해 recovery_dag를 생성한다.

recovery_dag의 task dependency를 기존 dag와 동일하게 생성할 수 있도록 하기 위함이다. 데이터 재처리/복구 목적이므로 기존 dag의 동작 방식을 그대로 따르는 편이 좋겠다고 생각했다.

 

추가로 아예 처음부터 개발할 필요는 없다고 생각해서…… 기존에 데이터 재처리/복구 시 사용하던 코드를 전부 참고하여 개발하는 방향으로 진행했다.

 

 

개선점

1. summary_date 연산 개선

recovery_dag도 일반 Dag와 동일한 summary_date 연산을 적용하는 것을 목표로 했다.

먼저 Variables에 지정하는 start_date, end_date를 dag가 스케줄링되는 기간을 의미하는 것으로 개념을 재정의했다.

Variables에 저장된 start_date, end_date와 Dag의 schedule_interval을 사용하여 스케줄 목록을 구한다. (참고 : 2023.02.13 - [Python] cron schedule로 datetime 구하기)

from croniter import croniter_range

schedule_conf = '0 0 * * *'
end_date = datetime.now()
start_date = end_date - 10 * timedelta(days=1)
list_schedules = list(croniter_range(start_date, end_date, schedule_conf))

그리고 스케줄 목록에 summary_date 연산을 적용하여, summary_date 목록을 구하는 방식으로 구현했다.

 

2. 처리 실패 시 실패한 summary_date부터 재시작하도록 수정

recovery_dag 동작 중 오류가 발생한 summary_date를 구하여 그 이전 summary_date는 제외하는 것을 목표로 했다.

처음에는 output_path의 데이터 처리 결과를 확인하는 방향으로 진행하려고 했는데 구현이 쉽지 않았다. 때문에 Dag 동작 시 로그에 남은 summary_date 값을 활용하는 쪽으로 방향을 바꾸었다. recovery_dag 직전 처리 task 로그 파일을 읽어 들여 가장 마지막 summary_date부터 처리하도록 구현했다.

recovery_dag 직전 로그 파일은 recovery_dag 로그 파일 목록에서 가장 마지막에 수정된 파일로 판단한다. 단, recovery_dag 로그 파일 목록에 현재 실행 로그 파일도 포함되어 있기 때문에 airflow context를 활용하여 목록에서 제외하는 작업이 필요했다.

airflow_log_path = '/opt/airflow/logs'

context = get_current_context()
dag_id = context['dag_run'].dag_id
run_id = context['dag_run'].run_id
task_id = context['ti'].task_id
attempt = context['ti'].try_number

log_file_path = os.path.join(airflow_log_path, f'dag_id={dag_id}', '*', f'task_id={task_id}', '*')
log_files = [f for f in glob.glob(log_file_path) if f != f'/opt/airflow/logs/dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={attempt}.log']
if log_files:
     last_log_file = max(log_files, key=os.path.getctime)

이 작업으로 이미 완료한 데이터 재처리 작업을 중복으로 수행하지 않아 소요 시간을 많이 개선할 수 있었다.

 

 

3. 재처리 데이터 파일을 기존 output_path에 복사하는 기능 삭제

문제가 되는 부분이 기존 데이터를 삭제하는 과정에서 발생하기 때문에 처음에는 recovery_dag에서 기존 데이터 삭제 기능만 제거하려고 했다.

하지만 Dag의 schedule_interval이 재처리 수행 시점에 변경되는 경우가 있어, 데이터를 삭제하지 않으면 기존 데이터가 output_path에 남아있는 상황이 발생할 수 있었다. 데이터 재처리 시 재처리용 output_path 대신 기존 output_path를 사용해도 동일한 문제가 있었다.

데이터 재처리 시점 외의 데이터에 영향을 주지 않으면서 schedule_interval이 변경되기 이전의 데이터만 삭제할 수 있는 마땅한 방법을 찾지 못했고, 사람이 직접 처리해도 크게 번거롭거나 문제가 발생하지는 않은 것 같아 해당 기능은 삭제했다. 

 

4. 처리 결과가 DB 테이블에 저장되는 경우도 처리할 수 있도록 구현

PK 중복으로 인한 오류를 피하기 위해 데이터를 재처리하기 전에 기존에 DB 테이블에 저장되어 있던 데이터의 logic_id를 정해진 값으로 업데이트한 후, 재처리를 완료한 후 업데이트했던 데이터를 삭제하는 방식으로 구현했다.

단, 데이터를 업데이트할 때는 recovery_dag가 retry 되어 실패했던 summary_date부터 다시 실행하는 상황을 고려하여 recovery_dag가 동작하면서 결정되는 summary_date 목록 범위의 데이터를 업데이트하도록 해야 했다.

재처리를 완료한 후 데이터를 삭제할 때는 DB 테이블에서 logic_id가 업데이트했던 값이고, summary_date가 전체 summary_date 목록 범위에 해당되는 행만 삭제하도록 구현했다. 

 

5. 같은 Dag에 대해 병렬 처리 기능 추가

같은 dag에 대해 recovery_dag를 생성할 때 dag_id를 다르게 생성할 수 있도록 recovery_dag의 dag_id 형식을 'recovery_{dag_id}_{dag_id 등장 횟수}'로 변경했다. 예로 들어 Variables에 지정된 설정이 다음과 같다면,

ref_hourly, 2024-01-01 00:00:00, 2024-01-31 00:00:00
ref_hourly, 2024-04-01 10:00:00, 2024-04-02 10:00:00
oven_daily, 2024-01-01 00:00:00, 2024-01-31 00:00:00

recovery_dag의 dag_id는 다음과 같이 생성되도록 했다.

recovery_ref_hourly_0
recovery_ref_hourly_1
recovery_ref_daily_0

구현은 2023.12.11 - 중복/반복 가능한 데이터에서 데이터의 등장 순서 확인에 정리를 해두었다.

 

6. DockerOperator를 사용한 처리 추가

해당 부분은 Dag 정의 파일을 참고하여 기능을 추가했다.

 

 

결과

구현 및 반영을 완료하고 다음과 같은 결과를 얻을 수 있었다.

 

1. 사용자가 데이터 재처리/복구 처리를 위한 새로운 방식을 익힐 필요 없이 데이터를 재처리할 수 있었다.

2. 재처리 중간에 오류가 발생했을 때, 오류가 발생한 summary_date부터 다시 처리하기 시작하여 총 처리에 소요된 시간과 비용을 아낄 수 있었다.

3. 새로 개발된 기능을 데이터 재처리/복구 작업 시에도 동일하게 적용할 수 있게 되었다.

4. 데이터 재처리 작업도 병렬로 수행할 수 있게 되어 복구에 필요한 시간을 절약할 수 있게 되었다.

5. summary_date 연산 방식을 개선하여 기존 처리 결과와 재처리 결과에 로직 변경점 외의 지점에서 차이를 없앨 수 있었다.

 

사실 이 작업은 크게 세 번(기존 recovery_dag 수정 시도, recovery_dag 신규 개발, recovery_dag 개선)에 걸쳐서 진행되었는데…… 덕분에 Airflow에 구현된 데이터 처리부에 대한 이해를 크게 높일 수 있었다. 굳굳.