Airflow 에서 Dag를 삭제한다는 것: 파일 삭제 vs CLI vs UI

요즘 포럼에 어떤 글을 써볼까 컨텐츠 고민을 하고 있었는데요 ㅎㅎ

오늘 오픈 카톡방에 재밌는 컨텐츠가 나왔습니다.
djdl 님께서 dag 삭제의 의의에 대해서 질문을 해주신 것인데요.
코드가 source of truth 이니 reDeploy 하면 다시 Dag가 생성되니 삭제의 의미를 여쭤보신거죠.

정말 재밌는 주제입니다.
Dag 파일을 폴더에서 삭제하는 것이 UI에서 Delete 하는 것과 뭐가 다르지 싶습니다. 하지만 폴더에서만 삭제했을 때, 유령 Dag가 나타날 수 있는데요.

Airflow 내부를 들여다보면 이 둘은 전혀 다른 동작을 한다는 것을 이해할 수 있습니다.

그래서 이번 기회에 포럼에 정리해볼까 합니다.

먼저 Dag의 세 가지 상태에 대해 이해해야 합니다.

“비활성” Dag가 가질 수 있는 상태에 대해서 먼저 보도록 하겠습니다.

관련 코드는 airflow-core/src/airflow/models/dag.py 여기서 보실 수 있습니다.

class DagModel(Base):
    """Table containing DAG properties."""
    ...
    dag_id: Mapped[str] = mapped_column(StringID(), primary_key=True)
    # A DAG can be paused from the UI / DB
    # Set this default value of is_paused based on a configuration value!
    is_paused_at_creation = airflow_conf.getboolean("core", "dags_are_paused_at_creation")
    is_paused: Mapped[bool] = mapped_column(Boolean, default=is_paused_at_creation)
    # Whether that DAG was seen on the last DagBag load
    is_stale: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)

Paused 일시정지

is_paused: Mapped[bool] = mapped_column(Boolean, default=is_paused_at_creation)

UI 나 API에서 토글 할 수 있는 상태입니다. 저희에게 매우 익숙한 상태죠.
Dag 파일은 DAGS_FOLDER에 여전히 존재하고, 스케줄러가 DB에 등록도 해 두었지만, 사용자가 명시적으로 실행을 멈춘 상태입니다. 수동 트리거는 여전히 가능합니다.

Stale 비활성화

is_stale: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)

Dag 파일이 DAGS_FOLDER에서 사라졌을 때, 스케줄러가 이를 감지하고 is_stale = True로 표시합니다. 이것은 "삭제"가 아닙니다. 메타데이터와 실행 이력이 모두 DB에 남아 있으며, 파일을 다시 넣으면 복구됩니다.

Deleted 메타데이터 삭제

UI/API/CLI를 통해 Dag의 모든 메타데이터를 DB에서 완전히 제거하는 것입니다. DagRun, TaskInstance, XCom 등 관련된 모든 레코드가 삭제됩니다.

자 그럼 이 3가지 상태에 대해서 알게 되었으니 각 삭제 방법들에 대해서도 비교해봅시다.

Dag 폴더에서 파일이 삭제되었을 때

1. 스케줄러 감지 메커니즘

Dag 파일을 DAGS_FOLDER에서 삭제하면, 스케줄러의 DagFileProcessorManager가 이를 감지합니다. 두 가지 감지 경로가 있습니다.

경로 A: Bundle 리프레시 시 파일 목록 비교

스케줄러가 Dag Bundle을 리프레시할 때, 현재 존재하는 파일 목록과 DB에 등록된 Dag의 relative_fileloc을 비교합니다.
이 부분에 대해서는 airflow-core/src/airflow/dag_processing/manager.py 여기 deactivate_deleted_dags로 정의 되어 있습니다.

def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None:
    """Deactivate DAGs that come from files that are no longer present in bundle."""

    ...

    rel_filelocs: list[str] = []
    for info in present:
        abs_path = str(info.absolute_path)
        if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
            rel_filelocs.append(str(info.rel_path))
        else:
            for abs_sub_path in find_zipped_dags(abs_path=info.absolute_path):
                rel_sub_path = Path(abs_sub_path).relative_to(info.bundle_path)
                rel_filelocs.append(str(rel_sub_path))

    with create_session() as session:
        any_deactivated = DagModel.deactivate_deleted_dags(
            bundle_name=bundle_name,
            rel_filelocs=rel_filelocs,
            session=session,
        )
        if any_deactivated:
            remove_references_to_deleted_dags(session=session)

