
Fix memory reservation starvation in sort-merge #20642

Merged
alamb merged 5 commits into apache:main from xudong963:fix/sort-merge-reservation-starvation
Mar 18, 2026

Conversation


@xudong963 xudong963 commented Mar 2, 2026

Which issue does this PR close?

  • Closes #.

Rationale for this change

This PR fixes memory reservation starvation in sort-merge when multiple sort partitions share a GreedyMemoryPool.

When multiple ExternalSorter instances run concurrently and share a single memory pool, the merge phase starves:

  1. Each partition pre-reserves sort_spill_reservation_bytes via merge_reservation
  2. When entering the merge phase, new_empty() was used to create a new reservation starting at 0 bytes, while the pre-reserved bytes sat idle in ExternalSorter.merge_reservation
  3. Those freed bytes were immediately consumed by other partitions racing for memory
  4. The merge could no longer allocate memory from the pool → OOM / starvation

What changes are included in this PR?

Are these changes tested?

I couldn't find a deterministic way to reproduce the bug, but it occurs in our production environment. This PR adds an end-to-end test to verify the fix.

Are there any user-facing changes?

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Mar 2, 2026
@xudong963 xudong963 marked this pull request as draft March 2, 2026 09:49
@xudong963 xudong963 marked this pull request as ready for review March 2, 2026 10:09

xudong963 commented Mar 2, 2026

I can't find a deterministic way to reproduce the bug right now, but it occurs in our production environment. I'd like to get more eyes on this PR!

Update: I added an end-to-end test which fails on main

@xudong963 xudong963 force-pushed the fix/sort-merge-reservation-starvation branch from 0f2140c to 651e9c9 on March 2, 2026 21:15
Comment on lines +358 to +364
// Transfer the pre-reserved merge memory to the streaming merge
// using `take()` instead of `new_empty()`. This ensures the merge
// stream starts with `sort_spill_reservation_bytes` already
// allocated, preventing starvation when concurrent sort partitions
// compete for pool memory. `take()` moves the bytes atomically
// without releasing them back to the pool, so other partitions
// cannot race to consume the freed memory.
Member


The pre-reserved merge memory should be used as part of the sort-merge stream.
I mean that if x bytes of merge memory were pre-reserved, the sort-merge stream should know about that so it won't think it is starting from 0; otherwise this just reserves unaccounted memory.

Member Author


Thanks! Good point — just take() alone wouldn't be enough if the merge stream doesn't know about the pre-reserved bytes.

The PR does address this in the other changed files:

  • In builder.rs: BatchBuilder now tracks batches_mem_used separately and only calls try_grow() when actual usage exceeds the current reservation size. It also records initial_reservation so
    it never shrinks below that during build_output. This way the pre-reserved bytes are used as the initial budget rather than requesting from the pool on top of them.
  • In multi_level_merge.rs: get_sorted_spill_files_to_merge now tracks total_needed and only requests additional pool memory when total_needed > reservation.size(), so spill file buffers
    covered by the pre-reserved bytes don't trigger extra pool allocations.

So the merge stream is aware of the pre-reserved bytes and uses them as its starting budget — it doesn't think it's starting from 0.

coracuity added a commit to acuitymd/silk-chiffon that referenced this pull request Mar 4, 2026
DataFusion 52.1.0 has a TOCTOU race in ExternalSorter where merge
reservations are freed and re-created empty, letting other partitions
steal the memory (apache/datafusion#20642). Until the upstream fix
lands, compute a data-aware sort_spill_reservation_bytes by sampling
actual Arrow row sizes from the input, estimating spill file count,
and reserving enough for the merge phase.
coracuity added a commit to acuitymd/silk-chiffon that referenced this pull request Mar 4, 2026
* Sample-based sort spill reservation to mitigate merge OOM

DataFusion 52.1.0 has a TOCTOU race in ExternalSorter where merge
reservations are freed and re-created empty, letting other partitions
steal the memory (apache/datafusion#20642). Until the upstream fix
lands, compute a data-aware sort_spill_reservation_bytes by sampling
actual Arrow row sizes from the input, estimating spill file count,
and reserving enough for the merge phase.

* Add more tests

* formatting

* Allow some truncation here

* Handle when budgets are tight

* Better estimate in-memory size using row count and avg row size

* Remove dead code

* Lint fix

* linting

* Fix low memory situations
@xudong963 xudong963 force-pushed the fix/sort-merge-reservation-starvation branch from 01ae92d to acd7caa on March 10, 2026 12:08

for spill in &self.sorted_spill_files {
// For memory pools that are not shared this is fine; for shared pools it is not,
// and there should be some upper limit on the memory reservation so we won't starve the system
Contributor


I think this comment still applies: if you have multiple partitions running, one partition will still be able to starve the others

Member Author


yes, I'll keep the comment

) -> Result<(Vec<SortedSpillFile>, usize)> {
assert_ne!(buffer_len, 0, "Buffer length must be greater than 0");
let mut number_of_spills_to_read_for_current_phase = 0;
// Track total memory needed for spill file buffers. When the
Contributor


It feels like this whole method is ripe for a refactor, and introducing a memory floor is making it even more complex. Is there a way to incorporate the memory floor but also simplify this a little bit?

Member Author


I added a try_grow_reservation_to_at_least helper to reduce the complexity.

Contributor

@kosiew kosiew left a comment


@xudong963

Thanks for working on this.

// concurrent sort partitions compete for pool memory: the pre-reserved
// bytes cover spill file buffer reservations without additional pool
// allocation.
let mut memory_reservation = self.reservation.take();
Contributor


It looks like merge_sorted_runs_within_mem_limit() is transferring self.reservation into memory_reservation before it actually knows whether any spill files will be merged. If the builder already has enough in-memory streams to satisfy minimum_number_of_required_streams, but the first spill file still cannot fit, then get_sorted_spill_files_to_merge() could legitimately return zero spill files.

In that situation, is_only_merging_memory_streams would become true, but memory_reservation would still contain the bytes taken from self.reservation. That seems like it could trigger the assertion at lines 297–302 even though falling back to an all-in-memory merge is valid.

My understanding is that this creates a behavior regression in the mixed {sorted_streams + sorted_spill_files} path. Should the reservation transfer instead happen only after at least one spill file is selected, or should the unused reservation be returned to the all-in-memory merge path rather than being asserted away? 🤔


Comment on lines +96 to +98
if self.batches_mem_used > self.reservation.size() {
self.reservation
.try_grow(self.batches_mem_used - self.reservation.size())?;
Contributor


This “grow only when usage exceeds current reservation” pattern is also applied in get_sorted_spill_files_to_merge in multi_level_merge.rs.
I think extracting it into a helper would make the intended invariant easier to check.


Contributor

@kosiew kosiew left a comment


lgtm

@xudong963
Member Author

I plan to merge the PR tomorrow if there are no more comments

@xudong963 xudong963 requested review from cetra3 and rluvaton March 17, 2026 01:42
@alamb alamb added this pull request to the merge queue Mar 18, 2026
Merged via the queue into apache:main with commit a6a4df9 Mar 18, 2026
34 checks passed

alamb commented Mar 18, 2026

I am merging to try and keep the code flowing


alamb commented Mar 18, 2026

Let's address any additional comments as follow on PRs

xudong963 added a commit to massive-com/arrow-datafusion that referenced this pull request Mar 20, 2026
xudong963 added a commit to massive-com/arrow-datafusion that referenced this pull request Mar 23, 2026
de-bgunter pushed a commit to de-bgunter/datafusion that referenced this pull request Mar 24, 2026