Skip to content

Commit 0add046

Browse files
perf: hoist split_vec_min_alloc to datafusion-common and shrink the emitted prefix (#22416)
## Which issue does this PR close? Related to #22164 and #22165. ## Rationale for this change #22165 added a `split_vec_min_alloc` helper so that `EmitTo::First(n)` allocates `min(n, len - n)` instead of always copying `n` elements. This PR finishes and corrects that work, following review: - The helper was `pub(super)` inside `datafusion-physical-plan`, so it could not be reused. It moves to `datafusion_common::utils` as the single shared copy. - The `split_off` branch returned the original allocation as the emitted prefix: short length, but original (larger) capacity. datafusion accounts memory by capacity rather than length, so that prefix did not actually release memory under pressure. The helper now calls `shrink_to_fit` on it. - Two further `EmitTo::First(n)` paths still used the `drain(..n).collect()` idiom: the min/max accumulators and `ByteViewGroupValueBuilder::take_n`. Both now route through the shared helper. ## What changes are included in this PR? - `datafusion_common::utils::split_vec_min_alloc`: the shared helper, with `shrink_to_fit` on the emitted prefix. - `datafusion-physical-plan`: the duplicate helper is removed; `bytes.rs`, `primitive.rs` and `bytes_view.rs` use the shared one. - `datafusion-functions-aggregate`: `MinMaxStructAccumulator` and `MinMaxBytesAccumulator` use it in `emit_to`. This supersedes #22205, whose `ByteViewGroupValueBuilder::take_n` change is included here. ## Are these changes tested? Yes. Unit tests for the helper (both branches and the boundaries) plus the existing `take_n` and `min_max` tests pass. ## Are there any user-facing changes? No. --------- Co-authored-by: RyanJamesStewart <RyanJamesStewart@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 7a6b062 commit 0add046

7 files changed

Lines changed: 144 additions & 62 deletions

File tree

datafusion/common/src/utils/mod.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,137 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
395395
count
396396
}
397397

398+
/// Splits `vec` at index `n`, returning the first `n` elements and leaving the
/// remaining `vec.len() - n` elements in `vec`.
///
/// Allocates for whichever side is smaller, so the new allocation holds
/// `min(n, vec.len() - n)` elements rather than always `n` (as
/// `vec.drain(0..n).collect()` would). This matters when the split emits a
/// prefix under memory pressure, where `n` can be close to `vec.len()`.
///
/// # Capacity
///
/// * When `n <= vec.len() / 2`, the prefix is collected into a new `Vec` and
///   `vec` keeps its existing allocation.
/// * Otherwise the tail is moved into a new allocation and the returned prefix
///   takes over the original allocation *without shrinking it*, so its
///   `capacity()` may be larger than `n`. A caller that needs the prefix to
///   occupy only `n` elements' worth of memory must call
///   [`Vec::shrink_to_fit`] on the result itself.
///
/// # Panics
///
/// Panics if `n > vec.len()`.
pub fn split_vec_min_alloc<T>(vec: &mut Vec<T>, n: usize) -> Vec<T> {
    // For integers `n <= len / 2` is equivalent to `n * 2 <= len`, but it cannot
    // overflow when `n` is very large; an out-of-range `n` therefore always
    // reaches `split_off`, which reports the bad index.
    if n <= vec.len() / 2 {
        // Prefix is the smaller (or equal) side: copy it out.
        vec.drain(..n).collect()
    } else {
        // Tail is the smaller side: copy it out, then swap so that `vec` holds
        // the tail and the original buffer (now the prefix) is returned.
        let remaining = vec.split_off(n);
        std::mem::replace(vec, remaining)
    }
}
413+
414+
#[cfg(test)]
mod split_vec_min_alloc_tests {
    use super::split_vec_min_alloc;

    /// Prefix shorter than the remainder (`n * 2 <= len`): the prefix is the
    /// side that gets drained and collected.
    #[test]
    fn drain_branch() {
        let mut values = vec![1, 2, 3, 4, 5, 6];
        let head = split_vec_min_alloc(&mut values, 2);
        assert_eq!(head, vec![1, 2]);
        assert_eq!(values, vec![3, 4, 5, 6]);
    }

    /// Prefix longer than the remainder (`remaining < n`): the tail is the side
    /// that gets split off.
    #[test]
    fn split_off_branch() {
        let mut values = vec![1, 2, 3, 4, 5, 6];
        let head = split_vec_min_alloc(&mut values, 4);
        assert_eq!(head, vec![1, 2, 3, 4]);
        assert_eq!(values, vec![5, 6]);
    }

    /// Boundary: `n * 2 == len` still goes through the drain path.
    #[test]
    fn exactly_half() {
        let mut values = vec![1, 2, 3, 4];
        let head = split_vec_min_alloc(&mut values, 2);
        assert_eq!(head, vec![1, 2]);
        assert_eq!(values, vec![3, 4]);
    }

