Skip to content

Commit 65d51e8

Browse files
committed
feat: move split_vec_min_alloc to datafusion-common
1 parent 0d99460 commit 65d51e8

4 files changed

Lines changed: 62 additions & 61 deletions

File tree

datafusion/common/src/utils/mod.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use std::num::NonZero;
4141
use std::ops::Range;
4242
use std::sync::{Arc, LazyLock};
4343
use std::thread::available_parallelism;
44+
use std::mem;
4445

4546
/// Applies an optional projection to a [`SchemaRef`], returning the
4647
/// projected schema
@@ -569,6 +570,19 @@ pub fn base_type(data_type: &DataType) -> DataType {
569570
}
570571
}
571572

573+
/// Splits `vec` at `n`, returning the first `n` elements and leaving the
574+
/// remainder in `vec`. Allocates for whichever portion is smaller to minimize
575+
/// peak memory: `drain+collect` when `n <= remaining`, `split_off+replace`
576+
/// when `remaining < n`.
577+
pub fn split_vec_min_alloc<T>(vec: &mut Vec<T>, n: usize) -> Vec<T> {
578+
if n * 2 <= vec.len() {
579+
vec.drain(0..n).collect()
580+
} else {
581+
let remaining = vec.split_off(n);
582+
mem::replace(vec, remaining)
583+
}
584+
}
585+
572586
/// Information about how to coerce lists.
573587
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
574588
pub enum ListCoercion {
@@ -1261,6 +1275,49 @@ mod tests {
12611275
assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
12621276
}
12631277

1278+
#[test]
1279+
fn test_split_vec_min_alloc_drain_branch() {
1280+
// n * 2 <= len → drain+collect branch (allocates n elements)
1281+
let mut v = vec![1, 2, 3, 4, 5, 6];
1282+
let first = split_vec_min_alloc(&mut v, 2);
1283+
assert_eq!(first, vec![1, 2]);
1284+
assert_eq!(v, vec![3, 4, 5, 6]);
1285+
}
1286+
1287+
#[test]
1288+
fn test_split_vec_min_alloc_split_off_branch() {
1289+
// remaining < n → split_off+replace branch (allocates remaining elements)
1290+
let mut v = vec![1, 2, 3, 4, 5, 6];
1291+
let first = split_vec_min_alloc(&mut v, 4);
1292+
assert_eq!(first, vec![1, 2, 3, 4]);
1293+
assert_eq!(v, vec![5, 6]);
1294+
}
1295+
1296+
#[test]
1297+
fn test_split_vec_min_alloc_exactly_half() {
1298+
// n * 2 == len → drain branch (boundary condition)
1299+
let mut v = vec![1, 2, 3, 4];
1300+
let first = split_vec_min_alloc(&mut v, 2);
1301+
assert_eq!(first, vec![1, 2]);
1302+
assert_eq!(v, vec![3, 4]);
1303+
}
1304+
1305+
#[test]
1306+
fn test_split_vec_min_alloc_take_all() {
1307+
let mut v = vec![1, 2, 3];
1308+
let first = split_vec_min_alloc(&mut v, 3);
1309+
assert_eq!(first, vec![1, 2, 3]);
1310+
assert!(v.is_empty());
1311+
}
1312+
1313+
#[test]
1314+
fn test_split_vec_min_alloc_take_none() {
1315+
let mut v = vec![1, 2, 3];
1316+
let first = split_vec_min_alloc(&mut v, 0);
1317+
assert!(first.is_empty());
1318+
assert_eq!(v, vec![1, 2, 3]);
1319+
}
1320+
12641321
#[test]
12651322
fn test_find_indices() -> Result<()> {
12661323
assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use crate::aggregates::group_values::multi_group_by::{
19-
GroupColumn, Nulls, nulls_equal_to, split_vec_min_alloc,
19+
GroupColumn, Nulls, nulls_equal_to,
2020
};
2121
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
2222
use arrow::array::{
@@ -26,7 +26,7 @@ use arrow::array::{
2626
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
2727
use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType};
2828
use datafusion_common::utils::proxy::VecAllocExt;
29-
use datafusion_common::{Result, exec_datafusion_err};
29+
use datafusion_common::{Result, exec_datafusion_err, utils::split_vec_min_alloc};
3030
use datafusion_physical_expr_common::binary_map::{INITIAL_BUFFER_CAPACITY, OutputType};
3131
use itertools::izip;
3232
use std::mem::size_of;

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,6 @@ pub trait GroupColumn: Send + Sync {
107107
fn take_n(&mut self, n: usize) -> ArrayRef;
108108
}
109109

110-
/// Splits `vec` at `n`, returning the first `n` elements and leaving the
111-
/// remainder in `vec`. Allocates for whichever portion is smaller to minimize
112-
/// peak memory: `drain+collect` when `n <= remaining`, `split_off+replace`
113-
/// when `remaining < n`.
114-
pub(super) fn split_vec_min_alloc<T>(vec: &mut Vec<T>, n: usize) -> Vec<T> {
115-
if n * 2 <= vec.len() {
116-
vec.drain(0..n).collect()
117-
} else {
118-
let remaining = vec.split_off(n);
119-
mem::replace(vec, remaining)
120-
}
121-
}
122-
123110
/// Determines if the nullability of the existing and new input array can be used
124111
/// to short-circuit the comparison of the two values.
125112
///
@@ -1275,50 +1262,7 @@ mod tests {
12751262
GroupValues, multi_group_by::GroupValuesColumn,
12761263
};
12771264

1278-
use super::{GroupIndexView, split_vec_min_alloc};
1279-
1280-
#[test]
1281-
fn test_split_vec_min_alloc_drain_branch() {
1282-
// n * 2 <= len → drain+collect branch (allocates n elements)
1283-
let mut v = vec![1, 2, 3, 4, 5, 6];
1284-
let first = split_vec_min_alloc(&mut v, 2);
1285-
assert_eq!(first, vec![1, 2]);
1286-
assert_eq!(v, vec![3, 4, 5, 6]);
1287-
}
1288-
1289-
#[test]
1290-
fn test_split_vec_min_alloc_split_off_branch() {
1291-
// remaining < n → split_off+replace branch (allocates remaining elements)
1292-
let mut v = vec![1, 2, 3, 4, 5, 6];
1293-
let first = split_vec_min_alloc(&mut v, 4);
1294-
assert_eq!(first, vec![1, 2, 3, 4]);
1295-
assert_eq!(v, vec![5, 6]);
1296-
}
1297-
1298-
#[test]
1299-
fn test_split_vec_min_alloc_exactly_half() {
1300-
// n * 2 == len → drain branch (boundary condition)
1301-
let mut v = vec![1, 2, 3, 4];
1302-
let first = split_vec_min_alloc(&mut v, 2);
1303-
assert_eq!(first, vec![1, 2]);
1304-
assert_eq!(v, vec![3, 4]);
1305-
}
1306-
1307-
#[test]
1308-
fn test_split_vec_min_alloc_take_all() {
1309-
let mut v = vec![1, 2, 3];
1310-
let first = split_vec_min_alloc(&mut v, 3);
1311-
assert_eq!(first, vec![1, 2, 3]);
1312-
assert!(v.is_empty());
1313-
}
1314-
1315-
#[test]
1316-
fn test_split_vec_min_alloc_take_none() {
1317-
let mut v = vec![1, 2, 3];
1318-
let first = split_vec_min_alloc(&mut v, 0);
1319-
assert!(first.is_empty());
1320-
assert_eq!(v, vec![1, 2, 3]);
1321-
}
1265+
use super::GroupIndexView;
13221266

13231267
#[test]
13241268
fn test_intern_for_vectorized_group_values() {

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
// under the License.
1717

1818
use crate::aggregates::group_values::multi_group_by::{
19-
GroupColumn, Nulls, nulls_equal_to, split_vec_min_alloc,
19+
GroupColumn, Nulls, nulls_equal_to,
2020
};
2121
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
2222
use arrow::array::ArrowNativeTypeOp;
2323
use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray, cast::AsArray};
2424
use arrow::buffer::ScalarBuffer;
2525
use arrow::datatypes::DataType;
26-
use datafusion_common::Result;
26+
use datafusion_common::{Result, utils::split_vec_min_alloc};
2727
use datafusion_execution::memory_pool::proxy::VecAllocExt;
2828
use itertools::izip;
2929
use std::iter;

0 commit comments

Comments
 (0)