여기서 저희가 봐야하는 것은 DagModel.deactivate_deleted_dags() 입니다. 이건 airflow-core/src/airflow/models/dag.py 여기 들어있습니다.

@classmethod
def deactivate_deleted_dags(
    cls,
    bundle_name: str,
    rel_filelocs: list[str],
    session: Session = NEW_SESSION,
) -> bool:
    """
    Set ``is_active=False`` on the DAGs for which the DAG files have been removed.
    """
    dag_models = session.scalars(
        select(cls)
        .where(cls.bundle_name == bundle_name)
        .options(load_only(cls.relative_fileloc, cls.is_stale))
    )

    any_deactivated = False
    for dm in dag_models:
        if dm.relative_fileloc not in rel_filelocs:
            dm.is_stale = True
            any_deactivated = True

    return any_deactivated

좀 길었는데 로직을 정리하자면,

  1. 해당 번들에 속한 DagModel 조회
  2. 현재 존재하는 파일 목록에 없는 Dag → is_stale = true
  3. DagRun, TaskInstance 등 다른 테이블은 건드리지 않음.

경로 B: 파싱 타임아웃 기반 감지

파일이 삭제된 후, 스케줄러가 주기적으로 오래된 Dag을 감지합니다:
airflow-core/src/airflow/dag_processing/manager.py 이 부분을 참고 할 수 있습니다.

def _scan_stale_dags(self):
    """Scan and deactivate DAGs which are no longer present in files."""
    now = time.monotonic()
    elapsed_time_since_refresh = now - self._last_deactivate_stale_dags_time
    if elapsed_time_since_refresh > self.parsing_cleanup_interval:
        last_parsed = {
            file_info: stat.last_finish_time
            for file_info, stat in self._file_stats.items()
            if stat.last_finish_time
        }
        self.deactivate_stale_dags(last_parsed=last_parsed)
        self._last_deactivate_stale_dags_time = time.monotonic()
...

@provide_session
def deactivate_stale_dags(
    self,
    last_parsed: dict[DagFileInfo, datetime | None],
    session: Session = NEW_SESSION,
):
    """Detect and deactivate DAGs which are no longer present in files."""
    to_deactivate = set()
    bundle_names = {b.name for b in self._dag_bundles}
    query = select(
        DagModel.dag_id, DagModel.bundle_name, DagModel.fileloc,
        DagModel.last_parsed_time, DagModel.relative_fileloc,
    ).where(~DagModel.is_stale, DagModel.bundle_name.in_(bundle_names))
    dags_parsed = session.execute(query)

    for dag in dags_parsed:
        file_info = DagFileInfo(rel_path=Path(dag.relative_fileloc), bundle_name=dag.bundle_name)
        if last_finish_time := last_parsed.get(file_info, None):
            if dag.last_parsed_time + timedelta(seconds=self.stale_dag_threshold) < last_finish_time:
                to_deactivate.add(dag.dag_id)

    if to_deactivate:
        session.execute(
            update(DagModel)
            .where(DagModel.dag_id.in_(to_deactivate))
            .values(is_stale=True)
            .execution_options(synchronize_session="fetch")
        )

이 부분을 살펴보면 Dag의 last_parsed_time이 파일의 last_finish_time 보다 stale_dag_threshold 이상 오래되면 자동으로 비활성화 합니다. 그리고 여기도 is_stale만 변경합니다.

그렇다면 이게 왜 필요한가 궁금하실 수도 있을 것 같습니다.
이 부분은 동적으로 생성되는 Dag를 위해서 필요한 기능입니다. 파일 자체가 있더라도 더 이상 Dag를 생성하지 않으면 Stale로 표시하기 위함입니다.

그럼 파일 삭제 시 어떤 것들이 삭제되고 보존되는가?

