Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use iceberg::scan::{FileScanTask, TableScan};
use iceberg::table::Table;

use super::expr_to_predicate::convert_filters_to_predicate;
use crate::table::PartitionKeysKind;
use crate::to_datafusion_error;

/// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`].
Expand All @@ -60,6 +61,8 @@ pub struct IcebergTableScan {
predicates: Option<Predicate>,
/// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode).
buckets: Option<Vec<Vec<FileScanTask>>>,
/// Transform family behind the `Partitioning::Hash` declaration; `None` when partitioning is `UnknownPartitioning`.
partition_keys_kind: Option<PartitionKeysKind>,
/// Optional limit on the number of rows to return.
limit: Option<usize>,
}
Expand Down Expand Up @@ -146,10 +149,19 @@ impl IcebergTableScan {
projection,
predicates,
buckets,
partition_keys_kind: None,
limit,
}
}

/// Records which transform family backs this scan's partitioning declaration.
///
/// Consumes the scan and returns it with `partition_keys_kind` replaced by
/// the given value; no other field is modified. Pass `None` when the scan
/// declares `UnknownPartitioning`.
#[must_use = "this consumes the scan and returns the updated one; ignoring the result drops the scan"]
pub(crate) fn with_partition_keys_kind(
    mut self,
    partition_keys_kind: Option<PartitionKeysKind>,
) -> Self {
    self.partition_keys_kind = partition_keys_kind;
    self
}

/// Returns a reference to the [`Table`] held by this scan.
pub fn table(&self) -> &Table {
&self.table
}
Expand All @@ -171,6 +183,12 @@ impl IcebergTableScan {
self.buckets.as_deref().unwrap_or(&[])
}

/// Returns the transform family behind the `Partitioning::Hash` declaration,
/// or `None` when the scan declares `UnknownPartitioning`.
///
/// The stored value is returned by copy.
pub fn partition_keys_kind(&self) -> Option<PartitionKeysKind> {
// NOTE(review): the struct literal visible in this change initialises the
// field to `None`, and `with_partition_keys_kind` is the only visible
// setter. Presumably a scan built without that call reports `None` even if
// it declares `Partitioning::Hash` — confirm against all construction sites.
self.partition_keys_kind
}

/// Returns the optional limit on the number of rows to return, or `None`
/// when no limit is stored on this scan.
pub fn limit(&self) -> Option<usize> {
self.limit
}
Expand Down
18 changes: 18 additions & 0 deletions crates/integrations/datafusion/src/table/bucketing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@ pub(super) fn compute_bucket_cols(
}])
}

/// Identifies the transform family behind a `Partitioning::Hash` declaration
/// on an [`IcebergTableScan`][crate::physical_plan::scan::IcebergTableScan].
///
/// The enum is a field-less tag: it is `Copy`, comparable with `==`, and
/// hashable, so it can be used directly as a map or set key. It is marked
/// `#[non_exhaustive]`, so code in other crates must include a wildcard arm
/// when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PartitionKeysKind {
    /// Keys come from `Transform::Identity` fields.
    Identity,
    /// Keys come from `Transform::Bucket(_)` fields.
    Bucket,
}

/// Single-entry partition-key descriptor used by [`bucket_tasks`] and
/// `IcebergTableProvider::scan` to drive both task distribution and the
/// `Partitioning::Hash` declaration.
Expand All @@ -175,6 +186,13 @@ impl PartitionKeys {
.collect(),
}
}

pub(super) fn kind(&self) -> PartitionKeysKind {
match self {
PartitionKeys::Identity(_) => PartitionKeysKind::Identity,
PartitionKeys::Bucket(_) => PartitionKeysKind::Bucket,
}
}
}

/// Return the partition keys that drive a `Partitioning::Hash` declaration,
Expand Down
51 changes: 36 additions & 15 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
//! table snapshot. Use for consistent analytical queries or time-travel scenarios.

mod bucketing;
pub use bucketing::PartitionKeysKind;
pub mod metadata_table;
pub mod table_provider_factory;

Expand Down Expand Up @@ -242,23 +243,27 @@ impl TableProvider for IcebergTableProvider {
let (buckets, all_had_full_key) =
bucketing::bucket_tasks(tasks, n_partitions, keys.as_ref());

let partitioning = match &keys {
Some(keys) if all_had_full_key && n_partitions > 0 => {
Partitioning::Hash(keys.column_exprs(), n_partitions)
}
_ => Partitioning::UnknownPartitioning(n_partitions),
let (partitioning, partition_keys_kind) = match &keys {
Some(keys) if all_had_full_key && n_partitions > 0 => (
Partitioning::Hash(keys.column_exprs(), n_partitions),
Some(keys.kind()),
),
_ => (Partitioning::UnknownPartitioning(n_partitions), None),
};

Ok(Arc::new(IcebergTableScan::new_with_tasks(
table,
None, // Always use current snapshot for catalog-backed provider
self.schema.clone(),
projection,
filters,
limit,
buckets,
partitioning,
)))
Ok(Arc::new(
IcebergTableScan::new_with_tasks(
table,
None, // Always use current snapshot for catalog-backed provider
self.schema.clone(),
projection,
filters,
limit,
buckets,
partitioning,
)
.with_partition_keys_kind(partition_keys_kind),
))
}

fn supports_filters_pushdown(
Expand Down Expand Up @@ -1326,6 +1331,10 @@ mod tests {
}
other => panic!("expected Partitioning::Hash, got {other:?}"),
}
assert_eq!(
scan.partition_keys_kind(),
Some(super::PartitionKeysKind::Identity),
);
}

/// A projection that omits the partition source column drops
Expand Down Expand Up @@ -1358,6 +1367,7 @@ mod tests {
scan.properties().partitioning,
Partitioning::UnknownPartitioning(_)
));
assert_eq!(scan.partition_keys_kind(), None);
}

// ── Bucket-transform partitioning tests ─────────────────────────────────
Expand Down Expand Up @@ -1516,6 +1526,10 @@ mod tests {
}
other => panic!("expected Partitioning::Hash, got {other:?}"),
}
assert_eq!(
scan.partition_keys_kind(),
Some(super::PartitionKeysKind::Bucket),
);
}

/// Single-column bucket spec where the projection excludes the *only*
Expand Down Expand Up @@ -1556,6 +1570,7 @@ mod tests {
scan.properties().partitioning,
Partitioning::UnknownPartitioning(_)
));
assert_eq!(scan.partition_keys_kind(), None);
}

/// A `None` partition slot makes `bucket_hash` return `None`, so the
Expand Down Expand Up @@ -1587,6 +1602,7 @@ mod tests {
scan.properties().partitioning,
Partitioning::UnknownPartitioning(_)
));
assert_eq!(scan.partition_keys_kind(), None);
}

/// Mixed `Bucket[N] + Truncate(_)` spec: `compute_bucket_cols` rejects
Expand Down Expand Up @@ -1693,6 +1709,7 @@ mod tests {
scan.properties().partitioning,
Partitioning::UnknownPartitioning(_)
));
assert_eq!(scan.partition_keys_kind(), None);
}

/// Mixed `Identity + Bucket` spec must keep the existing behaviour:
Expand Down Expand Up @@ -1825,6 +1842,10 @@ mod tests {
}
other => panic!("expected Partitioning::Hash, got {other:?}"),
}
assert_eq!(
scan.partition_keys_kind(),
Some(super::PartitionKeysKind::Identity),
);
}

/// Pure `Bucket[N]` with `target_partitions == N`: tasks must land
Expand Down