Skip to content

Commit c10a519

Browse files
authored
feat(datafusion): expose PartitionKeysKind getter on IcebergTableScan (#21)
Add `PartitionKeysKind` (#[non_exhaustive] enum: Identity | Bucket) and a public `partition_keys_kind() -> Option<PartitionKeysKind>` getter on `IcebergTableScan`, so callers can distinguish identity-backed from bucket-backed `Partitioning::Hash` without re-inspecting table metadata. Wired through `IcebergTableProvider::scan` via a crate-internal `with_partition_keys_kind` setter; public constructor signatures are unchanged. Existing bucket/identity tests extended with `partition_keys_kind()` assertions. (cherry picked from commit 28d117f)
1 parent 4b23b0b commit c10a519

3 files changed

Lines changed: 72 additions & 15 deletions

File tree

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use iceberg::scan::{FileScanTask, TableScan};
3434
use iceberg::table::Table;
3535

3636
use super::expr_to_predicate::convert_filters_to_predicate;
37+
use crate::table::PartitionKeysKind;
3738
use crate::to_datafusion_error;
3839

3940
/// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`].
@@ -60,6 +61,8 @@ pub struct IcebergTableScan {
6061
predicates: Option<Predicate>,
6162
/// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode).
6263
buckets: Option<Vec<Vec<FileScanTask>>>,
64+
/// `None` when partitioning is `UnknownPartitioning`.
65+
partition_keys_kind: Option<PartitionKeysKind>,
6366
/// Optional limit on the number of rows to return.
6467
limit: Option<usize>,
6568
}
@@ -146,10 +149,19 @@ impl IcebergTableScan {
146149
projection,
147150
predicates,
148151
buckets,
152+
partition_keys_kind: None,
149153
limit,
150154
}
151155
}
152156

157+
pub(crate) fn with_partition_keys_kind(
158+
mut self,
159+
partition_keys_kind: Option<PartitionKeysKind>,
160+
) -> Self {
161+
self.partition_keys_kind = partition_keys_kind;
162+
self
163+
}
164+
153165
pub fn table(&self) -> &Table {
154166
&self.table
155167
}
@@ -171,6 +183,12 @@ impl IcebergTableScan {
171183
self.buckets.as_deref().unwrap_or(&[])
172184
}
173185

186+
/// Returns the transform family behind the `Partitioning::Hash` declaration,
187+
/// or `None` when the scan declares `UnknownPartitioning`.
188+
pub fn partition_keys_kind(&self) -> Option<PartitionKeysKind> {
189+
self.partition_keys_kind
190+
}
191+
174192
pub fn limit(&self) -> Option<usize> {
175193
self.limit
176194
}

crates/integrations/datafusion/src/table/bucketing.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,17 @@ pub(super) fn compute_bucket_cols(
152152
}])
153153
}
154154

155+
/// Identifies the transform family behind a `Partitioning::Hash` declaration
156+
/// on an [`IcebergTableScan`][crate::physical_plan::scan::IcebergTableScan].
157+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
158+
#[non_exhaustive]
159+
pub enum PartitionKeysKind {
160+
/// Keys come from `Transform::Identity` fields.
161+
Identity,
162+
/// Keys come from `Transform::Bucket(_)` fields.
163+
Bucket,
164+
}
165+
155166
/// Single-entry partition-key descriptor used by [`bucket_tasks`] and
156167
/// `IcebergTableProvider::scan` to drive both task distribution and the
157168
/// `Partitioning::Hash` declaration.
@@ -175,6 +186,13 @@ impl PartitionKeys {
175186
.collect(),
176187
}
177188
}
189+
190+
pub(super) fn kind(&self) -> PartitionKeysKind {
191+
match self {
192+
PartitionKeys::Identity(_) => PartitionKeysKind::Identity,
193+
PartitionKeys::Bucket(_) => PartitionKeysKind::Bucket,
194+
}
195+
}
178196
}
179197

180198
/// Return the partition keys that drive a `Partitioning::Hash` declaration,

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
//! table snapshot. Use for consistent analytical queries or time-travel scenarios.
2727
2828
mod bucketing;
29+
pub use bucketing::PartitionKeysKind;
2930
pub mod metadata_table;
3031
pub mod table_provider_factory;
3132

