Airflow의 callback은 동기인가 비동기인가?

오늘 오픈카톡방에서 에어플로우린이 님께서 질문을 주셨습니다.

“Airflow의 콜백은 동기인가 비동기인가?”

저도 airflow의 2.x의 초반 버전까진 비동기고, 이젠 "동기"라고 생각했습니다.

하지만 gpt형님께서 동기라는 의견에 반론을 하셨다는 군요.
AI vs 인간 느낌으로 저도 확실히 하고 싶어서 한 번 제대로 확인해보기로 했습니다.

결론부터 말씀드리자면, 모든 Callback이 동일하다고 할 수 없고, Task-level과 Dag-level이 완전히 다른 방식으로 동작합니다.

구분 Task-Level Callback Dag-Level Callback
실행 주체 Worker Scheduler
종류 on_execute, on_success, on_failure, on_retry, on_skipped on_success, on_failure
실행 시점 각 task의 생명주기 중 즉시 실행 Dag run 종료 시점
로그 위치 Task 로그 (worker) Scheduler 로그

즉, Task-level은 동기, Dag-level은 비동기입니다.

공식 문서를 먼저 확인

일단 공식 문서를 훑어봤는데 callback에 "동기/비동기"라는 단어가 직접적으로 보이지 않았습니다.(제가 못찾았을 수도 있습니다…ㅠ)

하지만 다음 문장에서 중요한 힌트를 얻을 수 있습니다.

Callback functions are executed after tasks are completed. Errors in callback functions will show up in scheduler logs rather than task logs. By default, scheduler logs do not show up in the UI and instead can be found in $AIRFLOW_HOME/logs/scheduler/latest/DAG_FILE.py.log

Scheduler 로그에 찍힌다는 뜻은 callback이 worker 내부에서 돌아가는 것이 아니라 scheduler 쪽에서 실행될 수 있음을 의심해볼 수 있습니다.

하지만 아직 확신하긴 어렵습니다. 이렇게 결론내면 좀 불편하죠.
callback의 동기/비동기를 검색으로 찾지 못했으니 확실히 하려면 몸빵을 해야합니다..ㅠ
그래서 소스코드를 분석하는 단계로 넘어갑니다.

Dag-Level Callback – “비동기”

그럼 scheduler는 어떻게 Dag의 성공과 실패를 알고 callback 함수를 실행될 수 있을까?

Dag가 성공/실패했을 때 callback이 어떻게 실행되는지 airflow-core/src/airflow/models/dagrun.pyupdate_state() 코드를 보면 힌트를 얻을 수 있었습니다.

@provide_session
    def update_state(
        self, session: Session = NEW_SESSION, execute_callbacks: bool = True
    ) -> tuple[list[TI], DagCallbackRequest | None]:
        # Callback to execute in case of Task Failures
        callback: DagCallbackRequest | None = None

        ...

        if not unfinished.tis and any(x.state in State.failed_states for x in tis_for_dagrun_state):
            self.log.info("Marking run %s failed", self)
            self.set_state(DagRunState.FAILED)
            self.notify_dagrun_state_changed(msg="task_failure")

            if execute_callbacks and dag.has_on_failure_callback:
                self.handle_dag_callback(...)
            elif dag.has_on_failure_callback:
                callback = DagCallbackRequest(...)

여기서 중요한 점은 Scheduler가 TaskInstance들의 상태를 모아서 DagRun 상태를 판단하고, 콜백 실행 요청을 DagCallbackRequest 형태로 큐에 적재한다는 것입니다.

이 뒤의 흐름은,

  1. airflow-core/src/airflow/jobs/scheduler_job_runner.pyexecutor.send_callback(callback) executor에게 callback 전달
  2. excutor는 airflow-core/src/airflow/executors/base_executor.pyairflow-core/src/airflow/callbacks/database_callback_sink.py 를 통해서 callback을 DB에 저장
  3. airflow-core/src/airflow/dag_processing/manager.py_fetch_callbacks()를 통해서 큐를 조회
  4. 최종적으로 airflow-core/src/airflow/dag_processing/processor.py_execute_callbacks()에서 실행되는 것을 볼 수 있습니다.

즉 Dag-level Callback은 Task가 실행되는 Worker와 분리된 비동기 처리입니다.

그런데 동기처럼 보였던 Callback은 무엇인가?

그러면 한 가지 또 불편함이 생깁니다.
왜 task 1에 콜백을 넣고 task 2를 했을 때 블로킹되는 것으로 보이는 것일까?

예를 들어:

  • task A에 on_success_callback 넣고
  • callback 안에서 sleep(10)을 넣으면
  • task B는 A의 callback이 끝난 뒤에 실행됩니다

이건 우리가 확인한 callback이라면 어딘가 이상합니다.
그렇다면, 한 가지 추가 가설을 세울 수 있습니다. Dag에서의 Callback과 Task에서의 Callback은 서로 다르게 구현된건가?

Task-level Callback – “동기”

확인해보니 Dag-level callback과 Task-level callback은 구현이 따로 되어 있습니다.

Task-level Callback의 작동 방식은 task-sdk/src/airflow/sdk/execution_time/task_runner.py 이곳을 확인하면 힌트를 얻을 수 있습니다.

def _execute_task(context: Context, ti: RuntimeTaskInstance, log: Logger):
    """Execute Task (optionally with a Timeout) and push Xcom results."""
    task = ti.task
    execute = task.execute

    ...

    _run_task_state_change_callbacks(task, "on_execute_callback", context, log)

def _run_task_state_change_callbacks(
    task: BaseOperator,
    kind: Literal[
        "on_execute_callback",
        "on_failure_callback",
        "on_success_callback",
        "on_retry_callback",
        "on_skipped_callback",
    ],
    context: Context,
    log: Logger,
) -> None:
    callback: Callable[[Context], None]
    for i, callback in enumerate(getattr(task, kind)):
        try:
            create_executable_runner(callback, context_get_outlet_events(context), logger=log).run(context)
        except Exception:
            log.exception("Failed to run task callback", kind=kind, index=i, callback=callback)

task-level callback은 task_runner 프로세스 내부에서 실행되며, 블로킹 방식으로 콜백 완료 후 다음 task를 진행합니다.
즉, task-level callback은 동기로 실행됩니다.

이후의 callback 실행 과정은 Dag-level callback에서 설명한 내용과 같습니다.

최종 결론!

Task-level Callback

  • 동기
  • Task Runner(Worker) 내부에서 즉시 실행
  • callback이 끝나야 task 실행 루틴이 계속됨

Dag-level Callback

  • 비동기
  • Scheduler Dag 상태 업데이트 후 별도로 실행
  • worker 실행과 독립적

마무리

결국 gpt형님과 저, 둘 다 틀렸었군요ㅋㅋㅋㅋㅋ 다행(?)
질문해주신 에어플로우린이 님께 감사드리며, 덕분에 저도 좋은 정리를 할 수 있었습니다! :wink:

3개의 좋아요