diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 4b0ea8606d..9a84832d88 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -24,6 +24,8 @@ pub use error::*; pub mod physical_plan; mod schema; pub mod table; +pub use physical_plan::IcebergPartitionedScan; +pub use table::partitioned::IcebergPartitionedTableProvider; pub use table::table_provider_factory::IcebergTableProviderFactory; pub use table::*; diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index aeac30de32..a257fe9e20 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -18,6 +18,7 @@ pub(crate) mod commit; pub(crate) mod expr_to_predicate; pub(crate) mod metadata_scan; +pub(crate) mod partitioned_scan; pub(crate) mod project; pub(crate) mod repartition; pub(crate) mod scan; @@ -27,5 +28,6 @@ pub(crate) mod write; pub(crate) const DATA_FILES_COL_NAME: &str = "data_files"; pub use expr_to_predicate::convert_filters_to_predicate; +pub use partitioned_scan::IcebergPartitionedScan; pub use project::project_with_partition; pub use scan::IcebergTableScan; diff --git a/crates/integrations/datafusion/src/physical_plan/partitioned_scan.rs b/crates/integrations/datafusion/src/physical_plan/partitioned_scan.rs new file mode 100644 index 0000000000..5083f068b2 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/partitioned_scan.rs @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::error::Result as DFResult; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; +use futures::TryStreamExt; +use iceberg::arrow::ArrowReaderBuilder; +use iceberg::io::FileIO; +use iceberg::scan::FileScanTask; + +use crate::to_datafusion_error; + +/// A DataFusion [`ExecutionPlan`] that reads one [`FileScanTask`] per partition. +/// +/// Display information (projection, predicate) is derived at runtime from the output schema and +/// the tasks rather than stored as dedicated struct fields. +#[derive(Debug, Clone)] +pub struct IcebergPartitionedScan { + tasks: Vec, + file_io: FileIO, + plan_properties: Arc, +} + +impl IcebergPartitionedScan { + pub fn new(tasks: Vec, file_io: FileIO, schema: ArrowSchemaRef) -> Self { + let n_partitions = tasks.len(); + let plan_properties = Self::compute_properties(schema, n_partitions); + Self { + tasks, + file_io, + plan_properties, + } + } + + pub fn tasks(&self) -> &[FileScanTask] { + &self.tasks + } + + pub fn file_io(&self) -> &FileIO { + &self.file_io + } + + fn compute_properties(schema: ArrowSchemaRef, n_partitions: usize) -> Arc { + Arc::new(PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(n_partitions), + EmissionType::Incremental, + Boundedness::Bounded, + )) + } +} + +impl ExecutionPlan for IcebergPartitionedScan { + fn name(&self) -> &str { + "IcebergPartitionedScan" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DFResult> { + Ok(self) + } + + fn properties(&self) -> &Arc { + &self.plan_properties + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> DFResult { + let task = self.tasks.get(partition).cloned().ok_or_else(|| { + datafusion::error::DataFusionError::Internal(format!( + "{}: partition index {partition} is out of bounds \ + (total tasks: {})", + self.name(), + self.tasks.len() + )) + })?; + + let file_io = self.file_io.clone(); + + let fut = async move { + let task_stream = futures::stream::once(futures::future::ready(Ok(task))); + let record_batch_stream = ArrowReaderBuilder::new(file_io) + .build() + .read(Box::pin(task_stream)) + .map_err(to_datafusion_error)? + .map_err(to_datafusion_error); + Ok::<_, datafusion::error::DataFusionError>(record_batch_stream) + }; + + let stream = futures::stream::once(fut).try_flatten(); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + stream, + ))) + } +} + +impl DisplayAs for IcebergPartitionedScan { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + let projection = self + .schema() + .fields() + .iter() + .map(|f| f.name().as_str()) + .collect::>() + .join(","); + // All tasks share the same predicate (they come from a single scan plan build), + // so reading it from the first task is sufficient. + let predicate = self + .tasks + .first() + .and_then(|t| t.predicate()) + .map_or(String::new(), |p| format!("{p}")); + let file_count = self.tasks.len(); + write!( + f, + "{} projection:[{projection}] predicate:[{predicate}] file_count:[{file_count}]", + self.name() + )?; + if self.tasks.len() <= 5 { + let files = self + .tasks + .iter() + .map(|t| t.data_file_path()) + .collect::>() + .join(", "); + write!(f, " files:[{files}]")?; + } + Ok(()) + } +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..5ae41b86c1 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -17,15 +17,20 @@ //! Iceberg table providers for DataFusion. //! -//! This module provides two table provider implementations: +//! This module provides three table provider implementations: //! //! - [`IcebergTableProvider`]: Catalog-backed provider with automatic metadata refresh. //! Use for write operations and when you need to see the latest table state. //! //! - [`IcebergStaticTableProvider`]: Static provider for read-only access to a specific //! table snapshot. Use for consistent analytical queries or time-travel scenarios. +//! +//! - [`IcebergPartitionedTableProvider`]: Catalog-backed provider that assigns one +//! DataFusion partition per data file, enabling parallel file-level scanning. +//! Read-only; use [`IcebergTableProvider`] for write operations. pub mod metadata_table; +pub mod partitioned; pub mod table_provider_factory; use std::any::Any; diff --git a/crates/integrations/datafusion/src/table/partitioned.rs b/crates/integrations/datafusion/src/table/partitioned.rs new file mode 100644 index 0000000000..2e9aa22628 --- /dev/null +++ b/crates/integrations/datafusion/src/table/partitioned.rs @@ -0,0 +1,315 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::catalog::Session; +use datafusion::common::DataFusionError; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result as DFResult; +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; +use datafusion::physical_plan::ExecutionPlan; +use futures::TryStreamExt; +use iceberg::arrow::schema_to_arrow_schema; +use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; + +use crate::error::to_datafusion_error; +use crate::physical_plan::expr_to_predicate::convert_filters_to_predicate; +use crate::physical_plan::partitioned_scan::IcebergPartitionedScan; + +/// Catalog-backed table provider that scans each data file in a separate DataFusion partition. +/// +/// This provider reloads table metadata from the catalog on every [`scan`][Self::scan] call +/// to guarantee freshness, then issues one DataFusion partition per data file so that +/// DataFusion's scheduler can execute file reads in parallel. +/// +/// Write operations are not supported. Use [`IcebergTableProvider`] for write access. +/// +/// For consistent read-only access to a fixed snapshot without per-scan catalog overhead, +/// use [`IcebergStaticTableProvider`] instead. +#[derive(Debug, Clone)] +pub struct IcebergPartitionedTableProvider { + catalog: Arc, + table_ident: TableIdent, + schema: ArrowSchemaRef, +} + +impl IcebergPartitionedTableProvider { + pub async fn try_new( + catalog: Arc, + namespace: NamespaceIdent, + name: impl Into, + ) -> Result { + let table_ident = TableIdent::new(namespace, name.into()); + // First load: used only to snapshot the Arrow schema for DataFusion planning. + // A second load_table is issued at scan time to guarantee the freshest snapshot. + let table = catalog.load_table(&table_ident).await?; + let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); + Ok(Self { + catalog, + table_ident, + schema, + }) + } +} + +#[async_trait] +impl TableProvider for IcebergPartitionedTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> ArrowSchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + _limit: Option, + ) -> DFResult> { + // Per-partition row limits are not yet implemented for IcebergPartitionedScan. + // DataFusion will apply a GlobalLimitExec on top of this node when needed. + + // Second load: fetch the latest snapshot so scans always reflect current table state. + let table = self + .catalog + .load_table(&self.table_ident) + .await + .map_err(to_datafusion_error)?; + + // Projection indices are resolved against self.schema (captured at try_new time), + // same as IcebergTableProvider / IcebergTableScan. + let col_names = projection.map(|indices| { + indices + .iter() + .map(|&i| self.schema.field(i).name().clone()) + .collect::>() + }); + + let predicate = convert_filters_to_predicate(filters); + + let mut builder = table.scan(); + builder = match col_names { + Some(names) => builder.select(names), + None => builder.select_all(), + }; + if let Some(pred) = predicate { + builder = builder.with_filter(pred); + } + + let tasks = builder + .build() + .map_err(to_datafusion_error)? + .plan_files() + .await + .map_err(to_datafusion_error)? + .try_collect::>() + .await + .map_err(to_datafusion_error)?; + + let output_schema = match projection { + None => self.schema.clone(), + Some(indices) => Arc::new(self.schema.project(indices).map_err(|e| { + DataFusionError::Internal(format!("schema projection failed: {e}")) + })?), + }; + + Ok(Arc::new(IcebergPartitionedScan::new( + tasks, + table.file_io().clone(), + output_schema, + ))) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DFResult> { + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } + + async fn insert_into( + &self, + _state: &dyn Session, + _input: Arc, + _insert_op: datafusion::logical_expr::dml::InsertOp, + ) -> DFResult> { + Err(to_datafusion_error(Error::new( + ErrorKind::FeatureUnsupported, + "IcebergPartitionedTableProvider does not support writes; \ + use IcebergTableProvider instead", + ))) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use datafusion::prelude::SessionContext; + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, NestedField, PrimitiveType, Schema, Type, + }; + use iceberg::transaction::{ApplyTransactionAction, Transaction}; + use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent}; + use tempfile::TempDir; + + use super::*; + + async fn make_catalog_and_table() -> (Arc, NamespaceIdent, String, TempDir) { + let temp_dir = TempDir::new().unwrap(); + let warehouse = temp_dir.path().to_str().unwrap().to_string(); + + let catalog = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse.clone())]), + ) + .await + .unwrap(), + ); + + let namespace = NamespaceIdent::new("ns".to_string()); + catalog + .create_namespace(&namespace, HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + catalog + .create_table( + &namespace, + TableCreation::builder() + .name("t".to_string()) + .location(format!("{warehouse}/t")) + .schema(schema) + .properties(HashMap::new()) + .build(), + ) + .await + .unwrap(); + + (catalog, namespace, "t".to_string(), temp_dir) + } + + /// Registers `n` synthetic data files in the table metadata via the iceberg + /// transaction API. No actual parquet files are written, only the metadata + /// entries that `plan_files()` reads are created. + async fn append_fake_data_files( + catalog: &Arc, + namespace: &NamespaceIdent, + table_name: &str, + n: usize, + ) { + let table = catalog + .load_table(&TableIdent::new(namespace.clone(), table_name.to_string())) + .await + .unwrap(); + + let data_files = (0..n) + .map(|i| { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!( + "{}/data/fake_{i}.parquet", + table.metadata().location() + )) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(128) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .build() + .unwrap() + }) + .collect::>(); + + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(data_files); + action + .apply(tx) + .unwrap() + .commit(catalog.as_ref()) + .await + .unwrap(); + } + + /// An empty table must produce a zero-partition scan so DataFusion never calls + /// execute(0), which would otherwise return an out-of-bounds error. + #[tokio::test] + async fn test_empty_table_zero_partitions() { + let (catalog, namespace, table_name, _temp_dir) = make_catalog_and_table().await; + // no files appended + let provider = IcebergPartitionedTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&SessionContext::new().state(), None, &[], None) + .await + .unwrap(); + let scan = plan + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(scan.tasks().len(), 0); + assert_eq!(scan.properties().partitioning.partition_count(), 0); + } + + /// Each data file in the table must become exactly one DataFusion partition + /// in IcebergPartitionedScan, enabling parallel file reads. + #[tokio::test] + async fn test_one_partition_per_file() { + let (catalog, namespace, table_name, _temp_dir) = make_catalog_and_table().await; + append_fake_data_files(&catalog, &namespace, &table_name, 3).await; + + let provider = IcebergPartitionedTableProvider::try_new(catalog, namespace, table_name) + .await + .unwrap(); + let plan = provider + .scan(&SessionContext::new().state(), None, &[], None) + .await + .unwrap(); + let scan = plan + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(scan.tasks().len(), 3); + assert_eq!(scan.properties().partitioning.partition_count(), 3); + } +}