diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 0a386bded7..3926de8acd 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -34,6 +34,7 @@ use iceberg::scan::{FileScanTask, TableScan}; use iceberg::table::Table; use super::expr_to_predicate::convert_filters_to_predicate; +use crate::table::PartitionKeysKind; use crate::to_datafusion_error; /// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`]. @@ -60,6 +61,8 @@ pub struct IcebergTableScan { predicates: Option<Predicate>, /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode). buckets: Option<Vec<Vec<FileScanTask>>>, + /// `None` when partitioning is `UnknownPartitioning`. + partition_keys_kind: Option<PartitionKeysKind>, /// Optional limit on the number of rows to return. limit: Option<usize>, } @@ -146,10 +149,19 @@ impl IcebergTableScan { projection, predicates, buckets, + partition_keys_kind: None, limit, } } + pub(crate) fn with_partition_keys_kind( + mut self, + partition_keys_kind: Option<PartitionKeysKind>, + ) -> Self { + self.partition_keys_kind = partition_keys_kind; + self + } + pub fn table(&self) -> &Table { &self.table } @@ -171,6 +183,12 @@ impl IcebergTableScan { self.buckets.as_deref().unwrap_or(&[]) } + /// Returns the transform family behind the `Partitioning::Hash` declaration, + /// or `None` when the scan declares `UnknownPartitioning`. 
+ pub fn partition_keys_kind(&self) -> Option<PartitionKeysKind> { + self.partition_keys_kind + } + pub fn limit(&self) -> Option<usize> { self.limit } diff --git a/crates/integrations/datafusion/src/table/bucketing.rs b/crates/integrations/datafusion/src/table/bucketing.rs index 579752108c..633a732b31 100644 --- a/crates/integrations/datafusion/src/table/bucketing.rs +++ b/crates/integrations/datafusion/src/table/bucketing.rs @@ -152,6 +152,17 @@ pub(super) fn compute_bucket_cols( }]) } +/// Identifies the transform family behind a `Partitioning::Hash` declaration +/// on an [`IcebergTableScan`][crate::physical_plan::scan::IcebergTableScan]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum PartitionKeysKind { + /// Keys come from `Transform::Identity` fields. + Identity, + /// Keys come from `Transform::Bucket(_)` fields. + Bucket, +} + /// Single-entry partition-key descriptor used by [`bucket_tasks`] and /// `IcebergTableProvider::scan` to drive both task distribution and the /// `Partitioning::Hash` declaration. @@ -175,6 +186,13 @@ impl PartitionKeys { .collect(), } } + + pub(super) fn kind(&self) -> PartitionKeysKind { + match self { + PartitionKeys::Identity(_) => PartitionKeysKind::Identity, + PartitionKeys::Bucket(_) => PartitionKeysKind::Bucket, + } + } } /// Return the partition keys that drive a `Partitioning::Hash` declaration, diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 3e76393bec..f02b2e3b94 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -26,6 +26,7 @@ //! table snapshot. Use for consistent analytical queries or time-travel scenarios. 
mod bucketing; +pub use bucketing::PartitionKeysKind; pub mod metadata_table; pub mod table_provider_factory; @@ -242,23 +243,27 @@ impl TableProvider for IcebergTableProvider { let (buckets, all_had_full_key) = bucketing::bucket_tasks(tasks, n_partitions, keys.as_ref()); - let partitioning = match &keys { - Some(keys) if all_had_full_key && n_partitions > 0 => { - Partitioning::Hash(keys.column_exprs(), n_partitions) - } - _ => Partitioning::UnknownPartitioning(n_partitions), + let (partitioning, partition_keys_kind) = match &keys { + Some(keys) if all_had_full_key && n_partitions > 0 => ( + Partitioning::Hash(keys.column_exprs(), n_partitions), + Some(keys.kind()), + ), + _ => (Partitioning::UnknownPartitioning(n_partitions), None), }; - Ok(Arc::new(IcebergTableScan::new_with_tasks( - table, - None, // Always use current snapshot for catalog-backed provider - self.schema.clone(), - projection, - filters, - limit, - buckets, - partitioning, - ))) + Ok(Arc::new( + IcebergTableScan::new_with_tasks( + table, + None, // Always use current snapshot for catalog-backed provider + self.schema.clone(), + projection, + filters, + limit, + buckets, + partitioning, + ) + .with_partition_keys_kind(partition_keys_kind), + )) } fn supports_filters_pushdown( @@ -1326,6 +1331,10 @@ mod tests { } other => panic!("expected Partitioning::Hash, got {other:?}"), } + assert_eq!( + scan.partition_keys_kind(), + Some(super::PartitionKeysKind::Identity), + ); } /// A projection that omits the partition source column drops @@ -1358,6 +1367,7 @@ mod tests { scan.properties().partitioning, Partitioning::UnknownPartitioning(_) )); + assert_eq!(scan.partition_keys_kind(), None); } // ── Bucket-transform partitioning tests ───────────────────────────────── @@ -1516,6 +1526,10 @@ mod tests { } other => panic!("expected Partitioning::Hash, got {other:?}"), } + assert_eq!( + scan.partition_keys_kind(), + Some(super::PartitionKeysKind::Bucket), + ); } /// Single-column bucket spec where the 
projection excludes the *only* @@ -1556,6 +1570,7 @@ mod tests { scan.properties().partitioning, Partitioning::UnknownPartitioning(_) )); + assert_eq!(scan.partition_keys_kind(), None); } /// A `None` partition slot makes `bucket_hash` return `None`, so the @@ -1587,6 +1602,7 @@ mod tests { scan.properties().partitioning, Partitioning::UnknownPartitioning(_) )); + assert_eq!(scan.partition_keys_kind(), None); } /// Mixed `Bucket[N] + Truncate(_)` spec: `compute_bucket_cols` rejects @@ -1693,6 +1709,7 @@ mod tests { scan.properties().partitioning, Partitioning::UnknownPartitioning(_) )); + assert_eq!(scan.partition_keys_kind(), None); } /// Mixed `Identity + Bucket` spec must keep the existing behaviour: @@ -1825,6 +1842,10 @@ mod tests { } other => panic!("expected Partitioning::Hash, got {other:?}"), } + assert_eq!( + scan.partition_keys_kind(), + Some(super::PartitionKeysKind::Identity), + ); } /// Pure `Bucket[N]` with `target_partitions == N`: tasks must land