[PR 리뷰] Airflow Dag Owner 파싱 오류 수정

안녕하세요. 오늘은 Dag Owner의 정확성과 직결되는 중요한 버그를 수정한 PR #52216 에 대해 이야기 해보려고 합니다.

Airflow에서 Dag의 Onwer를 "owner": "owner1, owner2, owner3" 와 같이 정의하는 건 매우 흔한 패턴이죠. 쉼표 뒤에 공백을 넣는 건 우리에게 너무나 자연스러운 습관입니다.

하지만 바로 이 ‘공백’ 하나가 제 PR 이전에선 큰 문제를 일으켰습니다.

Airflow는 이 문자열을 우리가 기대하는 ["owner1", "owner2", "owner3"] 가 아닌, 공백이 포함된 ["owner1", " owner2", " owner3"] 로 잘못 인식했습니다.
두 번째와 세 번째 Owner 이름 앞에 불필요한 공백이 그대로 포함된 것입니다.
이 작은 차이가 Owner 기반 Dag 필터링, owner_links 같은 기능들을 모두 오작동시키는 원인이었습니다.

기존 파싱의 문제점

문제는 FastAPI를 통해 Dag의 정보를 제공하는 데이터 모델, airflow/api_fastapi/core_api/datamodels/dags.pyDAGResponse 클래스에 있었습니다.

class DAGResponse(BaseModel):
    """DAG serializer for responses."""
    model_config = ConfigDict(
        alias_generator=AliasGenerator(
            validation_alias=lambda field_name: DAG_ALIAS_MAPPING.get(field_name, field_name),
        ),
    )
    dag_id: str
    dag_display_name: str
    is_paused: bool
    is_stale: bool
    last_parsed_time: datetime | None
    last_expired: datetime | None
    bundle_name: str | None
    bundle_version: str | None
    relative_fileloc: str | None
    fileloc: str
    description: str | None
    deadline: list[DeadlineAlertResponse] | None
    timetable_summary: str | None
    timetable_description: str | None
    tags: list[DagTagResponse]
    max_active_tasks: int
    max_active_runs: int | None
    max_consecutive_failed_dag_runs: int
    has_task_concurrency_limits: bool
    has_import_errors: bool
    next_dagrun_logical_date: datetime | None
    next_dagrun_data_interval_start: datetime | None
    next_dagrun_data_interval_end: datetime | None
    next_dagrun_run_after: datetime | None
    owners: list[str]

    @field_validator("owners", mode="before")
    @classmethod
    def get_owners(cls, v: Any) -> list[str] | None:
        """Convert owners attribute to DAG representation."""
        if not (v is None or isinstance(v, str)):
            return v
        if v is None:
            return []
        if isinstance(v, str):
            return v.split(",")
        return v

마지막에 owner를 가지고 오는 부분을 살펴보면, 아래와 같이 되어있습니다.

    @field_validator("owners", mode="before")
    @classmethod
    def get_owners(cls, v: Any) -> list[str] | None:
        """Convert owners attribute to DAG representation."""
        if not (v is None or isinstance(v, str)):
            return v
        if v is None:
            return []
        if isinstance(v, str):
            return v.split(",")
        return v

문제의 핵심은 v.split(",") 코드였습니다. 이 코드는 단순히 쉼표를 기준으로 문자열을 나눌 뿐, 각 원소의 앞뒤에 있는 공백까지 세심하게 처리해주지는 못했습니다.

문제 해결을 위한 간단한 개선

기존의 v.split(",") 코드를 [x.strip() for x in v.split(",")]로 변경했습니다.

.strip()은 문자열 양 끝의 모든 공백(스페이스, 탭, 개행 등)을 말끔히 제거해주는 기능을 합니다.

    @field_validator("owners", mode="before")
    @classmethod
    def get_owners(cls, v: Any) -> list[str] | None:
        """Convert owners attribute to DAG representation."""
        if not (v is None or isinstance(v, str)):
            return v
        if v is None:
            return []
        if isinstance(v, str):
            return [x.strip() for x in v.split(",")]
        return v

이제 Airflow는 Owner 문자열에 포함된 불필요한 공백을 모두 제거하고 언제나 정확한 Owner 리스트를 반환하게 되었습니다.

1개의 좋아요