Airflow의 새로운 기능에 기여해보자 2편: Apache DataFusion 이해해보기

1편을 쓴지 벌써 시간이 좀 지났는데요.
최근에 본업에서 원투펀치를 맞고 뻗어있다보니… 늦어졌습니다 ㅎㅎ

각설하고 시작해보자면, 자 일단 지난 1편에선 AIP를 보고 이해해보기를 진행했습니다.

그런데 자연스럽게 DataFusion이 뭔데 Airflow에 도입되려는거지? 하는 궁금증이 생겼습니다.
그래서 DataFusion을 조금 깊게 살펴봤습니다.

Airflow에 기여할 때마다 새로운 개념들을 계속 만나게 되는데,
이게 은근히 재밌어서 취미지만 계속 할 수 있는 것 같네요.

Apache DataFusion이란?

한 문장으로 정리하면 아래와 같습니다.

Apache DataFusion = Rust로 작성된 확장 가능한(embeddable) 쿼리 엔진

여기서 핵심은 '확장 가능한’라는 단어입니다.

SQLite 나 DuckDB처럼 하나의 바이너리로 동작하는 완전형 데이터 베이스가 아니라, 다른 시스템 안에 녹다을어서 쿼리 처리 기능을 제공하는 라이브러리 같은 느낌으로 느껴집니다.

그래서 타겟이 쿼리를 날리고 싶은 엔드단 유저가 아닌, 다른 시스템 속에서 새로운 DB나 분석 시스템을 만들고 싶은 개발자로 보입니다.

그래서 실제로 DataFusion을 사용하는 프로젝트들을 보면 성격이 매우 다릅니다. InfluxDB IOx는 시계열 DB로, Ballista는 분산 쿼리 엔진으로, Apache Comet은 Spark 가속기로, DataFusion-Python은 pandas 대체제로 각각 DataFusion을 "엔진"으로 내장하여 쓰고 있습니다.

왜 이런게 가능한가?

같은 엔진이 이렇게 이질적인 프로젝트들에서 돌아갈 수 있는 이유는 DataFusion이 거의 모든 컴포넌트를 trait 기반으로 교체 가능하게 설계했기 때문입니다.
이런 이유로 Apache Airflow의 AIP-99에 Apache DataFusion이 채택된 것으로 보입니다.

좀 길었는데 정리하면, DataFusion은 하나의 완제품이 아니라 조립형 쿼리 엔진 키트에 가깝습니다.

메모리 포맷으로는 Apache Arrow를 채택했습니다. Arrow는 컬럼 기반 in-memory 포맷 표준이라 벡터화 연산(SIMD)에 유리하고, 서로 다른 시스템 간에 데이터를 zero-copy로 주고받을 수 있습니다. DataFusion의 성능이 경쟁력 있는 이유 중 큰 부분이 Arrow에서 옵니다.

그러다보니 커뮤니티도 Arrow와 같이 있는 것을 확인할 수 있더라고요.

DataFusion의 쿼리 처리 파이프라인

DataFusion에서 쿼리 한 건이 결과로 바뀔 때까지 거치는 과정을 단계별로 따라가 봅시다.

Step 1. Parsing

sqlparser-rs 로 SQL 문자열을 AST로 변환합니다.

Step 2. Logical Planning

AST를 LogicalPlan이라는 관계대수 트리로 변환합니다. Projection, Filter, Join, Aggregate, TableScan 같은 노드가 트리 형태로 이어지는데, “무엇을” 계산할지만 표현하지 "어떻게"는 아직 정해지지 않은 상태입니다.

이 단계에서 테이블 이름을 실제 테이블로 묶어주는 건 TableProvider trait입니다

Step 3. Logical Optimization

LogicalPlan을 의미는 유지하면서 더 저렴한 형태로 변환합니다.