항목 보존 여부
dag 테이블 레코드 보존 (is_stale=True로 변경)
dag_run 테이블 완전히 보존
task_instance 테이블 완전히 보존
xcom 테이블 완전히 보존
serialized_dag 테이블 보존
dag_code 테이블 보존
dag_version 테이블 보존
log 테이블 완전히 보존
Asset 관련 참조 삭제됨
로컬/원격 로그 파일 완전히 보존

그래서 파일을 다시 넣으면?

위에서 보존되는 것들이 보존되어서 파일을 DAGS_FOLDER에 다시 넣으면, 스케줄러가 파싱할 때 is_stale = False로 자동 복구됩니다

결론은 파일로 삭제한 경우에는 다시 넣어주면 데이터 손실 없이 완전히 복구됩니다.

UI와 API를 통한 삭제

UI와 API는 사실상 서로 같습니다. UI의 삭제 버튼은 DELETE /api/v2/dags/{dag_id} API를 호출합니다. 결국 같은 delete_dag() 함수를 실행합니다.

그럼 delete_dag()에 대해 살펴봅시다.
이 부분은 airflow-core/src/airflow/api/common/delete_dag.py 이 경로에 아래와 같이 있습니다.

@provide_session
def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session = NEW_SESSION) -> int:
    """
    Delete a DAG by a dag_id.

    :param dag_id: the dag_id of the DAG to delete
    :param keep_records_in_log: whether keep records of the given dag_id
        in the Log table in the backend database (for reasons like auditing).
        The default value is True.
    :param session: session used
    :return count of deleted dags
    """
    log.info("Deleting DAG: %s", dag_id)

    running_tis = session.scalar(
        select(models.TaskInstance.state)
        .where(models.TaskInstance.dag_id == dag_id)
        .where(models.TaskInstance.state == TaskInstanceState.RUNNING)
        .limit(1)
    )
    if running_tis:
        raise AirflowException("TaskInstances still running")

    dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id).limit(1))
    if dag is None:
        raise DagNotFound(f"Dag id {dag_id} not found")

    models_for_deletion = [TaskInstance, DagRun] + [
        model for model in get_sqla_model_classes()
        if model.__name__ not in ["TaskInstance", "DagRun"]
    ]

    count: int = 0
    for model in models_for_deletion:
        if hasattr(model, "dag_id") and (not keep_records_in_log or model.__name__ != "Log"):
            result: Result = session.execute(
                delete(model)
                .where(model.dag_id == dag_id)
                .execution_options(synchronize_session="fetch")
            )
            cursor_result = cast("CursorResult", result)
            count += cursor_result.rowcount

    session.execute(
        delete(ParseImportError)
        .where(
            ParseImportError.filename == dag.relative_fileloc,
            ParseImportError.bundle_name == dag.bundle_name,
        )
        .execution_options(synchronize_session="fetch")
    )

    return count

단계별로 살펴보자면,

실행 중인 작업이 있는가?

running_tis = session.scalar(
    select(models.TaskInstance.state)
    .where(models.TaskInstance.dag_id == dag_id)
    .where(models.TaskInstance.state == TaskInstanceState.RUNNING)
    .limit(1)
)
if running_tis:
    raise AirflowException("TaskInstances still running")

먼저, 실행 중인 작업이 있는가 확인합니다. 단 하나라도 RUNNING 상태인 TaskInstance가 있으면 예외를 발생시킵니다. 이는 데이터 무결성을 위한 의도된 기능입니다.

삭제 순서

models_for_deletion = [TaskInstance, DagRun] + [
    model for model in get_sqla_model_classes()
    if model.__name__ not in ["TaskInstance", "DagRun"]
]

TaskInstanceDagRun반드시 먼저 삭제됩니다. 이유는 아래와 같은 이유들이 있습니다.

  • DagVersionDagRun을 참조하므로, DagRun이 먼저 삭제되어야 FK 제약 위반이 발생하지 않음
  • BackFillDagRun을 참조하므로, 같은 이유로 DagRun 우선 삭제
  • TaskInstance는 거의 모든 관련 테이블에서 참조됨

이 후에 모든 관련 테이블을 삭제하는 과정을 거칩니다.

그럼 UI와 API에서 삭제 시 아래의 것들이 모두 영향을 받게 됩니다.

