Skip to content

Commit d8d09ce

Browse files
committed
feat(datafusion): expose PartitionKeysKind getter on IcebergTableScan
Add `PartitionKeysKind` (#[non_exhaustive] enum: Identity | Bucket) and a public `partition_keys_kind() -> Option<PartitionKeysKind>` getter on `IcebergTableScan`, so callers can distinguish identity-backed from bucket-backed `Partitioning::Hash` without re-inspecting table metadata. Wired through `IcebergTableProvider::scan` via a crate-internal `with_partition_keys_kind` setter; public constructor signatures are unchanged. Existing bucket/identity tests extended with `partition_keys_kind()` assertions. (cherry picked from commit 467cc61)
1 parent 25af0c7 commit d8d09ce

3 files changed

Lines changed: 72 additions & 15 deletions

File tree

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use iceberg::scan::{FileScanTask, TableScan};
3434
use iceberg::table::Table;
3535

3636
use super::expr_to_predicate::convert_filters_to_predicate;
37+
use crate::table::PartitionKeysKind;
3738
use crate::to_datafusion_error;
3839

3940
/// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`].
@@ -60,6 +61,8 @@ pub struct IcebergTableScan {
6061
predicates: Option<Predicate>,
6162
/// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode).
6263
buckets: Option<Vec<Vec<FileScanTask>>>,
64+
/// Transform family behind the `Partitioning::Hash` declaration; `None` when partitioning is `UnknownPartitioning`.
65+
partition_keys_kind: Option<PartitionKeysKind>,
6366
/// Optional limit on the number of rows to return.
6467
limit: Option<usize>,
6568
}
@@ -146,10 +149,19 @@ impl IcebergTableScan {
146149
projection,
147150
predicates,
148151
buckets,
152+
partition_keys_kind: None,
149153
limit,
150154
}
151155
}
152156

157+
pub(crate) fn with_partition_keys_kind(
158+
mut self,
159+
partition_keys_kind: Option<PartitionKeysKind>,
160+
) -> Self {
161+
self.partition_keys_kind = partition_keys_kind;
162+
self
163+
}
164+
153165
pub fn table(&self) -> &Table {
154166
&self.table
155167
}
@@ -171,6 +183,12 @@ impl IcebergTableScan {
171183
self.buckets.as_deref().unwrap_or(&[])
172184
}
173185

186+
/// Returns the transform family behind the `Partitioning::Hash` declaration,
187+
/// or `None` when the scan declares `UnknownPartitioning`.
188+
pub fn partition_keys_kind(&self) -> Option<PartitionKeysKind> {
189+
self.partition_keys_kind
190+
}
191+
174192
pub fn limit(&self) -> Option<usize> {
175193
self.limit
176194
}

crates/integrations/datafusion/src/table/bucketing.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,17 @@ pub(super) fn compute_bucket_cols(
150150
Some(cols)
151151
}
152152

153+
/// Identifies the transform family behind a `Partitioning::Hash` declaration
154+
/// on an [`IcebergTableScan`][crate::physical_plan::scan::IcebergTableScan].
155+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
156+
#[non_exhaustive]
157+
pub enum PartitionKeysKind {
158+
/// Keys come from `Transform::Identity` fields.
159+
Identity,
160+
/// Keys come from `Transform::Bucket(_)` fields.
161+
Bucket,
162+
}
163+
153164
/// Single-entry partition-key descriptor used by [`bucket_tasks`] and
154165
/// `IcebergTableProvider::scan` to drive both task distribution and the
155166
/// `Partitioning::Hash` declaration.
@@ -173,6 +184,13 @@ impl PartitionKeys {
173184
.collect(),
174185
}
175186
}
187+
188+
pub(super) fn kind(&self) -> PartitionKeysKind {
189+
match self {
190+
PartitionKeys::Identity(_) => PartitionKeysKind::Identity,
191+
PartitionKeys::Bucket(_) => PartitionKeysKind::Bucket,
192+
}
193+
}
176194
}
177195

178196
/// Try identity detection first (preserves the existing behaviour, including

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
//! table snapshot. Use for consistent analytical queries or time-travel scenarios.
2727
2828
mod bucketing;
29+
pub use bucketing::PartitionKeysKind;
2930
pub mod metadata_table;
3031
pub mod table_provider_factory;
3132

