Apache Airflow 3.2.0: 대규모 데이터 인식 워크플로우

Apache Airflow 3.2.0에서는 세분화된 파이프라인 오케스트레이션을 위한 자산 파티셔닝, 엔터프라이즈 규모를 위한 다중 팀 배포, 동기식 마감일 알림 콜백, 그리고 완전한 Task SDK 분리를 향한 지속적인 개발이 도입되었습니다.

Apache Airflow 3.2.0 출시를 발표하게 되어 매우 기쁩니다 ! Airflow 3.1은 자동화된 워크플로우의 중심에 사람을 두었습니다. 3.2는 데이터에 대해서도 동일한 정밀도를 제공합니다. 세분화된 파이프라인 오케스트레이션을 위한 자산 파티셔닝, 엔터프라이즈 규모를 위한 다중 팀 배포, 동기식 마감일 알림 콜백, 그리고 Task SDK의 완전한 분리를 향한 지속적인 발전이 포함됩니다.

상세 정보 :

:package: PyPI: Client Challenge
:books: 문서: What is Airflow®? — Airflow 3.2.0 Documentation
:hammer_and_wrench: 릴리스 노트: Release Notes — Airflow 3.2.0 Documentation
:spouting_whale: Docker 이미지: docker pull apache/airflow:3.2.0
:bus_stop: 제약 조건: GitHub - apache/airflow at constraints-3.2.0 · GitHub

:card_index_dividers: 자산 분할(AIP-76): 필요한 작업만 실행됩니다

자산 파티셔닝은 데이터 인식 스케줄링에 가장 많이 요청된 기능 중 하나입니다. 날짜별로 파티션된 S3 경로, Hive 테이블 파티션, BigQuery 파티션 또는 기타 파티션된 데이터 저장소를 사용하는 경우 다음과 같은 문제를 경험해 보셨을 것입니다. 상위 작업이 하나의 파티션을 업데이트하면 실제로 어떤 파티션이 변경되었는지에 관계없이 모든 하위 Dag가 실행됩니다. 이는 비효율적이며 대규모 배포 환경에서는 운영상의 혼란을 야기합니다.

3.2 버전의 자산 파티셔닝은 이러한 세분화된 제어를 가능하게 합니다. 하위 Dag는 자신이 관심 있는 특정 파티션이 업데이트될 때만 트리거됩니다. 이는 자산 도입 이후 데이터 인식 스케줄링에 있어 가장 큰 변화이며, 파티션 기반 오케스트레이션을 Airflow에서 기본적으로 처리하는 기능으로 전환하여 사용자가 직접 해결해야 하는 번거로움을 없애줍니다.

핵심 역량

  • 파티션 기반 스케줄링 : Dag는 모든 자산 변경이 아닌 특정 파티션 업데이트 시에 트리거됩니다.
  • CronPartitionTimetable : cron 표현식을 사용하여 파티션에 대한 Dag를 예약합니다. Task SDK에서도 사용할 수 있습니다.
  • 분할된 Dags에 대한 백필 : 하위 시스템의 모든 작업을 다시 트리거하지 않고 이전 파티션을 백필합니다(#61464).
  • 다중 자산 파티션 : 단일 Dag는 여러 자산에 걸쳐 파티션을 수신할 수 있습니다. 이는 하위 작업이 여러 소스의 정렬에 의존하는 경우에 중요합니다(#60577).

보다 고급 사용 사례의 경우, 시간 범위와 값 범위를 파티션 키에 매핑하는 시간 및 범위 파티션 매퍼(#61522, #55247), 실행을 트리거한 파티션을 정확하게 확인할 수 있는 Dag 실행 참조의 파티션 키 필드(#61725), 그리고 여러 자산의 파티션 이벤트가 통합 트리거로 해결되는 방식을 완벽하게 제어할 수 있는 PartitionedAssetTimetable이 있습니다.

예시 : 세 개의 업스트림 데이터 수집 Dag는 각각 시간 단위로 별도의 자산에 데이터를 기록합니다. 다운스트림 Dag는 세 개의 Dag가 모두 동일한 시간별 파티션을 업데이트했을 때만 트리거됩니다. 세 자산이 기본적으로 파티션 키를 공유하지 않으므로 매퍼가 이를 공통 키로 해석합니다.

from __future__ import annotations

from airflow.sdk import (
    DAG,
    Asset,
    CronPartitionTimetable,
    PartitionedAssetTimetable,
    StartOfHourMapper,
    asset,
    task,
)

team_a_player_stats = Asset(uri="file://incoming/player-stats/team_a.csv", name="team_a_player_stats")
combined_player_stats = Asset(uri="file://curated/player-stats/combined.csv", name="combined_player_stats")


with DAG(
    dag_id="ingest_team_a_player_stats",
    schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
    tags=["player-stats", "ingestion"],
):

    @task(outlets=[team_a_player_stats])
    def ingest_team_a_stats():
        """Materialize Team A player statistics for the current hourly partition."""
        pass

    ingest_team_a_stats()


@asset(schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"))
def team_b_player_stats():
    pass


with DAG(
    dag_id="clean_and_combine_player_stats",
    schedule=PartitionedAssetTimetable(
        assets=team_a_player_stats & team_b_player_stats,
        default_partition_mapper=StartOfHourMapper(),
    ),
    catchup=False,
):

    @task(outlets=[combined_player_stats])
    def combine_player_stats(dag_run=None):
        """Merge the aligned hourly partitions into a combined dataset."""
        print(dag_run.partition_key)

    combine_player_stats()

파티션 매퍼 example_asset_partition.py에 대한 자세한 내용은 Task SDK API 문서를 참조하십시오 .PartitionedAssetTimetable

:office_building: 다중 팀 배포(AIP-67): 엔터프라이즈용 Airflow

:warning: 실험적 기능 : Airflow 3.2의 멀티팀 지원은 실험적인 기능이며 사용자 피드백에 따라 향후 릴리스에서 변경될 수 있습니다.

Airflow 3.2는 멀티팀 지원을 도입하여 조직이 단일 Airflow 배포 환경에서 여러 개의 격리된 팀을 운영할 수 있도록 합니다. 각 팀은 자체 Dag, 연결, 변수, 풀 및 실행기를 가질 수 있으므로 팀별로 별도의 Airflow 인스턴스를 구축할 필요 없이 진정한 리소스 및 권한 격리가 가능합니다.

이는 특히 공유 인프라를 통해 여러 데이터 엔지니어링 또는 데이터 과학 팀에 서비스를 제공하는 플랫폼 팀에게 유용하며, 동시에 팀 간 리소스 및 액세스에 대한 강력한 경계를 유지하는 데에도 도움이 됩니다.

핵심 역량

  • 팀별 리소스 격리 : 각 팀은 고유한 Dag, 연결, 변수 및 풀을 갖습니다.
  • 팀별 실행기 : 각 팀은 서로 다른 실행기(예: Celery, Kubernetes, 로컬, AWS ECS 등)를 사용하고 개별적으로 구성할 수 있습니다. — #57837, #57910
  • 팀 범위 권한 부여 : Keycloak 및 Simple 인증 관리자는 팀 범위 액세스 제어를 지원합니다(#61351, #61861).
  • 팀 범위 비밀 : 팀별 비밀에는 AIRFLOW_VAR__{TEAM}___{KEY} 환경 변수 또는 패턴을 사용하세요(#62588).AIRFLOW_CONN__<TEAM>___<CONN_ID>
  • CLI 관리 : 팀 관리를 위한 새로운 CLI 명령어 (#55283)
  • UI 팀 선택기 : 연결, 변수 및 풀 생성/편집 양식의 팀 선택기 (#60237, #60474, #61082)
  • 완전한 API 지원 : team_name 연결, 변수 및 풀 API에 필드가 추가되었습니다(#59336, #57102, #60952).

멀티팀 활성화

# airflow.cfg 파일에 다음 설정을 추가하세요: 
[core] multi_team = True 
# 또는 환경 변수를 통해 설정하세요: 
export AIRFLOW__CORE__MULTI_TEAM=True

:alarm_clock: 마감일 알림: 이제 동기식 콜백 기능 지원 (AIP-86)

:warning: 실험적 기능 : 마감일 알림은 Airflow 3.2에서 실험적으로 제공되는 기능이며, 사용자 피드백에 따라 향후 릴리스에서 변경될 수 있습니다.

Airflow 3.1에서 도입된 마감일 알림 시스템을 기반으로, 이번 릴리스에서는 동기 콜백 지원이 추가되었습니다. 3.1 버전에서는 콜백이 트리거러를 통해 실행되었지만(비동기 방식만 지원), 통합 옵션이 제한적이었습니다. 동기 콜백은 실행기를 통해 직접 실행되며, 실행기 매개변수를 통해 특정 실행기를 지정할 수도 있습니다.

3.2 버전의 새로운 기능은 무엇인가요?

  • SyncCallback 지원 : AsyncCallback 트리거러에서 실행되는 것과 달리, SyncCallback 특정 실행기를 선택적으로 지정하여 워커에서 직접 실행됩니다.
  • Dag당 여러 마감일 알림 : deadline 매개변수에 목록을 전달하여 단일 Dag에 여러 임계값을 설정할 수 있습니다.
  • Grid API의 마감일 미준수 메타데이터 : Dag 실행 API에 이제 프로그램적 모니터링을 위한 마감일 미준수 정보가 포함됩니다.
  • 사용자 지정 마감일 참조에 대한 사용자 경험 개선 : 사용자 지정 마감일 참조 지점을 정의할 때 개발자 환경이 더욱 깔끔해졌습니다(#57222).
with DAG(
    dag_id="sync_deadline",
    deadline=DeadlineAlert(
        reference=DeadlineReference.FIXED_DATETIME(datetime(1980, 8, 10, 2)),
        interval=timedelta(0),
        callback=SyncCallback(
            SlackWebhookNotifier,
            {"text": "Sync Callback; Alert should trigger immediately!"},
        )
    )
):
    EmptyOperator(task_id='empty_task')

:desktop_computer: 사용자 인터페이스 개선

  • HITL 승인 내역 : 이제 휴먼 인 더 루프(HITL) 승인 인터페이스에서 모든 작업에 대한 승인 및 거부 내역을 완벽하게 확인할 수 있습니다. (#56760, #55952)
  • XCom 관리 : 이제 UI에서 XCom 값을 직접 추가, 편집 및 삭제할 수 있습니다. (#58921)
  • 분할된 상태 표시줄 : 축소된 작업 그룹과 매핑된 작업에 이제 상태를 한눈에 확인할 수 있도록 분할된 상태 표시줄이 표시됩니다(#61854).
  • 통합 툴팁 : 그리드 및 그래프 보기 툴팁에 이제 날짜, 기간 및 하위 상태가 표시됩니다(#62119).
  • Dag 코드 탭의 파일 이름 : 이제 코드 탭에 파일 식별 정보가 표시됩니다(#60759).
  • 로그 복사 버튼 : 원클릭 로그 복사 (#61185)
  • 날짜 범위 필터 : 날짜 범위별로 Dag 실행을 필터링합니다(#60772)
  • 상류/하류 작업 필터 : 그래프 및 그리드 보기에서 상류 또는 하류 작업을 기준으로 필터링합니다(#57237).
  • 데이터 익명화 : 이제 UI 및 공개 API에서 민감한 필드가 익명 처리됩니다(#59873).
  • 사용자 지정 테마 지원 : globalCss 화이트 라벨/사용자 지정 배포를 위한 테마 구성 (#61161, #58411)
  • React 플러그인에서 코어 UI 테마 상속 : 이제 플러그인 UI가 코어 Airflow 테마와 자동으로 일치합니다(#60256).
  • 간트 차트에서 작업 이름 표시 : task_display_name 가독성 향상을 위해 표시됨 (#61438)

:rocket: 성능 개선

렌더링된 작업 인스턴스 필드 정리: 약 42배 더 빨라졌습니다. 렌더링된 작업 인스턴스 필드 정리 작업이 재작성되어 매핑된 작업이 많은 Dag의 경우 약 42배 더 빨라졌습니다. 이제 필드 보존은 최근 N개의 작업 실행이 아닌 최근 N개의 Dag 실행을 기준으로 이루어지므로 더욱 직관적이고 성능이 크게 향상되었습니다. 구성 이름이 변경되었습니다. max_num_rendered_ti_fields_per_task ( num_dag_runs_to_retain_rendered_fields 이전 이름은 사용 중단 경고와 함께 계속 작동합니다.) (#60951)

스케줄러 개선 사항. 대규모 배포의 경우, 3.2 버전에서는 몇 가지 알려진 병목 현상을 해결합니다.

  • 스케줄러가 더 이상 모든 TaskInstance를 메모리에 로드하지 않으므로 대규모 배포 시 메모리 사용량 급증을 방지합니다(#60956).
  • 더 빠른 작업 대기열 제거 루프 (#61376)
  • 이제 큐 쿼리가 max_active_tasks 직접 적용되어 과도한 큐 대기를 방지합니다(#54103).

API 서버 개선 사항:

  • 작업 시작 시 SerializedDag 로드를 제거하여 메모리 사용량을 줄였습니다(#60803).
  • serialized_dag 이제 데이터 열이 PostgreSQL에서 JSONB를 사용합니다(#55979).

:wrench: Task SDK의 발전과 개발자 경험

Task SDK 분리 작업이 계속 진행 중입니다.

Airflow 3.2는 airflow-core Task SDK로 구성 요소를 계속 이동시키면서 클라이언트와 서버의 완전한 분리를 향해 나아가고 있습니다. 이를 통해 Dag 작성자는 Airflow Core 업그레이드 없이 Task SDK를 독립적으로 업그레이드할 수 있으므로 Dag 작성자와 운영팀 간의 조정 부담이 줄어듭니다.

이번 릴리스에서 일부 모듈이 Task SDK로 이동되었습니다(이전 가져오기 경로는 사용 중단 경고와 함께 계속 작동합니다).

  • 예외 : AirflowSkipException , TaskDeferred , 등 → airflow.sdk.exceptions (#59780)
  • Serde : airflow.serialization.serdeairflow.sdk.serde ; 직렬화기 → airflow.sdk.serde.serializers.* (#58900)
  • SkipMixin / BranchMixIn : Task SDK로 이동됨; 기존 가져오기는 common-compat (#62749, #62776) 을 통해 작동합니다.
  • 리미지 모듈 : 클라이언트-서버 분리를 위해 태스크 SDK로 이동했습니다(#60968, #61157).
  • 리스너 모듈 : 공유 라이브러리로 이동됨(#59883)
  • XCom API : 분리됨 XComEncoder (#58900)

PythonOperator 비동기 지원

PythonOperator 이제 비동기 호출 가능 객체를 지원합니다. 비동기 함수를 전달하면 python_callable 연산자가 이를 올바르게 기다리므로 사용자 지정 연산자 없이도 비동기 I/O 패턴을 사용할 수 있습니다. (#60268)

@task(show_return_value_in_logs=False)
async def load_xml_files(files):
    import asyncio
    from io import BytesIO
    from more_itertools import chunked
    from os import cpu_count
    from tenacity import retry, stop_after_attempt, wait_fixed

    from airflow.providers.sftp.hooks.sftp import SFTPClientPool

    print("number of files:", len(files))

    async with SFTPClientPool(sftp_conn_id=sftp_conn, pool_size=cpu_count()) as pool:
        @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
        async def download_file(file):
            async with pool.get_sftp_client() as sftp:
                print("downloading:", file)
                buffer = BytesIO()
                async with sftp.open(file, encoding=xml_encoding) as remote_file:
                    data = await remote_file.read()
                    buffer.write(data.encode(xml_encoding))
                    buffer.seek(0)
                return buffer

        for batch in chunked(files, cpu_count() * 2):
            tasks = [asyncio.create_task(download_file(f)) for f in batch]

            # Wait for this batch to finish before starting the next
            for task in asyncio.as_completed(tasks):
                result = await task
                # Do something with result or accumulate it and return it as an XCom

업데이트된 보안 모델

저희는 Airflow 배포의 격리 및 보안을 개선하기 위해 노력하고 있으며, Airflow 보안에 대한 기대치를 사용자가 더 잘 이해할 수 있도록 Airflow 3.2.0에 구현된 변경 사항을 반영하고 향후 개선 사항을 설명하는 보안 모델을 업데이트했습니다. 자세한 내용은 Airflow 보안 모델을 참조하세요 .

:folded_hands: 커뮤니티 여러분께 감사드립니다

이번 릴리스는 전 세계 수백 명의 기여자들의 공동 노력의 결과입니다. Airflow 3.2.0을 가능하게 해주신 릴리스 관리자, 모든 개발자, 문서 작성자, 테스터 및 커뮤니티 회원 여러분께 특별히 감사드립니다.

여러분과 같은 기여자분들 덕분에 Airflow 프로젝트는 계속해서 발전하고 있습니다. 이슈를 제기하거나, PR을 제출하거나, 문서를 개선하거나, 커뮤니티 구성원을 돕는 등 모든 기여는 소중합니다.

:link: 참여하세요

  • 릴리스 버전을 사용해 보세요 : 개발 환경을 업그레이드하고 새로운 기능을 살펴보세요
  • 대화에 참여하세요 : Slack개발자 메일링 리스트 에서 저희와 소통하세요.
  • 참여하기 : 참여 가이드를 확인하세요
  • 피드백 제공 : GitHub 에서 여러분의 경험과 제안을 공유해 주세요.

Apache Airflow 3.2.0은 데이터 인식 및 파티션 기반 워크플로우 오케스트레이션에 새로운 장을 열었습니다. 여러분이 이 기능을 활용하여 어떤 멋진 결과물을 만들어낼지 기대됩니다!


본 글은 Apache Airflow 블로그 글을 GPT 모델을 활용하여 번역한 내용입니다. 따라서 원문의 내용 또는 의도와 다르게 정리된 내용이 있을 수 있습니다.
관심있는 내용이시라면 원문도 함께 참고해주세요! Blog | Apache Airflow
읽으시면서 어색하거나 잘못된 내용을 발견하시면 댓글로 알려주시면 감사드리겠습니다.

2개의 좋아요

다행히 아무 일 없이(?) 릴리즈 되었네요 ㅎㅎ

멀티 팀모드는 저는 경험하지 못한 부분이라 모르겠지만..,
에셋 파티션은 좋아보이더라고요.
갠적으로 dagster의 가장 큰 장점이 이거라고 생각하고 있는터라 하핫

담주 중으로 3.2.0으로 올려봐야겠어요 :slight_smile:

1개의 좋아요