대표적인 규칙들:

  • PushDownFilter → WHERE를 스캔 단계까지 밀어넣음
  • PushDownProjection → 필요 없는 컬럼 제거
  • SimplifyExpressions → 상수 계산 미리 수행
  • EliminateCrossJoin / DecorrelatePredicateSubquery

핵심 목표는 데이터 양 자체를 줄이는 것으로 명확합니다.

Step 4. Physical Planning

LogicalPlan을 ExecutionPlan으로 변환합니다.

최적화된 LogicalPlan이 ExecutionPlan 트리로 바뀝니다. 이제부터는 “어떻게” 계산할지의 이야기입니다.

  • Aggregate 하나가 해시 기반이냐 정렬 기반이냐.
  • Join도 HashJoinExec, SortMergeJoinExec, NestedLoopJoinExec 중에서 고름.
  • 데이터 소스는 ParquetExec, CsvExec, AvroExec 같은 물리 스캔 노드로 구체화.

이 단계부터 파티셔닝과 분산을 고려하기 시작합니다.

Step 5. Physical Optimization

ExecutionPlan을 실행 효율에 맞게 최적화합니다.

이 단계의 규칙은 하드웨어, 메모리, 파티션을 신경 쓰면서 최적화 합니다.

  • EnforceDistribution — 조인 양쪽의 파티션이 안 맞으면 RepartitionExec를 끼워 넣음.
  • EnforceSorting — 정렬이 필요한 곳에만 SortExec를 남기고 불필요한 정렬은 제거.
  • CoalesceBatches — 너무 잘게 쪼개진 RecordBatch를 뭉쳐 벡터화 효율을 올림.
  • AggregateStatistics — COUNT(*) 같은 건 통계로 즉답 가능하면 스캔 자체를 생략.

Step 6. Execution

최종적으로 Stream 형태로 결과를 반환합니다.

드디어 실행. 포인트는 두 가지입니다.

Pull 기반 스트림

각 ExecutionPlan 노드는 호출되면 Stream를 반환합니다.
상위에서 next()를 호출하면 하위 노드가 필요한 만큼만 데이터를 가져옵니다.
따라서 전체 데이터를 메모리에 올릴 필요가 없습니다.

Arrow RecordBatch 단위 벡터화

한 번의 함수 호출이 한 행이 아니라 수천 행(기본 8192)짜리 컬럼 묶음을 처리합니다. 여기에 SIMD가 붙으면서 성능이 나옵니다. Tokio 기반 async 실행과 파티션별 병렬 실행이 그 위에 얹힙니다.

이 부분은 아직 제가 잘 모르는 부분이라 좀 더 확인이 필요할 것 같습니다.

정리

앞서 "모든 컴포넌트가 trait로 교체 가능"이라고 적었는데, 이 파이프라인의 거의 모든 단계마다 교체 지점이 하나씩 박혀 있다고 생각하면 될 것 같아요.

단계 하는 일 대표 교체 지점
Parsing SQL → AST DataFrame API로 우회
Logical Plan AST → 관계대수 트리 TableProvider
Logical Opt 의미 보존 재작성 OptimizerRule
Physical Plan 논리 → 실행 전략 PhysicalPlanner
Physical Opt 파티션/메모리 고려 재작성 PhysicalOptimizerRule
Execution Arrow 스트림 실행 ExecutionPlan, UDF 3종

"trait 기반으로 모든 게 교체 가능"이라는 말이 추상적인 카피가 아니라, 각 단계에 박힌 실제 확장점의 총합이라는 게 보입니다.

Airflow 코드에서 DataFusion이 불리는 순간

자, 이제 실제로 Airflow가 이 엔진을 어떻게 부르는지 한번 따라가 봅시다.

providers/common/sql/src/airflow/providers/common/sql/
├── config.py                    # DataSourceConfig, ConnectionConfig 등
├── operators/
│   └── analytics.py             # AnalyticsOperator
└── datafusion/
    ├── base.py                  # 추상 클래스 (ObjectStorageProvider, FormatHandler)
    ├── engine.py                # DataFusionEngine
    ├── object_storage_provider.py   # S3, Local 구현
    ├── format_handlers.py       # Parquet, CSV, Avro, Iceberg 구현
    └── exceptions.py