@@ -241,23 +242,27 @@ impl TableProvider for IcebergTableProvider {
241242
let (buckets, all_had_full_key) =
242243
bucketing::bucket_tasks(tasks, n_partitions, keys.as_ref());
243244

244-
let partitioning = match &keys {
245-
Some(keys) if all_had_full_key && n_partitions > 0 => {
246-
Partitioning::Hash(keys.column_exprs(), n_partitions)
247-
}
248-
_ => Partitioning::UnknownPartitioning(n_partitions),
245+
let (partitioning, partition_keys_kind) = match &keys {
246+
Some(keys) if all_had_full_key && n_partitions > 0 => (
247+
Partitioning::Hash(keys.column_exprs(), n_partitions),
248+
Some(keys.kind()),
249+
),
250+
_ => (Partitioning::UnknownPartitioning(n_partitions), None),
249251
};
250252

251-
Ok(Arc::new(IcebergTableScan::new_with_tasks(
252-
table,
253-
None, // Always use current snapshot for catalog-backed provider
254-
self.schema.clone(),
255-
projection,
256-
filters,
257-
limit,
258-
buckets,
259-
partitioning,
260-
)))
253+
Ok(Arc::new(
254+
IcebergTableScan::new_with_tasks(
255+
table,
256+
None, // Always use current snapshot for catalog-backed provider
257+
self.schema.clone(),
258+
projection,
259+
filters,
260+
limit,
261+
buckets,
262+
partitioning,
263+
)
264+
.with_partition_keys_kind(partition_keys_kind),
265+
))
261266
}
262267

263268
fn supports_filters_pushdown(
@@ -1325,6 +1330,10 @@ mod tests {
13251330
}
13261331
other => panic!("expected Partitioning::Hash, got {other:?}"),
13271332
}
1333+
assert_eq!(
1334+
scan.partition_keys_kind(),
1335+
Some(super::PartitionKeysKind::Identity),
1336+
);
13281337
}
13291338

13301339
/// A projection that omits the partition source column drops
@@ -1357,6 +1366,7 @@ mod tests {
13571366
scan.properties().partitioning,
13581367
Partitioning::UnknownPartitioning(_)
13591368
));
1369+
assert_eq!(scan.partition_keys_kind(), None);
13601370
}
13611371

13621372
// ── Bucket-transform partitioning tests ─────────────────────────────────
@@ -1515,6 +1525,10 @@ mod tests {
15151525
}
15161526
other => panic!("expected Partitioning::Hash, got {other:?}"),
15171527
}
1528+
assert_eq!(
1529+
scan.partition_keys_kind(),
1530+
Some(super::PartitionKeysKind::Bucket),
1531+
);
15181532
}
15191533

15201534
/// A projection that excludes the bucket source column drops
@@ -1552,6 +1566,7 @@ mod tests {
15521566
scan.properties().partitioning,
15531567
Partitioning::UnknownPartitioning(_)
15541568
));
1569+
assert_eq!(scan.partition_keys_kind(), None);
15551570
}
15561571

15571572
/// A `None` partition slot makes `bucket_hash` return `None`, so the
@@ -1583,6 +1598,7 @@ mod tests {
15831598
scan.properties().partitioning,
15841599
Partitioning::UnknownPartitioning(_)
15851600
));
1601+
assert_eq!(scan.partition_keys_kind(), None);
15861602
}
15871603

15881604
/// Mixed `Bucket[N] + Truncate(_)` spec: `compute_bucket_cols` rejects
@@ -1689,6 +1705,7 @@ mod tests {
16891705
scan.properties().partitioning,
16901706
Partitioning::UnknownPartitioning(_)
16911707
));
1708+
assert_eq!(scan.partition_keys_kind(), None);
16921709
}
16931710

16941711
/// Mixed `Identity + Bucket` spec must keep the existing behaviour:
@@ -1821,5 +1838,9 @@ mod tests {
18211838
}
18221839
other => panic!("expected Partitioning::Hash, got {other:?}"),
18231840
}
1841+
assert_eq!(
1842+
scan.partition_keys_kind(),
1843+
Some(super::PartitionKeysKind::Identity),
1844+
);
18241845
}
18251846
}

0 commit comments

Comments
 (0)