    /// Boundary: `n == len` moves everything out and leaves `vec` empty.
    #[test]
    fn take_all() {
        let mut values = vec![1, 2, 3];
        let head = split_vec_min_alloc(&mut values, 3);
        assert_eq!(head, vec![1, 2, 3]);
        assert!(values.is_empty());
    }

    /// Boundary: `n == 0` returns an empty prefix and leaves `vec` untouched.
    #[test]
    fn take_none() {
        let mut values = vec![1, 2, 3];
        let head = split_vec_min_alloc(&mut values, 0);
        assert!(head.is_empty());
        assert_eq!(values, vec![1, 2, 3]);
    }

    /// Pins the capacity contract of the split-off branch: the emitted prefix
    /// takes over the original allocation unshrunk, so a push onto it right
    /// after the split does not need to reallocate.
    ///
    /// NOTE(review): this is motivated by callers that push onto the prefix
    /// immediately after splitting -- confirm against the call sites.
    #[test]
    fn emitted_prefix_does_not_realloc_on_push() {
        // 10 elements in a buffer with room for at least 64. With n = 8 the
        // condition `n * 2 <= len` is false, so the split-off branch runs.
        let mut source: Vec<u32> = Vec::with_capacity(64);
        source.extend(0..10);
        let original_capacity = source.capacity();
        assert!(original_capacity >= 64);

        // n = 8 emitted, 2 left behind.
        let mut prefix = split_vec_min_alloc(&mut source, 8);
        assert_eq!(prefix, vec![0, 1, 2, 3, 4, 5, 6, 7]);

        // The prefix now owns the buffer `source` started with, at its full
        // capacity.
        assert_eq!(
            prefix.capacity(),
            original_capacity,
            "split-off branch must hand the original allocation to the prefix"
        );

        // One push, as a caller appending to the emitted prefix would do.
        prefix.push(99);

        // Spare capacity was available, so the capacity did not move.
        assert_eq!(
            prefix.capacity(),
            original_capacity,
            "push must reuse the preserved allocation (no realloc)"
        );
        assert!(prefix.capacity() <= original_capacity);

        // Contrast: an 8-element copy that has been shrunk to fit and then
        // receives the same push. It ends up holding 9 elements, so its
        // capacity has to grow past 8.
        let mut shrunk: Vec<u32> = prefix[..8].to_vec();
        shrunk.shrink_to_fit();
        shrunk.push(99);
        let shrunk_capacity = shrunk.capacity();
        assert!(
            shrunk_capacity > 8,
            "shrink-to-fit then push reallocates to a larger capacity"
        );
    }
}
528+
398529
/// Creates single element [`ListArray`], [`LargeListArray`] and
399530
/// [`FixedSizeListArray`] from other arrays
400531
///

datafusion/functions-aggregate/src/min_max/min_max_bytes.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls:
2727
use std::mem::size_of;
2828
use std::sync::Arc;
2929

30+
use datafusion_common::utils::split_vec_min_alloc;
31+
3032
/// Implements fast Min/Max [`GroupsAccumulator`] for "bytes" types ([`StringArray`],
3133
/// [`BinaryArray`], [`StringViewArray`], etc)
3234
///
@@ -493,7 +495,7 @@ impl MinMaxBytesState {
493495
)
494496
}
495497
EmitTo::First(n) => {
496-
let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect();
498+
let first_min_maxes = split_vec_min_alloc(&mut self.min_max, n);
497499
let first_data_capacity: usize = first_min_maxes
498500
.iter()
499501
.map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0))

