Fix memory reservation starvation in sort-merge #20642
Conversation

Update: I added an end-to-end test which fails on main

Force-pushed from 0f2140c to 651e9c9
```rust
// Transfer the pre-reserved merge memory to the streaming merge
// using `take()` instead of `new_empty()`. This ensures the merge
// stream starts with `sort_spill_reservation_bytes` already
// allocated, preventing starvation when concurrent sort partitions
// compete for pool memory. `take()` moves the bytes atomically
// without releasing them back to the pool, so other partitions
// cannot race to consume the freed memory.
```
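The difference between `take()` and `new_empty()` can be sketched with a toy pool. This is an illustrative mock, not DataFusion's actual `MemoryReservation` implementation; the type and method names here only mirror the API under discussion:

```rust
use std::sync::{Arc, Mutex};

/// Toy shared pool that just counts total reserved bytes.
#[derive(Default)]
struct Pool {
    reserved: Mutex<usize>,
}

struct Reservation {
    pool: Arc<Pool>,
    size: usize,
}

impl Reservation {
    fn new(pool: Arc<Pool>) -> Self {
        Self { pool, size: 0 }
    }

    fn grow(&mut self, bytes: usize) {
        *self.pool.reserved.lock().unwrap() += bytes;
        self.size += bytes;
    }

    /// `new_empty()`-style behavior: a fresh reservation starting at 0 bytes.
    /// The original's bytes eventually go back to the pool, opening a window
    /// in which other partitions can grab them before the new reservation grows.
    fn new_empty(&self) -> Self {
        Self {
            pool: Arc::clone(&self.pool),
            size: 0,
        }
    }

    /// `take()`-style behavior: move the reserved bytes into a new
    /// reservation without ever returning them to the pool, so there is
    /// no window for another partition to steal them.
    fn take(&mut self) -> Self {
        let moved = Self {
            pool: Arc::clone(&self.pool),
            size: self.size,
        };
        self.size = 0;
        moved
    }
}
```

With this mock, `take()` leaves the pool's reserved count untouched while moving the budget to the merge-side reservation, which is the property the comment above relies on.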
The pre-reserved merge memory should be used as part of the sort merge stream.
I mean that if x bytes of pre-reserved merge memory were reserved, the sort merge stream should know about that so it won't think it's starting from 0; otherwise this just reserves unaccounted memory.
Thanks! Good point: take() alone wouldn't be enough if the merge stream didn't know about the pre-reserved bytes.
The PR does address this in the other changed files:
- In builder.rs: BatchBuilder now tracks batches_mem_used separately and only calls try_grow() when actual usage exceeds the current reservation size. It also records initial_reservation so it never shrinks below that during build_output. This way the pre-reserved bytes are used as the initial budget rather than requesting from the pool on top of them.
- In multi_level_merge.rs: get_sorted_spill_files_to_merge now tracks total_needed and only requests additional pool memory when total_needed > reservation.size(), so spill file buffers covered by the pre-reserved bytes don't trigger extra pool allocations.

So the merge stream is aware of the pre-reserved bytes and uses them as its starting budget; it doesn't think it's starting from 0.
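The "grow only when actual usage exceeds the current reservation" accounting described above can be sketched as follows. The `BatchTracker` type, its fields, and the error handling are illustrative stand-ins, not DataFusion's real `BatchBuilder`:

```rust
/// Toy tracker mirroring the described accounting: the pre-reserved bytes
/// act as an initial budget, and the pool is only asked for the excess.
struct BatchTracker {
    batches_mem_used: usize,
    reservation_size: usize,
    /// Pre-reserved bytes; the reservation never shrinks below this.
    initial_reservation: usize,
    pool_limit: usize,
}

impl BatchTracker {
    fn record_batch(&mut self, batch_bytes: usize) -> Result<(), String> {
        self.batches_mem_used += batch_bytes;
        // Only request more from the pool when real usage exceeds what
        // is already reserved; the pre-reserved budget covers the rest.
        if self.batches_mem_used > self.reservation_size {
            let extra = self.batches_mem_used - self.reservation_size;
            if self.reservation_size + extra > self.pool_limit {
                return Err("pool exhausted".to_string());
            }
            self.reservation_size += extra;
        }
        Ok(())
    }

    fn release_batch(&mut self, batch_bytes: usize) {
        self.batches_mem_used = self.batches_mem_used.saturating_sub(batch_bytes);
        // Shrink toward current usage, but never below the initial
        // pre-reserved budget and never above the current reservation.
        self.reservation_size = self
            .batches_mem_used
            .max(self.initial_reservation)
            .min(self.reservation_size);
    }
}
```

The key invariant is that the reservation only grows by the shortfall between usage and the current reservation, and only shrinks down to the initial floor.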
* Sample-based sort spill reservation to mitigate merge OOM

  DataFusion 52.1.0 has a TOCTOU race in ExternalSorter where merge reservations are freed and re-created empty, letting other partitions steal the memory (apache/datafusion#20642). Until the upstream fix lands, compute a data-aware sort_spill_reservation_bytes by sampling actual Arrow row sizes from the input, estimating spill file count, and reserving enough for the merge phase.

* Add more tests
* formatting
* Allow some truncation here
* Handle when budgets are tight
* Better estimate in-memory size using row count and avg row size
* Remove dead code
* Lint fix
* linting
* Fix low memory situations
Force-pushed from 01ae92d to acd7caa
```rust
for spill in &self.sorted_spill_files {
    // For memory pools that are not shared this is good, for other this is not
    // and there should be some upper limit to memory reservation so we won't starve the system
```
I think this comment still applies: if you have multiple partitions running, one partition will still be able to starve the others
Yes, I'll keep the comment.
```rust
) -> Result<(Vec<SortedSpillFile>, usize)> {
    assert_ne!(buffer_len, 0, "Buffer length must be greater than 0");
    let mut number_of_spills_to_read_for_current_phase = 0;
    // Track total memory needed for spill file buffers. When the
```
It feels like this whole method is ripe for a refactor, and introducing a memory floor is making it even more complex. Is there a way to incorporate the memory floor but also simplify this a little bit?
Added a try_grow_reservation_to_at_least helper to reduce complexity.
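A helper along the lines suggested might look like the sketch below. The `Reservation` mock and the exact signature are assumptions for illustration; the real helper in the PR may differ:

```rust
/// Toy reservation backed by a stand-in for the shared pool's headroom.
struct Reservation {
    size: usize,
    pool_remaining: usize,
}

impl Reservation {
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if bytes > self.pool_remaining {
            return Err(format!("cannot reserve {bytes} more bytes"));
        }
        self.pool_remaining -= bytes;
        self.size += bytes;
        Ok(())
    }
}

/// Grow `reservation` so that its size is at least `minimum`, doing nothing
/// when the floor is already met. Centralizing this "grow only by the
/// shortfall" pattern keeps the call sites in builder.rs and
/// multi_level_merge.rs from reimplementing it.
fn try_grow_reservation_to_at_least(
    reservation: &mut Reservation,
    minimum: usize,
) -> Result<(), String> {
    if minimum > reservation.size {
        reservation.try_grow(minimum - reservation.size)?;
    }
    Ok(())
}
```

Callers then state the invariant they need (a minimum budget) instead of computing deltas inline.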
kosiew left a comment

Thanks for working on this.
```rust
// concurrent sort partitions compete for pool memory: the pre-reserved
// bytes cover spill file buffer reservations without additional pool
// allocation.
let mut memory_reservation = self.reservation.take();
```
It looks like merge_sorted_runs_within_mem_limit() is transferring self.reservation into memory_reservation before it actually knows whether any spill files will be merged. If the builder already has enough in-memory streams to satisfy minimum_number_of_required_streams, but the first spill file still cannot fit, then get_sorted_spill_files_to_merge() could legitimately return zero spill files.
In that situation, is_only_merging_memory_streams would become true, but memory_reservation would still contain the bytes taken from self.reservation. That seems like it could trigger the assertion at lines 297–302 even though falling back to an all-in-memory merge is valid.
My understanding is that this creates a behavior regression in the mixed {sorted_streams + sorted_spill_files} path. Should the reservation transfer instead happen only after at least one spill file is selected, or should the unused reservation be returned to the all-in-memory merge path rather than being asserted away? 🤔
```rust
if self.batches_mem_used > self.reservation.size() {
    self.reservation
        .try_grow(self.batches_mem_used - self.reservation.size())?;
```
This "grow only when usage exceeds the current reservation" pattern also appears in get_sorted_spill_files_to_merge in multi_level_merge.rs.
I think extracting it into a helper would make the intended invariant easier to check.
I plan to merge the PR tomorrow if there are no more comments

I am merging to try and keep the code flowing

Let's address any additional comments as follow-on PRs
## Which issue does this PR close?

- Closes #.

## Rationale for this change

This PR fixes memory reservation starvation in sort-merge when multiple sort partitions share a GreedyMemoryPool.

When multiple `ExternalSorter` instances run concurrently and share a single memory pool, the merge phase starves:

1. Each partition pre-reserves sort_spill_reservation_bytes via merge_reservation
2. When entering the merge phase, new_empty() was used to create a new reservation starting at 0 bytes, while the pre-reserved bytes sat idle in ExternalSorter.merge_reservation
3. Those freed bytes were immediately consumed by other partitions racing for memory
4. The merge could no longer allocate memory from the pool → OOM / starvation

## What changes are included in this PR?

## Are these changes tested?

~~I can't find a deterministic way to reproduce the bug, but it occurs in our production.~~ Add an end-to-end test to verify the fix

## Are there any user-facing changes?
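The four-step race above can be re-enacted sequentially with a toy greedy pool. This is a hypothetical illustration with made-up numbers, not DataFusion's GreedyMemoryPool:

```rust
/// Toy first-come-first-served pool: reservations succeed while there is
/// headroom and fail otherwise, like a greedy pool under contention.
struct GreedyPool {
    limit: usize,
    used: usize,
}

impl GreedyPool {
    fn try_reserve(&mut self, bytes: usize) -> bool {
        if self.used + bytes <= self.limit {
            self.used += bytes;
            true
        } else {
            false
        }
    }

    fn free(&mut self, bytes: usize) {
        self.used -= bytes;
    }
}
```

Once partition A's pre-reserved bytes are released back to the pool (the new_empty() behavior), a racing partition B can claim them, and A's merge phase can no longer re-reserve its budget. Keeping the bytes via take() never opens that window.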