@@ -2708,109 +2708,4 @@ mod tests {
27082708
27092709 Ok ( ( ) )
27102710 }
2711-
2712- /// Test that concurrent sort partitions sharing a tight memory pool
2713- /// don't starve during the merge phase.
2714- ///
2715- /// This reproduces the starvation scenario where:
2716- /// 1. Multiple ExternalSorter instances share a single GreedyMemoryPool
2717- /// 2. Each reserves `sort_spill_reservation_bytes` for its merge phase
2718- /// 3. After spilling, the merge must proceed using the pre-reserved bytes
2719- /// without additional pool allocation
2720- ///
2721- /// Without the fix (using `take()` + smart tracking), the merge's
2722- /// `new_empty()` reservation starts at 0 bytes and the pre-reserved bytes
2723- /// sit unused in ExternalSorter's merge_reservation. When other partitions
2724- /// consume the freed memory, the merge starves.
2725- ///
2726- /// With the fix, the pre-reserved bytes are atomically transferred to the
2727- /// merge stream and used for spill file buffer reservations, preventing
2728- /// starvation.
2729- #[ tokio:: test]
2730- async fn test_sort_merge_no_starvation_with_concurrent_partitions ( ) -> Result < ( ) > {
2731- use futures:: TryStreamExt ;
2732-
2733- let sort_spill_reservation_bytes: usize = 10 * 1024 ; // 10 KB per partition
2734- let num_partitions: usize = 4 ;
2735-
2736- // Pool: each partition needs sort_spill_reservation_bytes for its merge,
2737- // plus a small amount for data accumulation before spilling.
2738- // Total: 4 * 10KB + 8KB = 48KB -- very tight.
2739- let memory_limit =
2740- sort_spill_reservation_bytes * num_partitions + 8 * 1024 ;
2741-
2742- let session_config = SessionConfig :: new ( )
2743- . with_batch_size ( 128 )
2744- . with_sort_spill_reservation_bytes ( sort_spill_reservation_bytes) ;
2745-
2746- let runtime = RuntimeEnvBuilder :: new ( )
2747- . with_memory_limit ( memory_limit, 1.0 )
2748- . build_arc ( ) ?;
2749-
2750- let task_ctx = Arc :: new (
2751- TaskContext :: default ( )
2752- . with_session_config ( session_config)
2753- . with_runtime ( runtime) ,
2754- ) ;
2755-
2756- // Create multiple batches per partition to force spilling.
2757- // Each batch: 100 rows of Int32 ≈ 400 bytes.
2758- // 20 batches per partition ≈ 8KB per partition.
2759- // With only ~2KB of pool headroom per partition, this forces spilling.
2760- let batches_per_partition = 20 ;
2761- let rows_per_batch: i32 = 100 ;
2762-
2763- let all_partitions: Vec < Vec < RecordBatch > > = ( 0 ..num_partitions)
2764- . map ( |_| {
2765- ( 0 ..batches_per_partition)
2766- . map ( |_| make_partition ( rows_per_batch) )
2767- . collect ( )
2768- } )
2769- . collect ( ) ;
2770-
2771- let schema = all_partitions[ 0 ] [ 0 ] . schema ( ) ;
2772- let input = TestMemoryExec :: try_new_exec ( & all_partitions, schema. clone ( ) , None ) ?;
2773-
2774- let sort_exec = Arc :: new (
2775- SortExec :: new (
2776- [ PhysicalSortExpr {
2777- expr : col ( "i" , & schema) ?,
2778- options : SortOptions :: default ( ) ,
2779- } ]
2780- . into ( ) ,
2781- input,
2782- )
2783- . with_preserve_partitioning ( true ) ,
2784- ) ;
2785-
2786- // Execute all partitions concurrently -- they share the same pool.
2787- let mut tasks = Vec :: new ( ) ;
2788- for partition in 0 ..num_partitions {
2789- let sort = Arc :: clone ( & sort_exec) ;
2790- let ctx = Arc :: clone ( & task_ctx) ;
2791- tasks. push ( tokio:: spawn ( async move {
2792- let stream = sort. execute ( partition, ctx) ?;
2793- let batches: Vec < RecordBatch > = stream. try_collect ( ) . await ?;
2794- let total_rows: usize = batches. iter ( ) . map ( |b| b. num_rows ( ) ) . sum ( ) ;
2795- Ok :: < usize , DataFusionError > ( total_rows)
2796- } ) ) ;
2797- }
2798-
2799- let mut total_rows = 0 ;
2800- for task in tasks {
2801- total_rows += task. await . unwrap ( ) ?;
2802- }
2803-
2804- let expected_rows =
2805- num_partitions * batches_per_partition * ( rows_per_batch as usize ) ;
2806- assert_eq ! ( total_rows, expected_rows) ;
2807-
2808- assert_eq ! (
2809- task_ctx. runtime_env( ) . memory_pool. reserved( ) ,
2810- 0 ,
2811- "All memory should be returned to the pool after sort completes"
2812- ) ;
2813-
2814- Ok ( ( ) )
2815- }
28162711}
0 commit comments