Skip to content

Commit 2ebcfa8

Browse files
Dandandanclaude
andcommitted
Add preallocate to GroupsAccumulator trait and key implementations
Add `preallocate(total_num_groups)` method to the `GroupsAccumulator` trait (default no-op) and implement it for: - PrimitiveGroupsAccumulator (SUM, MIN, MAX, etc.) - CountGroupsAccumulator - VarianceGroupsAccumulator - CorrelationGroupsAccumulator Call preallocate on all accumulators in GroupedHashAggregateStream when NDV capacity hint is available. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 104e098 commit 2ebcfa8

6 files changed

Lines changed: 40 additions & 1 deletion

File tree

datafusion/expr-common/src/groups_accumulator.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,16 @@ pub trait GroupsAccumulator: Send + std::any::Any {
247247
false
248248
}
249249

250+
/// Pre-allocates internal storage for the given number of groups.
251+
///
252+
/// This is an optional optimization hint. When statistics (such as NDV
253+
/// from Parquet metadata) predict the number of distinct groups, calling
254+
/// this before the first `update_batch` avoids repeated resizing of
255+
/// internal vectors.
256+
///
257+
/// The default implementation is a no-op.
258+
fn preallocate(&mut self, _total_num_groups: usize) {}
259+
250260
/// Amount of memory used to store the state of this accumulator,
251261
/// in bytes.
252262
///

datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,10 @@ where
195195
true
196196
}
197197

198+
fn preallocate(&mut self, total_num_groups: usize) {
199+
self.values.resize(total_num_groups, self.starting_value);
200+
}
201+
198202
fn size(&self) -> usize {
199203
self.values.capacity() * size_of::<T::Native>() + self.null_state.size()
200204
}

datafusion/functions-aggregate/src/correlation.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,15 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
536536
Ok(())
537537
}
538538

539+
fn preallocate(&mut self, total_num_groups: usize) {
540+
self.count.resize(total_num_groups, 0);
541+
self.sum_x.resize(total_num_groups, 0.0);
542+
self.sum_y.resize(total_num_groups, 0.0);
543+
self.sum_xy.resize(total_num_groups, 0.0);
544+
self.sum_xx.resize(total_num_groups, 0.0);
545+
self.sum_yy.resize(total_num_groups, 0.0);
546+
}
547+
539548
fn size(&self) -> usize {
540549
self.count.capacity() * size_of::<u64>()
541550
+ self.sum_x.capacity() * size_of::<f64>()

datafusion/functions-aggregate/src/count.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,10 @@ impl GroupsAccumulator for CountGroupsAccumulator {
712712
true
713713
}
714714

715+
fn preallocate(&mut self, total_num_groups: usize) {
716+
self.counts.resize(total_num_groups, 0);
717+
}
718+
715719
fn size(&self) -> usize {
716720
self.counts.capacity() * size_of::<usize>()
717721
}

datafusion/functions-aggregate/src/variance.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,12 @@ impl GroupsAccumulator for VarianceGroupsAccumulator {
575575
])
576576
}
577577

578+
fn preallocate(&mut self, total_num_groups: usize) {
579+
self.m2s.resize(total_num_groups, 0.0);
580+
self.means.resize(total_num_groups, 0.0);
581+
self.counts.resize(total_num_groups, 0);
582+
}
583+
578584
fn size(&self) -> usize {
579585
self.m2s.capacity() * size_of::<f64>()
580586
+ self.means.capacity() * size_of::<f64>()

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ impl GroupedHashAggregateStream {
498498
};
499499

500500
// Instantiate the accumulators
501-
let accumulators: Vec<_> = aggregate_exprs
501+
let mut accumulators: Vec<_> = aggregate_exprs
502502
.iter()
503503
.map(create_group_accumulator)
504504
.collect::<Result<_>>()?;
@@ -597,6 +597,12 @@ impl GroupedHashAggregateStream {
597597
.and_then(|stats| agg.compute_group_ndv(&stats))
598598
.map(|ndv: usize| ndv.min(MAX_NDV_CAPACITY));
599599

600+
if let Some(capacity) = capacity_hint {
601+
for acc in &mut accumulators {
602+
acc.preallocate(capacity);
603+
}
604+
}
605+
600606
let group_values =
601607
new_group_values(group_schema, &group_ordering, capacity_hint)?;
602608
let reservation = MemoryConsumer::new(name)

0 commit comments

Comments
 (0)