유저가 Dag에서 쓰는 건 AnalyticsOperator 하나지만, 자세히 알기 위해서 그 밑에서 DataFusion Python 바인딩(datafusion 패키지)이 호출되는 지점을 순서대로 짚어보겠습니다.

Step 0. 유저가 쓰는 모양

유저는 단순하게 이렇게 씁니다:

from airflow.providers.common.sql.config import DataSourceConfig
from airflow.providers.common.sql.operators.analytics import AnalyticsOperator

AnalyticsOperator(
    task_id="sales_q1",
    datasource_configs=[
        DataSourceConfig(
            conn_id="aws_default",
            uri="s3://my-bucket/sales/2026/",
            format="parquet",
            table_name="sales",
        ),
    ],
    queries=["SELECT region, SUM(amount) FROM sales GROUP BY region"],
)

이 Dag 한 줄이 DataFusion의 6단계 파이프라인을 통째로 돌립니다. 어떻게 그게 가능한지 추적해 봅시다.

Step 1. Operator execute() 진입

# analytics.py:85
def execute(self, context: Context) -> str:
    results = []
    for datasource_config in self.datasource_configs:
        self._df_engine.register_datasource(datasource_config)

    for query in self.queries:
        result_dict = self._df_engine.execute_query(query, max_rows=self.max_rows_check)
        results.append({"query": query, "data": result_dict})
    ...

구조가 매우 담백합니다. “데이터소스를 전부 등록하고, 쿼리를 하나씩 실행한다.” 이게 전부예요. 복잡한 건 전부 _df_engine 뒤로 숨겨져 있습니다.

self._df_enginecached_property로 선언돼 있는데, 재밌는 포인트 하나가 있습니다.

# analytics.py:70
@cached_property
def _df_engine(self) -> DataFusionEngine:
    if self.engine is None:
        try:
            from airflow.providers.common.sql.datafusion.engine import DataFusionEngine
        except ModuleNotFoundError as e:
            if e.name == "datafusion":
                raise AirflowOptionalProviderFeatureException(
                    "Failed to import DataFusion. To use the AnalyticsOperator, please install the "
                    "`apache-airflow-providers-common-sql[datafusion]` extra."
                ) from e
            raise
        return DataFusionEngine()

datafusion 패키지 import를 execute 시점까지 미룹니다. provider 설치만 하고 extra는 설치되지 않은 유저가 Dag 파싱 단계에서 터지지 않도록, 그리고 친절하게 “extra 안깔려 있음. 설치해주세요” 에러를 내기 위해서인데, 이게 Airflow provider 패턴의 교과서적인 모습입니다.

Step 2. DataFusionEngine 생성

# engine.py:37
class DataFusionEngine(LoggingMixin):
    def __init__(self):
        super().__init__()
        self.df_ctx = SessionContext()
        self.registered_tables: dict[str, str] = {}

여기서 드디어 DataFusion Python 바인딩이 처음 호출됩니다. from datafusion import SessionContextSessionContext() 생성자가 그 부분입니다.

이 한 줄이 DataFusion 세계의 모든 것을 품고 있습니다. SessionContext는 러스트 쪽의 SessionContext와 1:1로 매칭되는 핸들이고, 그 안에 파이프라인의 1~6단계에서 썼던 모든 component parser, optimazier ruleset, physical planner, catalog, object store 레지스트리 가 전부 기본값으로 들어가 있습니다. 여기부터 아까 그린 파이프라인 박스가 메모리 위에 실체화됩니다.

Step 3. Datasource 등록