@@ -242,23 +243,27 @@ impl TableProvider for IcebergTableProvider {
242243
let (buckets, all_had_full_key) =
243244
bucketing::bucket_tasks(tasks, n_partitions, keys.as_ref());
244245

245-
let partitioning = match &keys {
246-
Some(keys) if all_had_full_key && n_partitions > 0 => {
247-
Partitioning::Hash(keys.column_exprs(), n_partitions)
248-
}
249-
_ => Partitioning::UnknownPartitioning(n_partitions),
246+
let (partitioning, partition_keys_kind) = match &keys {
247+
Some(keys) if all_had_full_key && n_partitions > 0 => (
248+
Partitioning::Hash(keys.column_exprs(), n_partitions),
249+
Some(keys.kind()),
250+
),
251+
_ => (Partitioning::UnknownPartitioning(n_partitions), None),
250252
};
251253

252-
Ok(Arc::new(IcebergTableScan::new_with_tasks(
253-
table,
254-
None, // Always use current snapshot for catalog-backed provider
255-
self.schema.clone(),
256-
projection,
257-
filters,
258-
limit,
259-
buckets,
260-
partitioning,
261-
)))
254+
Ok(Arc::new(
255+
IcebergTableScan::new_with_tasks(
256+
table,
257+
None, // Always use current snapshot for catalog-backed provider
258+
self.schema.clone(),
259+
projection,
260+
filters,
261+
limit,
262+
buckets,
263+
partitioning,
264+
)
265+
.with_partition_keys_kind(partition_keys_kind),
266+
))
262267
}
263268

264269
fn supports_filters_pushdown(
@@ -1326,6 +1331,10 @@ mod tests {
13261331
}
13271332
other => panic!("expected Partitioning::Hash, got {other:?}"),
13281333
}
1334+
assert_eq!(
1335+
scan.partition_keys_kind(),
1336+
Some(super::PartitionKeysKind::Identity),
1337+
);
13291338
}
13301339

13311340
/// A projection that omits the partition source column drops
@@ -1358,6 +1367,7 @@ mod tests {
13581367
scan.properties().partitioning,
13591368
Partitioning::UnknownPartitioning(_)
13601369
));
1370+
assert_eq!(scan.partition_keys_kind(), None);
13611371
}
13621372

13631373
// ── Bucket-transform partitioning tests ─────────────────────────────────
@@ -1516,6 +1526,10 @@ mod tests {
15161526
}
15171527
other => panic!("expected Partitioning::Hash, got {other:?}"),
15181528
}
1529+
assert_eq!(
1530+
scan.partition_keys_kind(),
1531+
Some(super::PartitionKeysKind::Bucket),
1532+
);
15191533
}
15201534

15211535
/// Single-column bucket spec where the projection excludes the *only*
@@ -1556,6 +1570,7 @@ mod tests {
15561570
scan.properties().partitioning,
15571571
Partitioning::UnknownPartitioning(_)
15581572
));
1573+
assert_eq!(scan.partition_keys_kind(), None);
15591574
}
15601575

15611576
/// A `None` partition slot makes `bucket_hash` return `None`, so the
@@ -1587,6 +1602,7 @@ mod tests {
15871602
scan.properties().partitioning,
15881603
Partitioning::UnknownPartitioning(_)
15891604
));
1605+
assert_eq!(scan.partition_keys_kind(), None);
15901606
}
15911607

15921608
/// Mixed `Bucket[N] + Truncate(_)` spec: `compute_bucket_cols` rejects
@@ -1693,6 +1709,7 @@ mod tests {
16931709
scan.properties().partitioning,
16941710
Partitioning::UnknownPartitioning(_)
16951711
));
1712+
assert_eq!(scan.partition_keys_kind(), None);
16961713
}
16971714

16981715
/// Mixed `Identity + Bucket` spec must keep the existing behaviour:
@@ -1825,6 +1842,10 @@ mod tests {
18251842
}
18261843
other => panic!("expected Partitioning::Hash, got {other:?}"),
18271844
}
1845+
assert_eq!(
1846+
scan.partition_keys_kind(),
1847+
Some(super::PartitionKeysKind::Identity),
1848+
);
18281849
}
18291850

18301851
/// Pure `Bucket[N]` with `target_partitions == N`: tasks must land

0 commit comments

Comments
 (0)