Skip to content

Commit 25af0c7

Browse files
committed
feat(datafusion): declare Hash partitioning for pure bucket-transform specs
Extend scan-time partition detection in IcebergTableProvider so that a default partition spec whose every field is a `Transform::Bucket(_)` is exposed to DataFusion as `Partitioning::Hash([source_cols], n)`. This lets the planner skip a `RepartitionExec` for GROUP BY / joins on the bucket source column, mirroring the existing identity-transform path. Correctness: DataFusion's `EquivalenceProperties::is_partition_satisfied` compares `Partitioning::Hash` against `Distribution::HashPartitioned` by expression equality only, not by the underlying hash function. Iceberg `bucket[N]` already co-locates same-source-value rows at the file level (same value -> same bucket index -> same files); the task distributor sends every unique bucket index to a single DataFusion partition, so co-location is preserved at the row level. - bucketing.rs: add `BucketCol`, `compute_bucket_cols` (pure-bucket only, rejects spec evolution / mixed transforms / missing source), and a `PartitionKeys::{Identity, Bucket}` wrapper used by `bucket_tasks`. - `bucket_tasks` now hashes the `i32` bucket-index slot (always Int32 per spec) for the bucket variant, keeping the identity branch unchanged. - `compute_partition_keys` tries identity first, so mixed identity+bucket specs keep the current identity-only Hash behaviour. - table/mod.rs::scan(): use `compute_partition_keys` + `column_exprs()` instead of inlining the identity-only branch. - Five new tests: pure-bucket Hash declaration, projection excluding the source, null partition slot fallback, mixed bucket+truncate fallback, and identity+bucket regression lock. (cherry picked from commit d4f1170)
1 parent 7c4e63d commit 25af0c7

2 files changed

Lines changed: 622 additions & 33 deletions

File tree

crates/integrations/datafusion/src/table/bucketing.rs

Lines changed: 146 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ use datafusion::arrow::array::{
2323
};
2424
use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema};
2525
use datafusion::common::hash_utils::create_hashes;
26+
use datafusion::physical_expr::PhysicalExpr;
27+
use datafusion::physical_expr::expressions::Column;
2628
use datafusion::physical_plan::repartition::REPARTITION_RANDOM_STATE;
2729
use iceberg::scan::FileScanTask;
2830
use iceberg::spec::{Literal, PrimitiveLiteral, Transform};
@@ -96,27 +98,128 @@ fn is_supported_dtype(dt: &DataType) -> bool {
9698
)
9799
}
98100