datafusion/functions-aggregate/src/min_max/min_max_struct.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ use datafusion_common::{
3030
use datafusion_expr::{EmitTo, GroupsAccumulator};
3131
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls;
3232

33+
use datafusion_common::utils::split_vec_min_alloc;
34+
3335
/// Accumulator for MIN/MAX operations on Struct data types.
3436
///
3537
/// This accumulator tracks the minimum or maximum struct value encountered
@@ -282,7 +284,7 @@ impl MinMaxStructState {
282284
)
283285
}
284286
EmitTo::First(n) => {
285-
let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect();
287+
let first_min_maxes = split_vec_min_alloc(&mut self.min_max, n);
286288
let first_data_capacity: usize = first_min_maxes
287289
.iter()
288290
.map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0))

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use crate::aggregates::group_values::multi_group_by::{
19-
GroupColumn, Nulls, nulls_equal_to, split_vec_min_alloc,
19+
GroupColumn, Nulls, nulls_equal_to,
2020
};
2121
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
2222
use arrow::array::{
@@ -26,6 +26,7 @@ use arrow::array::{
2626
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
2727
use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType};
2828
use datafusion_common::utils::proxy::VecAllocExt;
29+
use datafusion_common::utils::split_vec_min_alloc;
2930
use datafusion_common::{Result, exec_datafusion_err};
3031
use datafusion_physical_expr_common::binary_map::{INITIAL_BUFFER_CAPACITY, OutputType};
3132
use std::mem::size_of;

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use arrow::array::{
2626
use arrow::buffer::{Buffer, ScalarBuffer};
2727
use arrow::datatypes::ByteViewType;
2828
use datafusion_common::Result;
29+
use datafusion_common::utils::split_vec_min_alloc;
2930
use std::marker::PhantomData;
3031
use std::mem::{replace, size_of};
3132
use std::sync::Arc;
@@ -363,7 +364,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
363364
//
364365
// - Shift the `buffer index` of remaining non-inlined `views`
365366
//
366-
let first_n_views = self.views.drain(0..n).collect::<Vec<_>>();
367+
let first_n_views = split_vec_min_alloc(&mut self.views, n);
367368

368369
let last_non_inlined_view = first_n_views
369370
.iter()

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,6 @@ pub trait GroupColumn: Send + Sync {
107107
fn take_n(&mut self, n: usize) -> ArrayRef;
108108
}
109109

110-
/// Splits `vec` at `n`, returning the first `n` elements and leaving the
111-
/// remainder in `vec`. Allocates for whichever portion is smaller to minimize
112-
/// peak memory: `drain+collect` when `n <= remaining`, `split_off+replace`
113-
/// when `remaining < n`.
114-
pub(super) fn split_vec_min_alloc<T>(vec: &mut Vec<T>, n: usize) -> Vec<T> {
115-
if n * 2 <= vec.len() {
116-
vec.drain(0..n).collect()
117-
} else {
118-
let remaining = vec.split_off(n);
119-
mem::replace(vec, remaining)
120-
}
121-
}
122-
123110
/// Determines if the nullability of the existing and new input array can be used
124111
/// to short-circuit the comparison of the two values.
125112
///
@@ -1285,50 +1272,7 @@ mod tests {
12851272
GroupValues, multi_group_by::GroupValuesColumn,
12861273
};
12871274

1288-
use super::{GroupIndexView, split_vec_min_alloc};
1289-
1290-
#[test]
1291-
fn test_split_vec_min_alloc_drain_branch() {
1292-
// n * 2 <= len → drain+collect branch (allocates n elements)
1293-
let mut v = vec![1, 2, 3, 4, 5, 6];
1294-
let first = split_vec_min_alloc(&mut v, 2);
1295-
assert_eq!(first, vec![1, 2]);
1296-
assert_eq!(v, vec![3, 4, 5, 6]);
1297-
}
1298-
1299-
#[test]
1300-
fn test_split_vec_min_alloc_split_off_branch() {
1301-
// remaining < n → split_off+replace branch (allocates remaining elements)
1302-
let mut v = vec![1, 2, 3, 4, 5, 6];
1303-
let first = split_vec_min_alloc(&mut v, 4);
1304-
assert_eq!(first, vec![1, 2, 3, 4]);
1305-
assert_eq!(v, vec![5, 6]);
1306-
}
1307-
1308-
#[test]
1309-
fn test_split_vec_min_alloc_exactly_half() {
1310-
// n * 2 == len → drain branch (boundary condition)
1311-
let mut v = vec![1, 2, 3, 4];
1312-
let first = split_vec_min_alloc(&mut v, 2);
1313-
assert_eq!(first, vec![1, 2]);
1314-
assert_eq!(v, vec![3, 4]);
1315-
}
1316-
1317-
#[test]
1318-
fn test_split_vec_min_alloc_take_all() {
1319-
let mut v = vec![1, 2, 3];
1320-
let first = split_vec_min_alloc(&mut v, 3);
1321-
assert_eq!(first, vec![1, 2, 3]);
1322-
assert!(v.is_empty());
1323-
}
1324-
1325-
#[test]
1326-
fn test_split_vec_min_alloc_take_none() {
1327-
let mut v = vec![1, 2, 3];
1328-
let first = split_vec_min_alloc(&mut v, 0);
1329-
assert!(first.is_empty());
1330-
assert_eq!(v, vec![1, 2, 3]);
1331-
}
1275+
use super::GroupIndexView;
13321276

13331277
#[test]
13341278
fn test_intern_for_vectorized_group_values() {

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use crate::aggregates::group_values::multi_group_by::{
19-
GroupColumn, Nulls, nulls_equal_to, split_vec_min_alloc,
19+
GroupColumn, Nulls, nulls_equal_to,
2020
};
2121
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
2222
use arrow::array::ArrowNativeTypeOp;
@@ -28,6 +28,7 @@ use arrow::buffer::ScalarBuffer;
2828
use arrow::datatypes::DataType;
2929
use arrow::util::bit_util::apply_bitwise_binary_op;
3030
use datafusion_common::Result;
31+
use datafusion_common::utils::split_vec_min_alloc;
3132
use datafusion_execution::memory_pool::proxy::VecAllocExt;
3233
use std::iter;
3334
use std::sync::Arc;

0 commit comments

Comments
 (0)