삭제 순서 테이블/모델 설명
1순위 task_instance (TaskInstance) 모든 태스크 실행 기록
1순위 dag_run (DagRun) 모든 Dag 실행 기록
2순위 xcom 태스크 간 데이터 전달 기록
2순위 dag(DagModel) Dag 메타데이터 자체
2순위 serialized_dag 직렬화된 Dag 정의
2순위 dag_version Dag 버전 기록
2순위 dag_code Dag 소스 코드 캐시
2순위 rendered_task_instance_fields` 렌더링된 템플릿 필드
2순위 task_map 태스크 매핑 레코드
2순위 dag_warning Dag 관련 경고
2순위 dag_schedule_asset_*_reference Asset 스케줄 참조
2순위 task_outlet_asset_reference 태스크 아웃렛 Asset 참조
2순위 task_inlet_asset_reference 태스크 인렛 Asset 참조
2순위 asset_dag_run_queue Asset 큐 레코드
2순위 backfill 백필 레코드
2순위 deadline_alert 데드라인 알림
삭제x, 보존됨 log(Log) 감사 로그 (기본적으로 보존)
별도 처리 import_error(ParseImportError) filenamebundle_name으로 삭제

Dag 파일이 여전히 존재하는 경우?

UI/API로 메타데이터를 삭제해도 Dag 파일이 DAGS_FOLDER에 여전히 있으면, 스케줄러가 다음 파싱 주기에서 Dag을 다시 등록합니다.
하지만 이때 과거 실행 이력(DagRun, TaskInstance 등)은 이미 삭제되어 복구되지 않습니다.

CLI를 통한 삭제

원석님께서 CLI로 Dag를 삭제한다고 하셨죠 ㅋㅋㅋ
CLI로는 아래와 같이 삭제 할 수 있습니다.

airflow dags delete <dag_id>

CLI도 살펴보면 위에 설명했던 delete_dag() 함수를 호출합니다.
관련 코드는 airflow-core/src/airflow/cli/commands/dag_command.py 여기서 확인할 수 있습니다.

def dag_delete(args) -> None:
    """Delete all DB records related to the specified dag."""
    api_client = get_current_api_client()
    if (
        args.yes
        or input("This will drop all existing records related to the specified DAG. Proceed? (y/n)").upper()
        == "Y"
    ):
        try:
            message = api_client.delete_dag(dag_id=args.dag_id)
            print(message)
        except OSError as err:
            raise AirflowException(err)
    else:
        print("Cancelled")

결론으론 CLI 삭제는 UI/API와 동일한 delete_dag() 함수를 호출합니다.

세 가지 방식의 차이 비교

항목 파일 삭제 (Deactivation) UI/API 삭제 CLI 삭제
트리거 스케줄러 자동 감지 사용자 명시적 요청 사용자 명시적 요청
DB 작업 UPDATE dag SET is_stale=True 모든 관련 테이블 DELETE 모든 관련 테이블 DELETE
DagRun 보존 보존됨 삭제됨 삭제됨
TaskInstance 보존 보존됨 삭제됨 삭제됨
XCom 보존 보존됨 삭제됨 삭제됨
감사 로그 (Log) 보존됨 보존됨 (기본값) 보존됨 (기본값)
Asset 참조 삭제됨 삭제됨 삭제됨
파일 복원 시 복구 완전 복구 실행 이력 복구 불가 실행 이력 복구 불가
로컬 로그 파일 보존됨 보존됨 보존됨
원격 로그 (S3 등) 보존됨 보존됨 보존됨
실행 중 보호 없음 (stale 표시만) 있음 (409 Conflict) 있음 (409 Conflict)

마무리하며

Dag 삭제는 생각보다 매우 복잡한 프로세스를 거칩니다. 그래서 논의도 좀 있었던 걸로 기억합니다ㅎㅎ

정리하자면,

  1. 파일 삭제 ≠ Dag 삭제: 파일을 삭제하면 Dag가 비활성화(stale)될 뿐, 모든 실행 이력은 보존됩니다.
  2. UI/API/CLI 삭제는 동일: 세 가지 모두 같은 delete_dag() 함수를 호출하며, dag_id를 가진 거의 모든 테이블에서 레코드를 삭제합니다.
  3. 로그는 삭제되지 않음: 어떤 방식으로든 로그 파일은 삭제되지 않습니다.
1개의 좋아요
2개의 좋아요