# engine.py:48
def register_datasource(self, datasource_config: DataSourceConfig):
    if not datasource_config.is_table_provider:
        if datasource_config.storage_type == StorageType.LOCAL:
            connection_config = None
        else:
            connection_config = self._get_connection_config(datasource_config.conn_id)
        self._register_object_store(datasource_config, connection_config)

    self._register_data_source_format(datasource_config)

등록은 두 단계로 나뉩니다.

  • 어디에 있는 파일이냐 (object store)
  • 그 파일을 어떻게 읽을 거냐 (format)

Step 3-a. Object Store 등록

# engine.py:63
storage_provider = get_object_storage_provider(datasource_config.storage_type)
object_store = storage_provider.create_object_store(
    datasource_config.uri, connection_config=connection_config
)
schema = storage_provider.get_scheme()
self.session_context.register_object_store(schema=schema, store=object_store)
# object_storage_provider.py:26
from datafusion.object_store import AmazonS3, LocalFileSystem
...
s3_store = AmazonS3(**credentials, **connection_config.extra_config, bucket_name=bucket)

S3ObjectStorageProvider가 실제로 하는 일은 AwsGenericHook으로부터 access key/secret을 뽑아와 DataFusion의 AmazonS3 객체에 주입하는 게 전부입니다. 그리고 session_context.register_object_store("s3://", s3_store)로 엔진에 꽂아줍니다.

이걸 하고 나면 DataFusion 쪽에서는 "s3://로 시작하는 경로를 어떻게 읽어야 하는지"를 알게 됩니다. 이게 파이프라인 2번 단계(TableProvider)의 밑바닥 레이어라고 볼 수 있습니다.

Step 3-b. Format 등록

# engine.py:83
format_cls = get_format_handler(datasource_config)
format_cls.register_data_source_format(self.session_context)

def register_data_source_format(self, ctx: SessionContext):
    ctx.register_parquet(
        self.datasource_config.table_name,
        self.datasource_config.uri,
        **self.datasource_config.options,
    )

DataFusion의 SessionContext.register_parquet(name, uri)를 그대로 호출합니다. 이 호출 하나로 DataFusion은 해당 URI의 Parquet 파일들에 대한 TableProvider를 내부적으로 생성하고, SQL에서 sales라는 이름으로 조회 가능한 논리 테이블로 등록해줍니다.

Iceberg만 조금 다릅니다 Iceberg는 이미 만들어진 논리 테이블을 통째로 넣는 경로라, register_parquet 같은 메서드가 아니라 ctx.register_table(name, iceberg_table)을 씁니다. 그래서 engine.py에서 is_table_provider 플래그로 object store 등록을 건너뛰는 분기가 있습니다.

앞에서 "파이프라인 2단계 = TableProvider"라고 했던 부분이 바로 이 호출들입니다. 유저가 DataSourceConfig를 쓰면 register_parquet/register_csv/register_table이 불리고, 그게 러스트 쪽 TableProvider가 되어 LogicalPlan에 연결됩니다.

Step 4. 쿼리 실행

# engine.py:104
def execute_query(self, query: str, max_rows: int | None = None) -> dict[str, list[Any]]:
    try:
        self.log.info("Executing query: %s", query)
        df = self.session_context.sql(query)

        if max_rows is not None:
            result = df.limit(max_rows + 1).to_pydict()
            ...

문자열 하나를 넘기는 이 session_context.sql(query) 호출 안에서, 아까 그린 파이프라인 1~5단계가 통째로 도는 겁니다.

  1. sqlparser-rs가 SQL을 AST로 파싱.
  2. LogicalPlan이 만들어지면서 sales 테이블이 Step 3-b에서 등록한 Parquet TableProvider로 해석됨.
  3. OptimizerRule들이 돌면서 PushDownProjection이 SELECT region, amount만 남기고, PushDownFilter가 있으면 Parquet 스캔까지 내려감.
  4. ExecutionPlan으로 번역 — ParquetExec → HashAggregateExec 같은 실물 노드가 연결됨.
  5. Physical Optimization이 CoalesceBatches 같은 걸 끼워넣음.

