@@ -342,11 +342,6 @@ impl ExternalSorter {
342342 /// 2. A combined streaming merge incorporating both in-memory
343343 /// batches and data from spill files on disk.
344344 async fn sort ( & mut self ) -> Result < SendableRecordBatchStream > {
345- // Release the memory reserved for merge back to the pool so
346- // there is some left when `in_mem_sort_stream` requests an
347- // allocation.
348- self . merge_reservation . free ( ) ;
349-
350345 if self . spilled_before ( ) {
351346 // Sort `in_mem_batches` and spill it first. If there are many
352347 // `in_mem_batches` and the memory limit is almost reached, merging
@@ -373,6 +368,11 @@ impl ExternalSorter {
373368 . with_reservation ( self . merge_reservation . take ( ) )
374369 . build ( )
375370 } else {
371+ // Release the memory reserved for merge back to the pool so
372+ // there is some left when `in_mem_sort_stream` requests an
373+ // allocation. Only needed for the non-spill path; the spill
374+ // path transfers the reservation to the merge stream instead.
375+ self . merge_reservation . free ( ) ;
376376 self . in_mem_sort_stream ( self . metrics . baseline . clone ( ) )
377377 }
378378 }
@@ -382,6 +382,12 @@ impl ExternalSorter {
382382 self . reservation . size ( )
383383 }
384384
385+ /// How much memory is reserved for the merge phase?
386+ #[ cfg( test) ]
387+ fn merge_reservation_size ( & self ) -> usize {
388+ self . merge_reservation . size ( )
389+ }
390+
385391 /// How many bytes have been spilled to disk?
386392 fn spilled_bytes ( & self ) -> usize {
387393 self . metrics . spill_metrics . spilled_bytes . value ( )
@@ -2724,26 +2730,22 @@ mod tests {
27242730 Ok ( ( ) )
27252731 }
27262732
2727- /// End-to-end test that verifies `ExternalSorter::sort()` atomically
2728- /// transfers the pre-reserved merge bytes to the merge stream via `take()`.
2733+ /// Verifies that `ExternalSorter::sort()` transfers the pre-reserved
2734+ /// merge bytes to the merge stream via `take()`, rather than leaving
2735+ /// them in the sorter (via `new_empty()`).
27292736 ///
2730- /// This test directly exercises the `ExternalSorter` code path:
27312737 /// 1. Create a sorter with a tight memory pool and insert enough data
27322738 /// to force spilling
2733- /// 2. Call `sort()` to get the merge stream
2734- /// 3. Verify that dropping the sorter does NOT free the pre-reserved
2735- /// bytes back to the pool (they should have been transferred to
2736- /// the merge stream)
2737- /// 4. Simulate contention: a task grabs all available pool memory
2738- /// 5. Verify the merge stream still works (it has its own pre-reserved bytes)
2739- ///
2740- /// Before the fix, main (using `new_empty()`), step 3 fails: the sorter drop frees
2741- /// `sort_spill_reservation_bytes` back to the pool, and the task can
2742- /// steal them, causing the merge stream to starve.
2739+ /// 2. Verify `merge_reservation` holds the pre-reserved bytes before sort
2740+ /// 3. Call `sort()` to get the merge stream
2741+ /// 4. Verify `merge_reservation` is now 0 (bytes transferred to merge stream)
2742+ /// 5. Simulate contention: a competing consumer grabs all available pool memory
2743+ /// 6. Verify the merge stream still works (it uses its pre-reserved bytes
2744+ /// as initial budget, not requesting from pool starting at 0)
27432745 ///
2744- /// With the fix (using `take ()`), the bytes are atomically transferred
2745- /// to the merge stream. The sorter drop frees 0 bytes, so there's
2746- /// nothing for the task to steal .
2746+ /// With `new_empty()` (before the fix), step 4 fails: `merge_reservation`
2747+ /// still holds the bytes, the merge stream starts with 0 budget, and
2748+ /// those bytes become unaccounted-for reserved memory that nobody uses.
27472749 #[ tokio:: test]
27482750 async fn test_sort_merge_reservation_transferred_not_freed ( ) -> Result < ( ) > {
27492751 use datafusion_execution:: memory_pool:: {
@@ -2778,10 +2780,7 @@ mod tests {
27782780 Arc :: clone ( & runtime) ,
27792781 ) ?;
27802782
2781- // Insert enough data to force spilling. Each batch is ~400 bytes
2782- // (100 rows × 4 bytes). With 40KB of working memory, we'll spill
2783- // after accumulating ~100 batches worth. 200 batches guarantees
2784- // multiple spill cycles.
2783+ // Insert enough data to force spilling.
27852784 let num_batches = 200 ;
27862785 for i in 0 ..num_batches {
27872786 let values: Vec < i32 > = ( ( i * 100 ) ..( ( i + 1 ) * 100 ) ) . rev ( ) . collect ( ) ;
@@ -2797,45 +2796,49 @@ mod tests {
27972796 "Test requires spilling to exercise the merge path"
27982797 ) ;
27992798
2800- // Call sort() to get the merge stream. After this:
2801- // - With take() (the fix): merge_reservation = 0, merge stream has R bytes
2802- // - With new_empty() (before fix): merge_reservation = R, merge stream has 0 bytes
2803- let merge_stream = sorter. sort ( ) . await ?;
2804-
2805- // Record pool state before dropping the sorter
2806- let reserved_before_drop = pool. reserved ( ) ;
2807-
2808- // Drop the sorter. This frees merge_reservation:
2809- // - With take() (the fix): frees 0 bytes (already transferred to merge stream)
2810- // - With new_empty() (before fix): frees R bytes back to pool
2811- drop ( sorter) ;
2799+ // Before sort(), merge_reservation holds sort_spill_reservation_bytes.
2800+ assert ! (
2801+ sorter. merge_reservation_size( ) >= sort_spill_reservation_bytes,
2802+ "merge_reservation should hold the pre-reserved bytes before sort()"
2803+ ) ;
28122804
2813- let reserved_after_drop = pool. reserved ( ) ;
2805+ // Call sort() to get the merge stream. With the fix (take()),
2806+ // the pre-reserved merge bytes are transferred to the merge
2807+ // stream. Without the fix (free() + new_empty()), the bytes
2808+ // are released back to the pool and the merge stream starts
2809+ // with 0 bytes.
2810+ let merge_stream = sorter. sort ( ) . await ?;
28142811
2815- // THE KEY ASSERTION: dropping the sorter should NOT free the
2816- // pre-reserved merge bytes. They must have been transferred to
2817- // the merge stream via take().
2812+ // THE KEY ASSERTION: after sort(), merge_reservation must be 0.
2813+ // This proves take() transferred the bytes to the merge stream,
2814+ // rather than them being freed back to the pool where other
2815+ // partitions could steal them.
28182816 assert_eq ! (
2819- reserved_after_drop,
2820- reserved_before_drop,
2821- "Dropping the sorter freed {} bytes back to the pool! \
2822- The merge reservation bytes should have been transferred \
2823- to the merge stream (via take()), not freed back to the pool \
2824- (via new_empty()). Freed bytes can be stolen by concurrent \
2825- partitions, causing merge starvation.",
2826- reserved_before_drop - reserved_after_drop
2817+ sorter. merge_reservation_size( ) ,
2818+ 0 ,
2819+ "After sort(), merge_reservation should be 0 (bytes transferred \
2820+ to merge stream via take()). If non-zero, the bytes are still \
2821+ held by the sorter and will be freed on drop, allowing other \
2822+ partitions to steal them."
28272823 ) ;
28282824
2829- // Simulate contention: a task (representing another partition)
2830- // grabs all available pool memory
2831- let task = MemoryConsumer :: new ( "TaskPartition" ) . register ( & pool) ;
2825+ // Drop the sorter to free its reservations back to the pool.
2826+ drop ( sorter) ;
2827+
2828+ // Simulate contention: another partition grabs ALL available
2829+ // pool memory. If the merge stream didn't receive the
2830+ // pre-reserved bytes via take(), it will fail when it tries
2831+ // to allocate memory for reading spill files.
2832+ let contender = MemoryConsumer :: new ( "CompetingPartition" ) . register ( & pool) ;
28322833 let available = pool_size. saturating_sub ( pool. reserved ( ) ) ;
28332834 if available > 0 {
2834- task . try_grow ( available) . unwrap ( ) ;
2835+ contender . try_grow ( available) . unwrap ( ) ;
28352836 }
28362837
2837- // The merge stream should still work because it holds the
2838- // pre-reserved bytes (transferred via take())
2838+ // The merge stream must still produce correct results despite
2839+ // the pool being fully consumed by the contender. This only
2840+ // works if sort() transferred the pre-reserved bytes to the
2841+ // merge stream (via take()) rather than freeing them.
28392842 let batches: Vec < RecordBatch > = merge_stream. try_collect ( ) . await ?;
28402843 let total_rows: usize = batches. iter ( ) . map ( |b| b. num_rows ( ) ) . sum ( ) ;
28412844 assert_eq ! (
@@ -2857,7 +2860,7 @@ mod tests {
28572860 ) ;
28582861 }
28592862
2860- drop ( task ) ;
2863+ drop ( contender ) ;
28612864 Ok ( ( ) )
28622865 }
28632866}
0 commit comments