Skip to content

perf: hoist split_vec_min_alloc to datafusion-common and shrink the emitted prefix#22416

Open
RyanJamesStewart wants to merge 4 commits into
apache:mainfrom
RyanJamesStewart:perf/min-max-emit-min-alloc
Open

perf: hoist split_vec_min_alloc to datafusion-common and shrink the emitted prefix#22416
RyanJamesStewart wants to merge 4 commits into
apache:mainfrom
RyanJamesStewart:perf/min-max-emit-min-alloc

Conversation

@RyanJamesStewart
Copy link
Copy Markdown
Contributor

@RyanJamesStewart RyanJamesStewart commented May 21, 2026

Which issue does this PR close?

Related to #22164 and #22165.

Rationale for this change

#22165 added a split_vec_min_alloc helper so that EmitTo::First(n) allocates min(n, len - n) instead of always copying n elements. This PR finishes and corrects that work, following review:

  • The helper was pub(super) inside datafusion-physical-plan, so it could not be reused. It moves to datafusion_common::utils as the single shared copy.
  • The split_off branch returned the original allocation as the emitted prefix: short length, but original (larger) capacity. datafusion accounts memory by capacity rather than length, so that prefix did not actually release memory under pressure. The helper now calls shrink_to_fit on it.
  • Two further EmitTo::First(n) paths still used the drain(..n).collect() idiom: the min/max accumulators and ByteViewGroupValueBuilder::take_n. Both now route through the shared helper.

What changes are included in this PR?

  • datafusion_common::utils::split_vec_min_alloc: the shared helper, with shrink_to_fit on the emitted prefix.
  • datafusion-physical-plan: the duplicate helper is removed; bytes.rs, primitive.rs and bytes_view.rs use the shared one.
  • datafusion-functions-aggregate: MinMaxStructAccumulator and MinMaxBytesAccumulator use it in emit_to.

This supersedes #22205, whose ByteViewGroupValueBuilder::take_n change is included here.

Are these changes tested?

Yes. Unit tests for the helper (both branches and the boundaries) plus the existing take_n and min_max tests pass.

Are there any user-facing changes?

No.

`MinMaxStructAccumulator` and `MinMaxBytesAccumulator` both emit a
prefix of their `min_max` group buffer with `drain(..n).collect()`.
That always allocates `n` elements and leaves the retained buffer at
its pre-emit capacity. Under an OOM-triggered `EmitTo::First(n)` emit,
`n` is close to the buffer length, so this copies the largest
allocation -- the same problem apache#22165 fixed for the multi-group-by
`GroupColumn` builders.

This applies the same fix: a `split_vec_min_alloc` helper that
allocates `min(n, len - n)` by choosing `drain+collect` or
`split_off+replace` depending on which side is smaller. Both min/max
accumulators route through it.