그리고 .to_pydict()를 부르는 순간이 6단계(Execution)의 트리거입니다. DataFusion은 그제야 pull 기반 스트림을 돌리고, Parquet → Arrow RecordBatch → Aggregate → 최종 Arrow 컬럼을 파이썬 dict로 변환해 돌려줍니다.

Operator 쪽에서 df.limit(max_rows + 1)을 거는 이유도 여기서 설명됩니다. DataFusion은 스트리밍이라 전체를 다 돌지 않고 앞의 N+1만 뽑을 수 있고, 그게 XCom에 올라가는 결과 크기를 물리적으로 제한해줍니다.

Step 5. 결과를 사람이 볼 수 있는 형태로

우리는 결국 XCom 으로 받아야합니다.

#  analytics.py:96
match self.result_output_format:
    case "tabulate":
        return self._build_tabulate_output(results)
    case "json":
        return self._build_json_output(results)

엔진이 돌려준 dict[str, list[Any]] 컬럼 지향을 행 지향으로 뒤집고, tabulate로 마크다운 테이블을 만들거나 JSON 문자열로 직렬화해 리턴합니다. 이게 XCom에 올라가서 다음 태스크가 받거나, Airflow UI에서 바로 눈으로 확인할 수 있는 모양이 됩니다.

전체 과정 정리

마지막으로 전체 과정을 정리하면,

  1. Dag 작성자

    • AnalyticsOperator를 정의
  2. AnalyticsOperator.execute() (analytics.py:85)

    • DataFusion 엔진 lazy import
    • datasource 등록 및 쿼리 실행 시작
  3. DataFusionEngine 초기화 (engine.py:37)

    • SessionContext 생성
    • → DataFusion 엔진 전체 초기화
  4. Datasource 등록 (engine.py:48)

    • Object Store 등록 (S3 / Local)
      • ctx.register_object_store(…)
    • Format 등록 (Parquet / CSV / Avro)
      • ctx.register_parquet(…)
  5. 쿼리 실행 (engine.py:104)

    • ctx.sql(query)
      → Parsing ~ Physical Optimization (1~5단계)
    • df.limit().to_pydict()
      → Execution (6단계)
  6. 결과 처리

    • tabulate / JSON 변환
  7. XCom 반환

    • Airflow downstream task로 전달

datafusion 파이썬 패키지 호출만 추려보면 딱 5개입니다.

  1. SessionContext()
  2. AmazonS3(…) / LocalFileSystem()
  3. ctx.register_object_store(…)
  4. ctx.register_parquet(…) (혹은 register_csv / register_avro / register_table)
  5. ctx.sql(query).to_pydict()

이 5개 호출 사이사이에 Airflow의 Connection/Hook 체계, provider 의존성 격리, XCom 직렬화 같은 "Airflow스러운 것들"이 얇게 감싸여 있을 뿐입니다.
엔진은 DataFusion이 다 들고 있고, Airflow는 얇은 어댑터 역할에 집중하고 있다는 것을 볼 수 있습니다.

마무리하며…

그리 가볍게 본 건 아니었는데, 생각보다 더 깊은 세계였습니다.
호두 깨질뻔했네요…ㅠ

DataFusion은 단순한 SQL 엔진이 아니라 “다른 시스템 안에서 살아가는 쿼리 엔진 프레임워크” 에 가깝고, 그 구조 덕분에 Airflow 같은 시스템에서도 의미 있게 사용될 수 있다는 걸 알게 됐습니다.

AIP를 읽을 때마다 느끼는 건데, 결국 “기능”보다 “구조”를 이해해야 보이는 것들이 많아지는 것 같습니다.
DataFusion 쪽 공부를 좀 더 해봐야겠네요.

1개의 좋아요

이야~ DataFusion 여기서 보니 반갑네요!!!

DataFusion 어렵네요… :smiling_face_with_tear: