perf: hoist split_vec_min_alloc to datafusion-common and shrink the emitted prefix#22416
perf: hoist split_vec_min_alloc to datafusion-common and shrink the emitted prefix#22416RyanJamesStewart wants to merge 4 commits into
Conversation
`MinMaxStructAccumulator` and `MinMaxBytesAccumulator` both emit a prefix of their `min_max` group buffer with `drain(..n).collect()`. That always allocates `n` elements and leaves the retained buffer at its pre-emit capacity. Under an OOM-triggered `EmitTo::First(n)` emit, `n` is close to the buffer length, so this copies the largest allocation -- the same problem apache#22165 fixed for the multi-group-by `GroupColumn` builders. This applies the same fix: a `split_vec_min_alloc` helper that allocates `min(n, len - n)` by choosing `drain+collect` or `split_off+replace` depending on which side is smaller. Both min/max accumulators route through it. The helper is duplicated from `datafusion-physical-plan`'s `split_vec_min_alloc` (added in apache#22165) because that one is `pub(super)`-scoped to a different crate. It is a generic `Vec<T>` utility with no aggregate-specific logic; if maintainers prefer, it could instead be hoisted into `datafusion-common` and shared by both crates -- happy to do that here or as a follow-up. Behavior is unchanged. Adds unit tests covering both branches and the n == len / n == 0 boundaries.
| /// Mirrors `split_vec_min_alloc` in `datafusion-physical-plan`'s | ||
| /// `aggregates::group_values::multi_group_by` module (added in #22165); kept | ||
| /// local here because that helper is `pub(super)` to a different crate. | ||
| fn split_vec_min_alloc<T>(vec: &mut Vec<T>, n: usize) -> Vec<T> { |
There was a problem hiding this comment.
please move this in datafusion-common
There was a problem hiding this comment.
I have another use case for it in datafusion/expr-common/src/groups_accumulator.rs so it would be nice to have it in one place
Per review on apache#22416: move the split_vec_min_alloc helper out of functions-aggregate's min_max.rs into datafusion-common's utils module, so it lives in one place and can be shared. The function and its five tests move verbatim. min_max_struct.rs and min_max_bytes.rs now import it from datafusion_common::utils instead of from the local module.
| if n * 2 <= vec.len() { | ||
| vec.drain(0..n).collect() | ||
| } else { | ||
| let remaining = vec.split_off(n); |
There was a problem hiding this comment.
I was meaning to add this, but since you're in this area, I appreciate it if you could apply it.
This is important because in datafusion memory tracking is done by looking at capacity, not length, so we shouldn't hand off Vecs with small length but very large capacity.
| let remaining = vec.split_off(n); | |
| let remaining = vec.split_off(n); | |
| vec.shrink_to_fit(); |
|
Please also remove the function from datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs so we don't have duplicates. |
|
Thanks for following up on this, @RyanJamesStewart ! With this function accessible from the common module it will be easier to make changes in other places as well. |
|
Since this wasn't merged yet, maybe you could hoist those changes here as well: #22205 |
Addresses the review on apache#22416: - split_vec_min_alloc: shrink_to_fit the emitted prefix in the split_off branch. datafusion accounts memory by capacity rather than length, so the prefix must not retain the pre-split (larger) allocation's capacity. - Remove the duplicate split_vec_min_alloc from physical-plan's multi_group_by/mod.rs (added in apache#22165); bytes.rs and primitive.rs now use the shared datafusion_common::utils version. - Fold in the ByteViewGroupValueBuilder::take_n change previously proposed as apache#22205, so this PR converts every split_vec_min_alloc call site in one place. All split_vec_min_alloc, take_n and min_max tests pass.
|
Done in 287a1d2. The duplicate helper in physical-plan's |
| // The emitted prefix keeps the original (larger) allocation. datafusion | ||
| // accounts memory by capacity rather than length, so shrink it to avoid | ||
| // handing off a short Vec that still reserves the pre-split capacity. | ||
| vec.shrink_to_fit(); |
There was a problem hiding this comment.
This was my bad for suggesting this, turns out it's a more nuanced change that needs more tests. In
there's
first_n_offsets.push(offset_n);
and if we're shrinking the vec to the minimum capacity, pushing to it will cause reallocation and a greater capacity than the original one.
It's probably best to remove the shrink_to_fit for now.
|
Other than removing my bad suggestion, looks good to me |
… branch ariel-miculas noted the split-off branch should not call shrink_to_fit "for now". The downstream caller in multi_group_by/bytes.rs pushes onto the emitted prefix immediately (first_n_offsets.push(offset_n)). With shrink_to_fit, that push reallocates and Vec growth lands the prefix at a capacity larger than the original allocation, defeating the intended memory saving. Without it, the push reuses the preserved allocation and no realloc occurs. The shrink was added to keep capacity-based memory accounting honest, but in the real call path it spends a reallocation for no gain. Defer that concern; revisit it as a caller-side property if needed. Adds emitted_prefix_does_not_realloc_on_push, a self-documenting test that proves the pathology empirically: the preserved allocation survives a push without realloc, while a shrunk prefix reallocs to a larger capacity on the same push. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
|
Removed the shrink_to_fit in the split_off branch, as suggested. The point about the push-after-emit caller holds: when a caller pushes onto the emitted prefix right away (first_n_offsets.push in multi_group_by/bytes.rs), shrinking to length n just forces a realloc on the next push. I added a test, emitted_prefix_does_not_realloc_on_push, that pins this down. A prefix split from a capacity-64 Vec keeps capacity 64 and absorbs a push with no realloc; the same prefix shrunk to length 8 reallocs up to capacity 16 on that push, ending larger than where it started. The capacity-accounting concern is still valid, but it reads as a caller-side property rather than something to bake into the shared utility, so I have left shrink out for now. Pushed as 28655d8. |
|
For the record, this is the issue that I was trying to fix using shrink_to_fit: |
Which issue does this PR close?
Related to #22164 and #22165.
Rationale for this change
#22165 added a
split_vec_min_allochelper so thatEmitTo::First(n)allocatesmin(n, len - n)instead of always copyingnelements. This PR finishes and corrects that work, following review:pub(super)insidedatafusion-physical-plan, so it could not be reused. It moves todatafusion_common::utilsas the single shared copy.split_offbranch returned the original allocation as the emitted prefix: short length, but original (larger) capacity. datafusion accounts memory by capacity rather than length, so that prefix did not actually release memory under pressure. The helper now callsshrink_to_fiton it.EmitTo::First(n)paths still used thedrain(..n).collect()idiom: the min/max accumulators andByteViewGroupValueBuilder::take_n. Both now route through the shared helper.What changes are included in this PR?
datafusion_common::utils::split_vec_min_alloc: the shared helper, withshrink_to_fiton the emitted prefix.datafusion-physical-plan: the duplicate helper is removed;bytes.rs,primitive.rsandbytes_view.rsuse the shared one.datafusion-functions-aggregate:MinMaxStructAccumulatorandMinMaxBytesAccumulatoruse it inemit_to.This supersedes #22205, whose
ByteViewGroupValueBuilder::take_nchange is included here.Are these changes tested?
Yes. Unit tests for the helper (both branches and the boundaries) plus the existing
take_nandmin_maxtests pass.Are there any user-facing changes?
No.