Fix memory reservation starvation in sort-merge #20642
```diff
@@ -30,6 +30,7 @@ use arrow::datatypes::SchemaRef;
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::MemoryReservation;

+use crate::sorts::builder::try_grow_reservation_to_at_least;
 use crate::sorts::sort::get_reserved_bytes_for_record_batch_size;
 use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
 use crate::stream::RecordBatchStreamAdapter;
```
|
|
```diff
@@ -253,7 +254,12 @@ impl MultiLevelMergeBuilder {
             // Need to merge multiple streams
             (_, _) => {
-                let mut memory_reservation = self.reservation.new_empty();
+                // Transfer any pre-reserved bytes (from sort_spill_reservation_bytes)
+                // to the merge memory reservation. This prevents starvation when
+                // concurrent sort partitions compete for pool memory: the pre-reserved
+                // bytes cover spill file buffer reservations without additional pool
+                // allocation.
+                let mut memory_reservation = self.reservation.take();

                 // Don't account for existing streams memory
                 // as we are not holding the memory for them
```
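The take() versus new_empty() distinction is the heart of this change. A minimal toy model (hypothetical types, not the actual DataFusion MemoryReservation API) of the two semantics:

```rust
// Toy model illustrating why `take()` differs from `new_empty()` under
// contention: `take()` moves already-reserved bytes into the new
// reservation without touching the shared pool, while `new_empty()`
// starts at zero and must re-acquire everything from the pool.
#[derive(Debug)]
struct Reservation {
    size: usize,
}

impl Reservation {
    // A fresh reservation holding nothing.
    fn new_empty(&self) -> Reservation {
        Reservation { size: 0 }
    }

    // Move all reserved bytes to the new reservation; the pool's total
    // reserved count is unchanged, so no other consumer can race in
    // and grab the bytes in between.
    fn take(&mut self) -> Reservation {
        let moved = self.size;
        self.size = 0;
        Reservation { size: moved }
    }
}

fn main() {
    let mut merge_reservation = Reservation { size: 10 * 1024 };

    // new_empty(): the merge starts with 0 and the 10 KiB stay behind.
    let empty = merge_reservation.new_empty();
    assert_eq!(empty.size, 0);
    assert_eq!(merge_reservation.size, 10 * 1024);

    // take(): the merge starts with the full pre-reserved budget.
    let taken = merge_reservation.take();
    assert_eq!(taken.size, 10 * 1024);
    assert_eq!(merge_reservation.size, 0);
}
```

In the real API the bytes live in a shared pool, but the ownership-transfer semantics sketched here are what prevents another partition from claiming the budget mid-handoff.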
|
|
```diff
@@ -269,6 +275,15 @@ impl MultiLevelMergeBuilder {
                 let is_only_merging_memory_streams = sorted_spill_files.is_empty();

+                // If no spill files were selected (e.g. all too large for
+                // available memory but enough in-memory streams exist),
+                // return the pre-reserved bytes to self.reservation so
+                // create_new_merge_sort can transfer them to the merge
+                // stream's BatchBuilder.
+                if is_only_merging_memory_streams {
+                    mem::swap(&mut self.reservation, &mut memory_reservation);
+                }
+
                 for spill in sorted_spill_files {
                     let stream = self
                         .spill_manager
```
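When only in-memory streams remain, the mem::swap hands the taken bytes back to self.reservation. A toy sketch (hypothetical Reservation struct, not the DataFusion type) of that swap-back:

```rust
use std::mem;

// After `self.reservation.take()`, the sorter-side reservation is empty
// and the merge-side one holds the pre-reserved bytes. If zero spill
// files were selected, swapping returns the bytes so the all-in-memory
// merge path can hand them to its own builder.
struct Reservation {
    size: usize,
}

fn main() {
    let mut self_reservation = Reservation { size: 0 }; // after take()
    let mut memory_reservation = Reservation { size: 8192 }; // holds taken bytes

    let is_only_merging_memory_streams = true; // e.g. zero spill files selected
    if is_only_merging_memory_streams {
        mem::swap(&mut self_reservation, &mut memory_reservation);
    }

    // The pre-reserved budget is back where the in-memory path expects it.
    assert_eq!(self_reservation.size, 8192);
    assert_eq!(memory_reservation.size, 0);
}
```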
|
|
```diff
@@ -337,8 +352,10 @@ impl MultiLevelMergeBuilder {
             builder = builder.with_bypass_mempool();
         } else {
             // If we are only merging in-memory streams, we need to use the memory reservation
-            // because we don't know the maximum size of the batches in the streams
-            builder = builder.with_reservation(self.reservation.new_empty());
+            // because we don't know the maximum size of the batches in the streams.
+            // Use take() to transfer any pre-reserved bytes so the merge can use them
+            // as its initial budget without additional pool allocation.
+            builder = builder.with_reservation(self.reservation.take());
         }

         builder.build()
```
|
|
```diff
@@ -356,17 +373,24 @@ impl MultiLevelMergeBuilder {
     ) -> Result<(Vec<SortedSpillFile>, usize)> {
         assert_ne!(buffer_len, 0, "Buffer length must be greater than 0");
         let mut number_of_spills_to_read_for_current_phase = 0;
+        // Track total memory needed for spill file buffers. When the
```
|
Contributor: It feels like this whole method is ripe for a refactor, and introducing a memory floor is making it even more complex. Is there a way to incorporate the memory floor, but also simplify this a little bit?

Member (Author): add a
```diff
+        // reservation has pre-reserved bytes (from sort_spill_reservation_bytes),
+        // those bytes cover the first N spill files without additional pool
+        // allocation, preventing starvation under memory pressure.
+        let mut total_needed: usize = 0;

         for spill in &self.sorted_spill_files {
-            // For memory pools that are not shared this is good, for other this is not
-            // and there should be some upper limit to memory reservation so we won't starve the system
```
|
Contributor: I think this comment still applies: if you have multiple partitions running, one partition will still be able to starve the others.

Member (Author): Yes, I'll keep the comment.
```diff
-            match reservation.try_grow(
-                get_reserved_bytes_for_record_batch_size(
-                    spill.max_record_batch_memory,
-                    // Size will be the same as the sliced size, bc it is a spilled batch.
-                    spill.max_record_batch_memory,
-                ) * buffer_len,
-            ) {
+            let per_spill = get_reserved_bytes_for_record_batch_size(
+                spill.max_record_batch_memory,
+                // Size will be the same as the sliced size, bc it is a spilled batch.
+                spill.max_record_batch_memory,
+            ) * buffer_len;
+            total_needed += per_spill;
+
+            // For memory pools that are not shared this is good, for other
+            // this is not and there should be some upper limit to memory
+            // reservation so we won't starve the system.
+            match try_grow_reservation_to_at_least(reservation, total_needed) {
                 Ok(_) => {
                     number_of_spills_to_read_for_current_phase += 1;
                 }
```
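The key difference from the old per-spill try_grow is that try_grow_reservation_to_at_least only requests the shortfall between the reservation's current size and the cumulative target, so pre-reserved bytes cover the first spill files without touching the pool. A simplified sketch with assumed types (the real helper lives in crate::sorts::builder and operates on a MemoryReservation):

```rust
// Hypothetical stand-ins for MemoryReservation and the shared pool.
struct Reservation {
    size: usize,
    pool_free: usize, // bytes still available in the pool
}

impl Reservation {
    fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        if additional > self.pool_free {
            return Err("pool exhausted".into());
        }
        self.pool_free -= additional;
        self.size += additional;
        Ok(())
    }
}

// Grow only by the shortfall; a no-op when the pre-reserved bytes
// already cover the cumulative requirement.
fn try_grow_reservation_to_at_least(
    reservation: &mut Reservation,
    target: usize,
) -> Result<(), String> {
    if reservation.size >= target {
        return Ok(());
    }
    reservation.try_grow(target - reservation.size)
}

fn main() {
    // 8 KiB pre-reserved, only 2 KiB left in the pool, 4 KiB per spill.
    let mut reservation = Reservation { size: 8192, pool_free: 2048 };
    let per_spill = 4096;
    let mut total_needed = 0;
    let mut admitted = 0;
    for _ in 0..3 {
        total_needed += per_spill;
        if try_grow_reservation_to_at_least(&mut reservation, total_needed).is_ok() {
            admitted += 1;
        } else {
            break;
        }
    }
    // Spills 1 and 2 fit inside the 8 KiB pre-reservation; spill 3
    // needs 4 KiB more but only 2 KiB remain in the pool.
    assert_eq!(admitted, 2);
}
```

With the old per-spill try_grow, the same scenario would admit zero spill files: each call asks the pool for the full per-spill amount on top of the pre-reserved bytes, which the 2 KiB of remaining pool memory cannot satisfy.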
```diff
@@ -342,11 +342,6 @@ impl ExternalSorter {
     /// 2. A combined streaming merge incorporating both in-memory
     ///    batches and data from spill files on disk.
     async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
-        // Release the memory reserved for merge back to the pool so
-        // there is some left when `in_mem_sort_stream` requests an
-        // allocation.
-        self.merge_reservation.free();
-
         if self.spilled_before() {
             // Sort `in_mem_batches` and spill it first. If there are many
             // `in_mem_batches` and the memory limit is almost reached, merging
```
|
|
```diff
@@ -355,6 +350,13 @@ impl ExternalSorter {
             self.sort_and_spill_in_mem_batches().await?;
         }

+        // Transfer the pre-reserved merge memory to the streaming merge
+        // using `take()` instead of `new_empty()`. This ensures the merge
+        // stream starts with `sort_spill_reservation_bytes` already
+        // allocated, preventing starvation when concurrent sort partitions
+        // compete for pool memory. `take()` moves the bytes atomically
+        // without releasing them back to the pool, so other partitions
+        // cannot race to consume the freed memory.
```
|
Comment on lines +353 to +359

Member: The pre-reserved merge memory should be used as part of the sort merge stream.

Member (Author): Thanks, good point: take() alone wouldn't be enough if the merge stream doesn't know about the pre-reserved bytes. The PR addresses this in the other changed files, so the merge stream is aware of the pre-reserved bytes and uses them as its starting budget; it doesn't think it's starting from 0.
```diff
         StreamingMergeBuilder::new()
             .with_sorted_spill_files(std::mem::take(&mut self.finished_spill_files))
             .with_spill_manager(self.spill_manager.clone())

@@ -363,9 +365,14 @@ impl ExternalSorter {
             .with_metrics(self.metrics.baseline.clone())
             .with_batch_size(self.batch_size)
             .with_fetch(None)
-            .with_reservation(self.merge_reservation.new_empty())
+            .with_reservation(self.merge_reservation.take())
             .build()
         } else {
+            // Release the memory reserved for merge back to the pool so
+            // there is some left when `in_mem_sort_stream` requests an
+            // allocation. Only needed for the non-spill path; the spill
+            // path transfers the reservation to the merge stream instead.
+            self.merge_reservation.free();
             self.in_mem_sort_stream(self.metrics.baseline.clone())
         }
     }
```
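The contrast between the two branches can be modeled with a toy pool (hypothetical, not the DataFusion MemoryPool API): freeing the merge bytes opens a window in which a competing partition can claim them, while take() never returns them to the pool at all:

```rust
// Toy single-threaded pool model of the race that free()-then-regrow
// allows. The "contender" stands in for another sort partition.
struct Pool {
    free: usize,
}

impl Pool {
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        if bytes > self.free {
            return Err("out of memory".into());
        }
        self.free -= bytes;
        Ok(())
    }
    fn release(&mut self, bytes: usize) {
        self.free += bytes;
    }
}

fn main() {
    // All other pool memory is already reserved elsewhere.
    let mut pool = Pool { free: 0 };
    let merge_bytes = 10 * 1024;

    // Old pattern: free the merge reservation, then re-reserve later.
    pool.release(merge_bytes); // merge_reservation.free()
    pool.try_grow(merge_bytes).unwrap(); // contender wins the race first
    assert!(pool.try_grow(merge_bytes).is_err()); // merge stream starves

    // take() pattern: the bytes never re-enter `pool.free`, so there is
    // no window in which a contender can claim them.
}
```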
|
|
```diff
@@ -375,6 +382,12 @@ impl ExternalSorter {
         self.reservation.size()
     }

+    /// How much memory is reserved for the merge phase?
+    #[cfg(test)]
+    fn merge_reservation_size(&self) -> usize {
+        self.merge_reservation.size()
+    }
+
     /// How many bytes have been spilled to disk?
     fn spilled_bytes(&self) -> usize {
         self.metrics.spill_metrics.spilled_bytes.value()
```
|
|
```diff
@@ -2716,4 +2729,138 @@ mod tests {

         Ok(())
     }
+
+    /// Verifies that `ExternalSorter::sort()` transfers the pre-reserved
+    /// merge bytes to the merge stream via `take()`, rather than leaving
+    /// them in the sorter (via `new_empty()`).
+    ///
+    /// 1. Create a sorter with a tight memory pool and insert enough data
+    ///    to force spilling
+    /// 2. Verify `merge_reservation` holds the pre-reserved bytes before sort
+    /// 3. Call `sort()` to get the merge stream
+    /// 4. Verify `merge_reservation` is now 0 (bytes transferred to merge stream)
+    /// 5. Simulate contention: a competing consumer grabs all available pool memory
+    /// 6. Verify the merge stream still works (it uses its pre-reserved bytes
+    ///    as initial budget, not requesting from pool starting at 0)
+    ///
+    /// With `new_empty()` (before the fix), step 4 fails: `merge_reservation`
+    /// still holds the bytes, the merge stream starts with 0 budget, and
+    /// those bytes become unaccounted-for reserved memory that nobody uses.
+    #[tokio::test]
+    async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> {
+        use datafusion_execution::memory_pool::{
+            GreedyMemoryPool, MemoryConsumer, MemoryPool,
+        };
+        use futures::TryStreamExt;
+
+        let sort_spill_reservation_bytes: usize = 10 * 1024; // 10 KB
+
+        // Pool: merge reservation (10KB) + enough room for sort to work.
+        // The room must accommodate batch data accumulation before spilling.
+        let sort_working_memory: usize = 40 * 1024; // 40 KB for sort operations
+        let pool_size = sort_spill_reservation_bytes + sort_working_memory;
+        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(pool_size));
+
+        let runtime = RuntimeEnvBuilder::new()
+            .with_memory_pool(Arc::clone(&pool))
+            .build_arc()?;
+
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
+
+        let mut sorter = ExternalSorter::new(
+            0,
+            Arc::clone(&schema),
+            [PhysicalSortExpr::new_default(Arc::new(Column::new("x", 0)))].into(),
+            128, // batch_size
+            sort_spill_reservation_bytes,
+            usize::MAX, // sort_in_place_threshold_bytes (high to avoid concat path)
+            SpillCompression::Uncompressed,
+            &metrics_set,
+            Arc::clone(&runtime),
+        )?;
+
+        // Insert enough data to force spilling.
+        let num_batches = 200;
+        for i in 0..num_batches {
+            let values: Vec<i32> = ((i * 100)..((i + 1) * 100)).rev().collect();
+            let batch = RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(Int32Array::from(values))],
+            )?;
+            sorter.insert_batch(batch).await?;
+        }
+
+        assert!(
+            sorter.spilled_before(),
+            "Test requires spilling to exercise the merge path"
+        );
+
+        // Before sort(), merge_reservation holds sort_spill_reservation_bytes.
+        assert!(
+            sorter.merge_reservation_size() >= sort_spill_reservation_bytes,
+            "merge_reservation should hold the pre-reserved bytes before sort()"
+        );
+
+        // Call sort() to get the merge stream. With the fix (take()),
+        // the pre-reserved merge bytes are transferred to the merge
+        // stream. Without the fix (free() + new_empty()), the bytes
+        // are released back to the pool and the merge stream starts
+        // with 0 bytes.
+        let merge_stream = sorter.sort().await?;
+
+        // THE KEY ASSERTION: after sort(), merge_reservation must be 0.
+        // This proves take() transferred the bytes to the merge stream,
+        // rather than them being freed back to the pool where other
+        // partitions could steal them.
+        assert_eq!(
+            sorter.merge_reservation_size(),
+            0,
+            "After sort(), merge_reservation should be 0 (bytes transferred \
+             to merge stream via take()). If non-zero, the bytes are still \
+             held by the sorter and will be freed on drop, allowing other \
+             partitions to steal them."
+        );
+
+        // Drop the sorter to free its reservations back to the pool.
+        drop(sorter);
+
+        // Simulate contention: another partition grabs ALL available
+        // pool memory. If the merge stream didn't receive the
+        // pre-reserved bytes via take(), it will fail when it tries
+        // to allocate memory for reading spill files.
+        let contender = MemoryConsumer::new("CompetingPartition").register(&pool);
+        let available = pool_size.saturating_sub(pool.reserved());
+        if available > 0 {
+            contender.try_grow(available).unwrap();
+        }
+
+        // The merge stream must still produce correct results despite
+        // the pool being fully consumed by the contender. This only
+        // works if sort() transferred the pre-reserved bytes to the
+        // merge stream (via take()) rather than freeing them.
+        let batches: Vec<RecordBatch> = merge_stream.try_collect().await?;
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(
+            total_rows,
+            (num_batches * 100) as usize,
+            "Merge stream should produce all rows even under memory contention"
+        );
+
+        // Verify data is sorted
+        let merged = concat_batches(&schema, &batches)?;
+        let col = merged.column(0).as_primitive::<Int32Type>();
+        for i in 1..col.len() {
+            assert!(
+                col.value(i - 1) <= col.value(i),
+                "Output should be sorted, but found {} > {} at index {}",
+                col.value(i - 1),
+                col.value(i),
+                i
+            );
+        }
+
+        drop(contender);
+        Ok(())
+    }
 }
```
Reviewer: It looks like merge_sorted_runs_within_mem_limit() is transferring self.reservation into memory_reservation before it actually knows whether any spill files will be merged. If the builder already has enough in-memory streams to satisfy minimum_number_of_required_streams, but the first spill file still cannot fit, then get_sorted_spill_files_to_merge() could legitimately return zero spill files.

In that situation, is_only_merging_memory_streams would become true, but memory_reservation would still contain the bytes taken from self.reservation. That could trigger the assertion at lines 297–302 even though falling back to an all-in-memory merge is valid.

This looks like a behavior regression in the mixed {sorted_streams + sorted_spill_files} path. Should the reservation transfer instead happen only after at least one spill file is selected, or should the unused reservation be returned to the all-in-memory merge path rather than being asserted away? 🤔

Author: c478d43