Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,137 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
count
}

/// Splits `vec` at index `n`, returning the first `n` elements and leaving the
/// remaining `vec.len() - n` elements in `vec`.
///
/// Allocates for whichever side is smaller, so the new allocation is
/// `min(n, vec.len() - n)` rather than always `n` (as `vec.drain(0..n).collect()`
/// would). This matters when the split emits a prefix under memory pressure,
/// where `n` can be close to `vec.len()`.
pub fn split_vec_min_alloc<T>(vec: &mut Vec<T>, n: usize) -> Vec<T> {
if n * 2 <= vec.len() {
vec.drain(0..n).collect()
} else {
let remaining = vec.split_off(n);
Comment thread
alamb marked this conversation as resolved.
std::mem::replace(vec, remaining)
}
}

#[cfg(test)]
mod split_vec_min_alloc_tests {
use super::split_vec_min_alloc;

#[test]
fn drain_branch() {
// n * 2 <= len -> drain+collect branch (allocates n elements)
let mut v = vec![1, 2, 3, 4, 5, 6];
let first = split_vec_min_alloc(&mut v, 2);
assert_eq!(first, vec![1, 2]);
assert_eq!(v, vec![3, 4, 5, 6]);
}

#[test]
fn split_off_branch() {
// remaining < n -> split_off+replace branch (allocates remaining elements)
let mut v = vec![1, 2, 3, 4, 5, 6];
let first = split_vec_min_alloc(&mut v, 4);
assert_eq!(first, vec![1, 2, 3, 4]);
assert_eq!(v, vec![5, 6]);
}

#[test]
fn exactly_half() {
// n * 2 == len -> drain branch (boundary)
let mut v = vec![1, 2, 3, 4];
let first = split_vec_min_alloc(&mut v, 2);
assert_eq!(first, vec![1, 2]);
assert_eq!(v, vec![3, 4]);
}

#[test]
fn take_all() {
let mut v = vec![1, 2, 3];
let first = split_vec_min_alloc(&mut v, 3);
assert_eq!(first, vec![1, 2, 3]);
assert!(v.is_empty());
}

#[test]
fn take_none() {
let mut v = vec![1, 2, 3];
let first = split_vec_min_alloc(&mut v, 0);
assert!(first.is_empty());
assert_eq!(v, vec![1, 2, 3]);
}

#[test]
fn emitted_prefix_does_not_realloc_on_push() {
// Demonstrates *why* the split-off branch must NOT call `shrink_to_fit`.
//
// Downstream callers (e.g. `multi_group_by/bytes.rs`, which does
// `first_n_offsets.push(offset_n)` right after the split) push onto the
// emitted prefix immediately. The split-off branch hands the original
// backing allocation to that prefix, so the prefix already has spare
// capacity for the very next push.
//
// If we shrank the prefix to fit, that next push would have to
// reallocate, and Vec's growth strategy would land it at a *larger*
// capacity than the original allocation we started with -- the opposite
// of the memory saving `shrink_to_fit` was meant to deliver.

// A Vec with a known, deliberately large capacity. n*2 > len, so this
// takes the split-off branch.
let mut v: Vec<u32> = Vec::with_capacity(64);
v.extend(0..10);
let original_capacity = v.capacity();
assert!(original_capacity >= 64);

// Emit a prefix that is most of the Vec (n = 8, remaining = 2).
let mut prefix = split_vec_min_alloc(&mut v, 8);
assert_eq!(prefix, vec![0, 1, 2, 3, 4, 5, 6, 7]);

// The split-off branch moved the original backing store into `prefix`,
// so it keeps the original (large) capacity -- no shrink happened.
assert_eq!(
prefix.capacity(),
original_capacity,
"split-off branch must hand the original allocation to the prefix"
);

// The caller's very next operation: push one element onto the prefix.
prefix.push(99);

// Because the capacity was preserved, the push reused the existing
// allocation: post-push capacity is unchanged and still <= original.
// This is the realloc that `shrink_to_fit` would have forced.
assert_eq!(
prefix.capacity(),
original_capacity,
"push must reuse the preserved allocation (no realloc)"
);
assert!(prefix.capacity() <= original_capacity);

// Counter-demonstration: had we shrunk the prefix to fit (capacity 8),
// the same push would have reallocated. Vec doubles on growth, so the
// post-push capacity (16) ends up LARGER than where a length-8 prefix
// started -- and we paid a realloc for it.
let mut shrunk: Vec<u32> = prefix[..8].to_vec();
shrunk.shrink_to_fit();
let shrunk_capacity = shrink_then_push_capacity(&mut shrunk);
assert!(
shrunk_capacity > 8,
"shrink-to-fit then push reallocates to a larger capacity"
);
}

/// Helper for the counter-demonstration above: push one element and report
/// the resulting capacity.
fn shrink_then_push_capacity(v: &mut Vec<u32>) -> usize {
v.push(99);
v.capacity()
}
}

/// Creates single element [`ListArray`], [`LargeListArray`] and
/// [`FixedSizeListArray`] from other arrays
///
Expand Down
4 changes: 3 additions & 1 deletion datafusion/functions-aggregate/src/min_max/min_max_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls:
use std::mem::size_of;
use std::sync::Arc;

use datafusion_common::utils::split_vec_min_alloc;