99-
/// Distribute `tasks` across `n_partitions` buckets. When `identity_cols`
100-
/// describes a non-empty, hashable identity key, each task is hashed on
101-
/// that key using DataFusion's repartition hash so the resulting partitioning
102-
/// matches what `RepartitionExec` would produce on the same data. Tasks
103-
/// missing partition data fall back to hashing `data_file_path`, which still
104-
/// distributes evenly but breaks the `Hash` contract — the second tuple
105-
/// element flags whether every task supplied a full identity key.
101+
/// Spec field with `Transform::Bucket(_)`. The source column must be in the
102+
/// output projection so we can reference it via `Column` in `Partitioning::Hash`.
103+
/// We don't need the Arrow type because the partition tuple slot for a bucket
104+
/// transform is always `Int32` (the spec-defined `result_type`).
105+
pub(super) struct BucketCol {
106+
pub(super) name: String,
107+
/// Position of this column in the *output* schema (after projection).
108+
pub(super) output_idx: usize,
109+
/// Position of this column inside the partition spec's `fields()` slice,
110+
/// matching the slot order of `FileScanTask::partition`.
111+
pub(super) spec_field_idx: usize,
112+
}
113+
114+
/// Inspect the table's default partition spec and return the list of bucket
115+
/// columns when the spec is *purely* bucketed: every field must be a
116+
/// `Transform::Bucket(_)` and every source column must be present in the
117+
/// output projection. Returns `None` otherwise (mixed transforms, spec
118+
/// evolution, missing source column, or empty spec).
119+
///
120+
/// This deliberately rejects mixed identity+bucket specs: those are handled
121+
/// by [`compute_identity_cols`] which retains only the identity fields.
122+
pub(super) fn compute_bucket_cols(
123+
table: &Table,
124+
output_schema: &ArrowSchema,
125+
) -> Option<Vec<BucketCol>> {
126+
let metadata = table.metadata();
127+
if metadata.partition_specs_iter().len() > 1 {
128+
return None;
129+
}
130+
let spec = metadata.default_partition_spec();
131+
let fields = spec.fields();
132+
if fields.is_empty() {
133+
return None;
134+
}
135+
let table_schema = metadata.current_schema();
136+
137+
let mut cols = Vec::with_capacity(fields.len());
138+
for (spec_field_idx, pf) in fields.iter().enumerate() {
139+
if !matches!(pf.transform, Transform::Bucket(_)) {
140+
return None;
141+
}
142+
let source_field = table_schema.field_by_id(pf.source_id)?;
143+
let output_idx = output_schema.index_of(source_field.name.as_str()).ok()?;
144+
cols.push(BucketCol {
145+
name: source_field.name.clone(),
146+
output_idx,
147+
spec_field_idx,
148+
});
149+
}
150+
Some(cols)
151+
}
152+
153+
/// Single-entry partition-key descriptor used by [`bucket_tasks`] and
154+
/// `IcebergTableProvider::scan` to drive both task distribution and the
155+
/// `Partitioning::Hash` declaration.
156+
pub(super) enum PartitionKeys {
157+
Identity(Vec<IdentityCol>),
158+
Bucket(Vec<BucketCol>),
159+
}
160+
161+
impl PartitionKeys {
162+
/// `Column` exprs (one per key column) referencing the *output* schema,
163+
/// suitable for `Partitioning::Hash`.
164+
pub(super) fn column_exprs(&self) -> Vec<Arc<dyn PhysicalExpr>> {
165+
match self {
166+
PartitionKeys::Identity(cols) => cols
167+
.iter()
168+
.map(|c| Arc::new(Column::new(&c.name, c.output_idx)) as Arc<dyn PhysicalExpr>)
169+
.collect(),
170+
PartitionKeys::Bucket(cols) => cols
171+
.iter()
172+
.map(|c| Arc::new(Column::new(&c.name, c.output_idx)) as Arc<dyn PhysicalExpr>)
173+
.collect(),
174+
}
175+
}
176+
}
177+
178+
/// Try identity detection first (preserves the existing behaviour, including
179+
/// extracting identity-only keys from mixed identity+bucket specs). If no
180+
/// identity columns exist, fall back to *pure* bucket detection.
181+
///
182+
/// Why declaring `Hash` is correct for a pure-bucket spec even though the
183+
/// hash function differs from DataFusion's: DataFusion checks
184+
/// `Partitioning::Hash` against `Distribution::HashPartitioned` purely by
185+
/// expression equality, not by the underlying hash function. The contract to
186+
/// honour is "rows with the same key tuple end up in the same partition",
187+
/// which Iceberg `bucket[N]` already guarantees at the file level (same
188+
/// source value implies same bucket index, hence same files), and our
189+
/// task-distribution preserves at the partition level by sending each
190+
/// unique bucket index to a single DataFusion partition.
191+
pub(super) fn compute_partition_keys(
192+
table: &Table,
193+
output_schema: &ArrowSchema,
194+
) -> Option<PartitionKeys> {
195+
if let Some(cols) = compute_identity_cols(table, output_schema)
196+
&& !cols.is_empty()
197+
{
198+
return Some(PartitionKeys::Identity(cols));
199+
}
200+
compute_bucket_cols(table, output_schema).map(PartitionKeys::Bucket)
201+
}
202+
203+
/// Distribute `tasks` across `n_partitions` buckets. When `keys` describes a
204+
/// non-empty, hashable partition key (identity or bucket-index), each task is
205+
/// hashed on that key using DataFusion's repartition random state so the
206+
/// resulting partitioning satisfies the `Hash` contract at the row level.
207+
/// Tasks missing partition data fall back to hashing `data_file_path`, which
208+
/// still distributes evenly but breaks the `Hash` contract: the second tuple
209+
/// element flags whether every task supplied a full key.
106210
pub(super) fn bucket_tasks(
107211
tasks: Vec<FileScanTask>,
108212
n_partitions: usize,
109-
identity_cols: Option<&[IdentityCol]>,
213+
keys: Option<&PartitionKeys>,
110214
) -> (Vec<Vec<FileScanTask>>, bool) {
111215
if n_partitions == 0 {
112216
return (Vec::new(), tasks.is_empty());
113217
}
114218
let mut buckets: Vec<Vec<FileScanTask>> = (0..n_partitions).map(|_| Vec::new()).collect();
115219
let mut all_full_key = true;
116-
let cols = identity_cols.unwrap_or(&[]);
117220

118221
for task in tasks {
119-
let bucket_idx = match identity_hash(&task, cols) {
222+
let bucket_idx = match partition_hash(&task, keys) {
120223
Some(h) => (h % n_partitions as u64) as usize,
121224
None => {
122225
all_full_key = false;
@@ -128,6 +231,13 @@ pub(super) fn bucket_tasks(
128231
(buckets, all_full_key)
129232
}
130233

234+
fn partition_hash(task: &FileScanTask, keys: Option<&PartitionKeys>) -> Option<u64> {
235+
match keys? {
236+
PartitionKeys::Identity(cols) => identity_hash(task, cols),
237+
PartitionKeys::Bucket(cols) => bucket_hash(task, cols),
238+
}
239+
}
240+
131241
/// Hash the identity-partition values of `task` using
132242
/// [`REPARTITION_RANDOM_STATE`] so the bucket assignment matches DataFusion's
133243
/// hash-repartition convention. Returns `None` if the task lacks partition
@@ -142,13 +252,33 @@ fn identity_hash(task: &FileScanTask, cols: &[IdentityCol]) -> Option<u64> {
142252
let lit = partition.fields().get(col.spec_field_idx)?.as_ref()?;
143253
arrays.push(literal_to_array(lit, &col.output_dtype)?);
144254
}
255+
hash_arrays(&arrays)
256+
}
257+
258+
/// Hash the bucket-index values stored in `task`'s partition tuple. The slot
259+
/// for a `Transform::Bucket(_)` field is always an `Int32` per the Iceberg
260+
/// spec, so we materialise it as `Int32Array` regardless of the source
261+
/// column's Arrow type.
262+
fn bucket_hash(task: &FileScanTask, cols: &[BucketCol]) -> Option<u64> {
263+
if cols.is_empty() {
264+
return None;
265+
}
266+
let partition = task.partition.as_ref()?;
267+
let mut arrays: Vec<ArrayRef> = Vec::with_capacity(cols.len());
268+
for col in cols {
269+
let lit = partition.fields().get(col.spec_field_idx)?.as_ref()?;
270+
let idx = match lit {
271+
Literal::Primitive(PrimitiveLiteral::Int(v)) => *v,
272+
_ => return None,
273+
};
274+
arrays.push(Arc::new(Int32Array::from(vec![idx])) as ArrayRef);
275+
}
276+
hash_arrays(&arrays)
277+
}
278+
279+
fn hash_arrays(arrays: &[ArrayRef]) -> Option<u64> {
145280
let mut hashes = vec![0u64; 1];
146-
create_hashes(
147-
&arrays,
148-
REPARTITION_RANDOM_STATE.random_state(),
149-
&mut hashes,
150-
)
151-
.ok()?;
281+
create_hashes(arrays, REPARTITION_RANDOM_STATE.random_state(), &mut hashes).ok()?;
152282
Some(hashes[0])
153283
}
154284

0 commit comments

Comments
 (0)