The helper is duplicated from `datafusion-physical-plan`'s
`split_vec_min_alloc` (added in apache#22165) because that one is
`pub(super)`-scoped to a different crate. It is a generic `Vec<T>`
utility with no aggregate-specific logic; if maintainers prefer, it
could instead be hoisted into `datafusion-common` and shared by both
crates -- happy to do that here or as a follow-up.

Behavior is unchanged. Adds unit tests covering both branches and the
n == len / n == 0 boundaries.
@github-actions github-actions Bot added the functions Changes to functions implementation label May 21, 2026
/// Mirrors `split_vec_min_alloc` in `datafusion-physical-plan`'s
/// `aggregates::group_values::multi_group_by` module (added in #22165); kept
/// local here because that helper is `pub(super)` to a different crate.
fn split_vec_min_alloc<T>(vec: &mut Vec<T>, n: usize) -> Vec<T> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move this in datafusion-common

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have another use case for it in datafusion/expr-common/src/groups_accumulator.rs so it would be nice to have it in one place

Per review on apache#22416: move the split_vec_min_alloc helper out of
functions-aggregate's min_max.rs into datafusion-common's utils
module, so it lives in one place and can be shared.

The function and its five tests move verbatim. min_max_struct.rs and
min_max_bytes.rs now import it from datafusion_common::utils instead
of from the local module.
@github-actions github-actions Bot added the common Related to common crate label May 21, 2026
if n * 2 <= vec.len() {
vec.drain(0..n).collect()
} else {
let remaining = vec.split_off(n);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was meaning to add this, but since you're in this area, I appreciate it if you could apply it.
This is important because in datafusion memory tracking is done by looking at capacity, not length, so we shouldn't hand off Vecs with small length but very large capacity.

Suggested change
let remaining = vec.split_off(n);
let remaining = vec.split_off(n);
vec.shrink_to_fit();

@ariel-miculas
Copy link
Copy Markdown
Contributor

Please also remove the function from datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs‎ so we don't have duplicates.

@ariel-miculas
Copy link
Copy Markdown
Contributor

Thanks for following up on this, @RyanJamesStewart ! With this function accessible from the common module it will be easier to make changes in other places as well.

@ariel-miculas
Copy link
Copy Markdown
Contributor

Since this wasn't merged yet, maybe you could hoist those changes here as well: #22205



Addresses the review on apache#22416:

- split_vec_min_alloc: shrink_to_fit the emitted prefix in the
  split_off branch. datafusion accounts memory by capacity rather
  than length, so the prefix must not retain the pre-split (larger)
  allocation's capacity.
- Remove the duplicate split_vec_min_alloc from physical-plan's
  multi_group_by/mod.rs (added in apache#22165); bytes.rs and primitive.rs
  now use the shared datafusion_common::utils version.
- Fold in the ByteViewGroupValueBuilder::take_n change previously
  proposed as apache#22205, so this PR converts every split_vec_min_alloc
  call site in one place.

All split_vec_min_alloc, take_n and min_max tests pass.
@RyanJamesStewart RyanJamesStewart changed the title perf: minimize allocation in min/max emit_to(First(n)) perf: hoist split_vec_min_alloc to datafusion-common and shrink the emitted prefix May 21, 2026
@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label May 21, 2026
@RyanJamesStewart
Copy link
Copy Markdown
Contributor Author

Done in 287a1d2. split_vec_min_alloc now lives in datafusion_common::utils and calls shrink_to_fit on the emitted prefix in the split_off branch, since that prefix keeps the original capacity and datafusion accounts memory by capacity rather than length.

The duplicate helper in physical-plan's multi_group_by/mod.rs is removed; bytes.rs and primitive.rs now use the shared one. I also folded in the ByteViewGroupValueBuilder::take_n change from #22205, so this PR converts every split_vec_min_alloc call site in one place. #22205 will be closed as superseded.

Comment thread datafusion/common/src/utils/mod.rs Outdated
// The emitted prefix keeps the original (larger) allocation. datafusion
// accounts memory by capacity rather than length, so shrink it to avoid
// handing off a short Vec that still reserves the pre-split capacity.
vec.shrink_to_fit();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was my bad for suggesting this, turns out it's a more nuanced change that needs more tests. In

// Given offsets like [0, 2, 4, 5] and n = 1, we expect to get
// offsets [0, 2, 3]. We first create two offsets for first_n as [0, 2] and the remaining as [2, 4, 5].
// And we shift the offset starting from 0 for the remaining one, [2, 4, 5] -> [0, 2, 3].
let offset_n = self.offsets[n];
let mut first_n_offsets = split_vec_min_alloc(&mut self.offsets, n);
// After the split, self.offsets[0] == offset_n in both branches; normalize in-place.
self.offsets.iter_mut().for_each(|o| *o = o.sub(offset_n));
first_n_offsets.push(offset_n);

there's

first_n_offsets.push(offset_n);

and if we're shrinking the vec to the minimum capacity, pushing to it will cause reallocation and a greater capacity than the original one.
It's probably best to remove the shrink_to_fit for now.

@ariel-miculas
Copy link
Copy Markdown
Contributor

Other than removing my bad suggestion, looks good to me

… branch

ariel-miculas noted the split-off branch should not call shrink_to_fit
"for now". The downstream caller in multi_group_by/bytes.rs pushes onto
the emitted prefix immediately (first_n_offsets.push(offset_n)). With
shrink_to_fit, that push reallocates and Vec growth lands the prefix at
a capacity larger than the original allocation, defeating the intended
memory saving. Without it, the push reuses the preserved allocation and
no realloc occurs.

The shrink was added to keep capacity-based memory accounting honest,
but in the real call path it spends a reallocation for no gain. Defer
that concern; revisit it as a caller-side property if needed.

Adds emitted_prefix_does_not_realloc_on_push, a self-documenting test
that proves the pathology empirically: the preserved allocation survives
a push without realloc, while a shrunk prefix reallocs to a larger
capacity on the same push.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@RyanJamesStewart
Copy link
Copy Markdown
Contributor Author

Removed the shrink_to_fit in the split_off branch, as suggested. The point about the push-after-emit caller holds: when a caller pushes onto the emitted prefix right away (first_n_offsets.push in multi_group_by/bytes.rs), shrinking to length n just forces a realloc on the next push. I added a test, emitted_prefix_does_not_realloc_on_push, that pins this down. A prefix split from a capacity-64 Vec keeps capacity 64 and absorbs a push with no realloc; the same prefix shrunk to length 8 reallocs up to capacity 16 on that push, ending larger than where it started. The capacity-accounting concern is still valid, but it reads as a caller-side property rather than something to bake into the shared utility, so I have left shrink out for now. Pushed as 28655d8.

@ariel-miculas
Copy link
Copy Markdown
Contributor

For the record, this is the issue that I was trying to fix using shrink_to_fit:
Say you have a vector of 1 million elements and you keep taking 1024 elements from it. Each slice will have a capacity of 1024 except the last one, when instead of drain + collect, split_off + mem::replace will be called, since the length of the original Vec will have reached 1024. The capacity of the last slice will be 1 million, because the capacity of the original Vec doesn't change after split_off. This leads to problems with memory accounting.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

common Related to common crate functions Changes to functions implementation physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants