Apache Airflow/삽질

[Airflow] backfill - KeyError: TaskInstanceKey 에러 발생 시

비번변경 2022. 11. 24. 23:26

현상

이전 시점의 데이터를 다시 처리해야 하는 일이 발생해서 2022.11.14 - [Airflow] backfill - 스케쥴 시점이 지나간 DAG 실행하기에서 알아봤던 backfill을 시도했다. 하지만 예상과 다르게 아래와 같이 KeyError: TaskInstanceKey 에러가 발생하며 backfill에 실패했다.

Traceback (most recent call last):
  File "/opt/conda/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/conda/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/opt/conda/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py", line 103, in dag_backfill
    dag.run(
  File "/opt/conda/lib/python3.8/site-packages/airflow/models/dag.py", line 1701, in run
    job.run()
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 799, in _execute
    self._execute_for_run_dates(
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 722, in _execute_for_run_dates
    processed_dag_run_dates = self._process_backfill_task_instances(
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 620, in _process_backfill_task_instances
    self._update_counters(ti_status=ti_status)
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 211, in _update_counters
    ti_status.running.pop(key)
KeyError: TaskInstanceKey(dag_id='dag_id', task_id='task_name', execution_date=datetime.datetime(2020, 12, 15, 0, 0, tzinfo=Timezone('UTC')), try_number=2)

스케쥴러에 의해 실행되고 있는 DAG였기 때문에 DAG에 문제가 있는 것은 아닌 것 같다.

 

이런 경우 backfill 대신 사용할 수 있는 방법을 정리해둔다.

 

 

airflow dags test

Airflow는 test 명령어도 실제로 DAG run을 생성하여 실행한다. 그러므로 test 명령어로 DAG run을 생성할 수 있는지 확인해볼 수 있다. 

airflow tasks test [dag_id] [execution_date]

# 예시
airflow dags test \
    test_dag \
    2022-10-11T00:00:00+00:00

 

 

airflow dags trigger

또는 2022.11.23 - [Airflow] Trigger 에서 정리한 Trigger 명령을 사용해도 좋다.

airflow dags trigger [-h] [-c CONF] [-e EXEC_DATE] [-r RUN_ID] [-S SUBDIR]
                     dag_id
                     
# 예시
airflow dags trigger \
    -e 2022-10-08T02:00:00 \
    -r scheduled__2022-10-08T02:00:00+00:00 \
    test_dag

 

 

--donot_pickle 옵션을 주어 backfilling

또는 backill 수행 시 -x, --donot_pickle 옵션을 주어 실행한다.

airflow dags backfill \
    --start-date 2022-10-02T14:00:00 \
    --end-date 2022-10-05 \
    -x test_dag

 

 

참고 문서

https://github.com/apache/airflow/issues/13322

https://stackoverflow.com/questions/44116213/airflow-backfill-job-failing-even-though-test-works-fine