@@ -53,7 +53,6 @@ use futures::{ready, Stream, StreamExt};
5353use log:: { debug, info} ;
5454use parking_lot:: Mutex ;
55- #[ cfg( target_os = "linux" ) ]
56- use std:: mem;
5756use std:: sync:: OnceLock ;
5857use tokio:: sync:: { OwnedSemaphorePermit , Semaphore } ;
5958
@@ -115,6 +114,8 @@ enum GraceJoinState {
115114 current_stream : Option < SendableRecordBatchStream > ,
116115 left_fut : Option < OnceFut < LoadedPartitionBatches > > ,
117116 right_fut : Option < OnceFut < LoadedPartitionBatches > > ,
117+ base_reservation : Arc < Mutex < MemoryReservation > > ,
118+ prefetch_reservation : Arc < Mutex < MemoryReservation > > ,
118119 /// Bytes reserved in the memory pool for the current partition's
119120 /// loaded left batches
120121 left_bytes : Arc < Mutex < usize > > ,
@@ -476,6 +477,7 @@ pub struct GraceHashJoinStream {
476477 left_input_schema : SchemaRef ,
477478 right_input_schema : SchemaRef ,
478479 spill_fut : OnceFut < SpillFut > ,
480+ partition : usize ,
479481 spill_left : Arc < SpillManager > ,
480482 spill_right : Arc < SpillManager > ,
481483 on_left : Vec < PhysicalExprRef > ,
@@ -486,10 +488,10 @@ pub struct GraceHashJoinStream {
486488 column_indices : Vec < ColumnIndex > ,
487489 join_metrics : Arc < BuildProbeJoinMetrics > ,
488490 context : Arc < TaskContext > ,
489- /// Memory reservation tracking in-memory buffers used by the join stream
490- reservation : Arc < Mutex < MemoryReservation > > ,
491- /// Memory reservation dedicated to prefetching the next partition
492- prefetch_reservation : Arc < Mutex < MemoryReservation > > ,
491+ /// Lazily registered reservation tracking in-memory buffers used by the join stream
492+ reservation : OnceLock < Arc < Mutex < MemoryReservation > > > ,
493+ /// Lazily registered reservation dedicated to prefetching the next partition
494+ prefetch_reservation : OnceLock < Arc < Mutex < MemoryReservation > > > ,
493495 random_state : RandomState ,
494496 partition_batch_size : usize ,
495497 adaptive_budget : AdaptivePartitionBudget ,
@@ -538,11 +540,34 @@ impl RecordBatchStream for GraceHashJoinStream {
538540}
539541
540542impl GraceHashJoinStream {
543+ /// Returns the shared main join reservation, registering a spillable "GraceHashJoinStream[<partition>]" consumer with the task context's memory pool on first use (the visible caller invokes this when entering the join phase) instead of at construction.
544+ fn main_reservation ( & self ) -> Arc < Mutex < MemoryReservation > > {
545+ Arc :: clone ( self . reservation . get_or_init ( || { // closure runs only if the OnceLock is still empty; later calls reuse the stored Arc
546+ let reservation =
547+ MemoryConsumer :: new ( format ! ( "GraceHashJoinStream[{}]" , self . partition) )
548+ . with_can_spill ( true ) // consumer is flagged as able to spill
549+ . register ( self . context . memory_pool ( ) ) ;
550+ Arc :: new ( Mutex :: new ( reservation) )
551+ } ) )
552+ }
553+
554+ /// Returns the shared prefetch reservation, registering a spillable "GraceHashJoinPrefetch[<partition>]" consumer with the task context's memory pool on first use; deferring registration avoids bloating the spillable consumer count while partitioning.
555+ fn ensure_prefetch_reservation ( & self ) -> Arc < Mutex < MemoryReservation > > {
556+ Arc :: clone ( self . prefetch_reservation . get_or_init ( || { // closure runs only if the OnceLock is still empty; later calls reuse the stored Arc
557+ let reservation =
558+ MemoryConsumer :: new ( format ! ( "GraceHashJoinPrefetch[{}]" , self . partition) )
559+ . with_can_spill ( true ) // consumer is flagged as able to spill
560+ . register ( self . context . memory_pool ( ) ) ;
561+ Arc :: new ( Mutex :: new ( reservation) )
562+ } ) )
563+ }
564+
541565 pub fn new (
542566 schema : SchemaRef ,
543567 left_input_schema : SchemaRef ,
544568 right_input_schema : SchemaRef ,
545569 spill_fut : OnceFut < SpillFut > ,
570+ partition : usize ,
546571 spill_left : Arc < SpillManager > ,
547572 spill_right : Arc < SpillManager > ,
548573 on_left : Vec < PhysicalExprRef > ,
@@ -553,8 +578,6 @@ impl GraceHashJoinStream {
553578 column_indices : Vec < ColumnIndex > ,
554579 join_metrics : Arc < BuildProbeJoinMetrics > ,
555580 context : Arc < TaskContext > ,
556- reservation : MemoryReservation ,
557- prefetch_reservation : MemoryReservation ,
558581 random_state : RandomState ,
559582 partition_batch_size : usize ,
560583 base_partition_budget_bytes : usize ,
@@ -574,6 +597,7 @@ impl GraceHashJoinStream {
574597 left_input_schema,
575598 right_input_schema,
576599 spill_fut,
600+ partition,
577601 spill_left,
578602 spill_right,
579603 on_left,
@@ -584,8 +608,8 @@ impl GraceHashJoinStream {
584608 column_indices,
585609 join_metrics,
586610 context,
587- reservation : Arc :: new ( Mutex :: new ( reservation ) ) ,
588- prefetch_reservation : Arc :: new ( Mutex :: new ( prefetch_reservation ) ) ,
611+ reservation : OnceLock :: new ( ) ,
612+ prefetch_reservation : OnceLock :: new ( ) ,
589613 random_state,
590614 partition_batch_size,
591615 adaptive_budget,
@@ -652,15 +676,19 @@ impl GraceHashJoinStream {
652676 }
653677 let left_bytes = Arc :: new ( Mutex :: new ( 0usize ) ) ;
654678 let right_bytes = Arc :: new ( Mutex :: new ( 0usize ) ) ;
679+ let base_reservation = self . main_reservation ( ) ;
680+ let prefetch_reservation = self . ensure_prefetch_reservation ( ) ;
655681 self . state = GraceJoinState :: JoinPartition {
656682 work_queue,
657683 current_work : None ,
658684 current_stream : None ,
659685 left_fut : None ,
660686 right_fut : None ,
687+ base_reservation : Arc :: clone ( & base_reservation) ,
688+ prefetch_reservation : Arc :: clone ( & prefetch_reservation) ,
661689 left_bytes,
662690 right_bytes,
663- reservation : Arc :: clone ( & self . reservation ) ,
691+ reservation : base_reservation ,
664692 join_permit : None ,
665693 join_permit_fut : None ,
666694 current_join_start : None ,
@@ -676,6 +704,8 @@ impl GraceHashJoinStream {
676704 current_stream,
677705 left_fut,
678706 right_fut,
707+ base_reservation,
708+ prefetch_reservation,
679709 left_bytes,
680710 right_bytes,
681711 reservation,
@@ -692,7 +722,7 @@ impl GraceHashJoinStream {
692722 * current_work = Some ( work) ;
693723 * left_bytes. lock ( ) = 0 ;
694724 * right_bytes. lock ( ) = 0 ;
695- * reservation = Arc :: clone ( & self . reservation ) ;
725+ * reservation = Arc :: clone ( base_reservation ) ;
696726 * join_permit = None ;
697727 * join_permit_fut = None ;
698728 self . adaptive_budget . update_active_partitions ( 1 ) ;
@@ -1221,13 +1251,13 @@ impl GraceHashJoinStream {
12211251 let left_fut_pf = load_partition_async (
12221252 Arc :: clone ( & self . spill_left ) ,
12231253 next_work. left . clone ( ) ,
1224- Arc :: clone ( & self . prefetch_reservation ) ,
1254+ Arc :: clone ( prefetch_reservation) ,
12251255 Arc :: clone ( & left_bytes_pf) ,
12261256 ) ;
12271257 let right_fut_pf = load_partition_async (
12281258 Arc :: clone ( & self . spill_right ) ,
12291259 next_work. right . clone ( ) ,
1230- Arc :: clone ( & self . prefetch_reservation ) ,
1260+ Arc :: clone ( prefetch_reservation) ,
12311261 Arc :: clone ( & right_bytes_pf) ,
12321262 ) ;
12331263 debug ! (
@@ -1242,7 +1272,7 @@ impl GraceHashJoinStream {
12421272 right_fut : right_fut_pf,
12431273 left_bytes : left_bytes_pf,
12441274 right_bytes : right_bytes_pf,
1245- reservation : Arc :: clone ( & self . prefetch_reservation ) ,
1275+ reservation : Arc :: clone ( prefetch_reservation) ,
12461276 } ) ;
12471277 } else {
12481278 let key = ( next_work. partition_id , next_work. pass ) ;
@@ -1311,7 +1341,7 @@ impl GraceHashJoinStream {
13111341 * current_stream = None ;
13121342 * current_work = None ;
13131343 * last_prefetch_skip = None ;
1314- * reservation = Arc :: clone ( & self . reservation ) ;
1344+ * reservation = Arc :: clone ( base_reservation ) ;
13151345 * join_permit = None ;
13161346 self . adaptive_budget . update_active_partitions ( 1 ) ;
13171347 continue ;
0 commit comments