/// Implements fast Min/Max [`GroupsAccumulator`] for "bytes" types ([`StringArray`],
/// [`BinaryArray`], [`StringViewArray`], etc)
///
Expand Down Expand Up @@ -493,7 +495,7 @@ impl MinMaxBytesState {
)
}
EmitTo::First(n) => {
let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect();
let first_min_maxes = split_vec_min_alloc(&mut self.min_max, n);
let first_data_capacity: usize = first_min_maxes
.iter()
.map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0))
Expand Down
4 changes: 3 additions & 1 deletion datafusion/functions-aggregate/src/min_max/min_max_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use datafusion_common::{
use datafusion_expr::{EmitTo, GroupsAccumulator};
use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls;

use datafusion_common::utils::split_vec_min_alloc;

/// Accumulator for MIN/MAX operations on Struct data types.
///
/// This accumulator tracks the minimum or maximum struct value encountered
Expand Down Expand Up @@ -282,7 +284,7 @@ impl MinMaxStructState {
)
}
EmitTo::First(n) => {
let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect();
let first_min_maxes = split_vec_min_alloc(&mut self.min_max, n);
let first_data_capacity: usize = first_min_maxes
.iter()
.map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::aggregates::group_values::multi_group_by::{
GroupColumn, Nulls, nulls_equal_to, split_vec_min_alloc,
GroupColumn, Nulls, nulls_equal_to,
};
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow::array::{
Expand All @@ -26,6 +26,7 @@ use arrow::array::{
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType};
use datafusion_common::utils::proxy::VecAllocExt;
use datafusion_common::utils::split_vec_min_alloc;
use datafusion_common::{Result, exec_datafusion_err};
use datafusion_physical_expr_common::binary_map::{INITIAL_BUFFER_CAPACITY, OutputType};
use std::mem::size_of;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow::array::{
use arrow::buffer::{Buffer, ScalarBuffer};
use arrow::datatypes::ByteViewType;
use datafusion_common::Result;
use datafusion_common::utils::split_vec_min_alloc;
use std::marker::PhantomData;
use std::mem::{replace, size_of};
use std::sync::Arc;
Expand Down Expand Up @@ -363,7 +364,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
//
// - Shift the `buffer index` of remaining non-inlined `views`
//
let first_n_views = self.views.drain(0..n).collect::<Vec<_>>();
let first_n_views = split_vec_min_alloc(&mut self.views, n);

let last_non_inlined_view = first_n_views
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,6 @@ pub trait GroupColumn: Send + Sync {
fn take_n(&mut self, n: usize) -> ArrayRef;
}

/// Splits `vec` at `n`, returning the first `n` elements and leaving the
/// remainder in `vec`. Allocates for whichever portion is smaller to minimize
/// peak memory: `drain+collect` when `n <= remaining`, `split_off+replace`
/// when `remaining < n`.
pub(super) fn split_vec_min_alloc<T>(vec: &mut Vec<T>, n: usize) -> Vec<T> {
if n * 2 <= vec.len() {
vec.drain(0..n).collect()
} else {
let remaining = vec.split_off(n);
mem::replace(vec, remaining)
}
}

/// Determines if the nullability of the existing and new input array can be used
/// to short-circuit the comparison of the two values.
///
Expand Down Expand Up @@ -1285,50 +1272,7 @@ mod tests {
GroupValues, multi_group_by::GroupValuesColumn,
};

use super::{GroupIndexView, split_vec_min_alloc};

#[test]
fn test_split_vec_min_alloc_drain_branch() {
// n * 2 <= len → drain+collect branch (allocates n elements)
let mut v = vec![1, 2, 3, 4, 5, 6];
let first = split_vec_min_alloc(&mut v, 2);
assert_eq!(first, vec![1, 2]);
assert_eq!(v, vec![3, 4, 5, 6]);
}

#[test]
fn test_split_vec_min_alloc_split_off_branch() {
// remaining < n → split_off+replace branch (allocates remaining elements)
let mut v = vec![1, 2, 3, 4, 5, 6];
let first = split_vec_min_alloc(&mut v, 4);
assert_eq!(first, vec![1, 2, 3, 4]);
assert_eq!(v, vec![5, 6]);
}

#[test]
fn test_split_vec_min_alloc_exactly_half() {
// n * 2 == len → drain branch (boundary condition)
let mut v = vec![1, 2, 3, 4];
let first = split_vec_min_alloc(&mut v, 2);
assert_eq!(first, vec![1, 2]);
assert_eq!(v, vec![3, 4]);
}

#[test]
fn test_split_vec_min_alloc_take_all() {
let mut v = vec![1, 2, 3];
let first = split_vec_min_alloc(&mut v, 3);
assert_eq!(first, vec![1, 2, 3]);
assert!(v.is_empty());
}

#[test]
fn test_split_vec_min_alloc_take_none() {
let mut v = vec![1, 2, 3];
let first = split_vec_min_alloc(&mut v, 0);
assert!(first.is_empty());
assert_eq!(v, vec![1, 2, 3]);
}
use super::GroupIndexView;

#[test]
fn test_intern_for_vectorized_group_values() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::aggregates::group_values::multi_group_by::{
GroupColumn, Nulls, nulls_equal_to, split_vec_min_alloc,
GroupColumn, Nulls, nulls_equal_to,
};
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow::array::ArrowNativeTypeOp;
Expand All @@ -28,6 +28,7 @@ use arrow::buffer::ScalarBuffer;
use arrow::datatypes::DataType;
use arrow::util::bit_util::apply_bitwise_binary_op;
use datafusion_common::Result;
use datafusion_common::utils::split_vec_min_alloc;
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use std::iter;
use std::sync::Arc;
Expand Down
Loading