안녕하세요. 오늘은 Dag Owner의 정확성과 직결되는 중요한 버그를 수정한 PR #52216 에 대해 이야기 해보려고 합니다.
Airflow에서 Dag의 Onwer를 "owner": "owner1, owner2, owner3"
와 같이 정의하는 건 매우 흔한 패턴이죠. 쉼표 뒤에 공백을 넣는 건 우리에게 너무나 자연스러운 습관입니다.
하지만 바로 이 ‘공백’ 하나가 제 PR 이전에선 큰 문제를 일으켰습니다.
Airflow는 이 문자열을 우리가 기대하는 ["owner1", "owner2", "owner3"]
가 아닌, 공백이 포함된 ["owner1", " owner2", " owner3"]
로 잘못 인식했습니다.
두 번째와 세 번째 Owner 이름 앞에 불필요한 공백이 그대로 포함된 것입니다.
이 작은 차이가 Owner 기반 Dag 필터링, owner_links 같은 기능들을 모두 오작동시키는 원인이었습니다.
기존 파싱의 문제점
문제는 FastAPI를 통해 Dag의 정보를 제공하는 데이터 모델, airflow/api_fastapi/core_api/datamodels/dags.py
의 DAGResponse
클래스에 있었습니다.
class DAGResponse(BaseModel):
"""DAG serializer for responses."""
model_config = ConfigDict(
alias_generator=AliasGenerator(
validation_alias=lambda field_name: DAG_ALIAS_MAPPING.get(field_name, field_name),
),
)
dag_id: str
dag_display_name: str
is_paused: bool
is_stale: bool
last_parsed_time: datetime | None
last_expired: datetime | None
bundle_name: str | None
bundle_version: str | None
relative_fileloc: str | None
fileloc: str
description: str | None
deadline: list[DeadlineAlertResponse] | None
timetable_summary: str | None
timetable_description: str | None
tags: list[DagTagResponse]
max_active_tasks: int
max_active_runs: int | None
max_consecutive_failed_dag_runs: int
has_task_concurrency_limits: bool
has_import_errors: bool
next_dagrun_logical_date: datetime | None
next_dagrun_data_interval_start: datetime | None
next_dagrun_data_interval_end: datetime | None
next_dagrun_run_after: datetime | None
owners: list[str]
@field_validator("owners", mode="before")
@classmethod
def get_owners(cls, v: Any) -> list[str] | None:
"""Convert owners attribute to DAG representation."""
if not (v is None or isinstance(v, str)):
return v
if v is None:
return []
if isinstance(v, str):
return v.split(",")
return v
마지막에 owner를 가지고 오는 부분을 살펴보면, 아래와 같이 되어있습니다.
@field_validator("owners", mode="before")
@classmethod
def get_owners(cls, v: Any) -> list[str] | None:
"""Convert owners attribute to DAG representation."""
if not (v is None or isinstance(v, str)):
return v
if v is None:
return []
if isinstance(v, str):
return v.split(",")
return v
문제의 핵심은 v.split(",")
코드였습니다. 이 코드는 단순히 쉼표를 기준으로 문자열을 나눌 뿐, 각 원소의 앞뒤에 있는 공백까지 세심하게 처리해주지는 못했습니다.
문제 해결을 위한 간단한 개선
기존의 v.split(",")
코드를 [x.strip() for x in v.split(",")]
로 변경했습니다.
.strip()
은 문자열 양 끝의 모든 공백(스페이스, 탭, 개행 등)을 말끔히 제거해주는 기능을 합니다.
@field_validator("owners", mode="before")
@classmethod
def get_owners(cls, v: Any) -> list[str] | None:
"""Convert owners attribute to DAG representation."""
if not (v is None or isinstance(v, str)):
return v
if v is None:
return []
if isinstance(v, str):
return [x.strip() for x in v.split(",")]
return v
이제 Airflow는 Owner 문자열에 포함된 불필요한 공백을 모두 제거하고 언제나 정확한 Owner 리스트를 반환하게 되었습니다.