119119//! factor than the pair-materialization approach.
120120
121121use std:: cmp:: Ordering ;
122- use std:: io:: BufReader ;
123122use std:: pin:: Pin ;
124123use std:: sync:: Arc ;
125124use std:: task:: { Context , Poll } ;
@@ -133,7 +132,6 @@ use crate::spill::spill_manager::SpillManager;
133132use arrow:: array:: { Array , ArrayRef , BooleanArray , BooleanBufferBuilder , RecordBatch } ;
134133use arrow:: compute:: { BatchCoalescer , SortOptions , filter_record_batch, not} ;
135134use arrow:: datatypes:: SchemaRef ;
136- use arrow:: ipc:: reader:: StreamReader ;
137135use arrow:: util:: bit_chunk_iterator:: UnalignedBitChunk ;
138136use arrow:: util:: bit_util:: apply_bitwise_binary_op;
139137use datafusion_common:: {
@@ -257,6 +255,11 @@ pub(crate) struct BitwiseSortMergeJoinStream {
257255 inner_key_buffer : Vec < RecordBatch > ,
258256 inner_key_spill : Option < Arc < dyn SpillFile > > ,
259257
258+ //Track the active spill_stream
259+ spill_stream : Option < SendableRecordBatchStream > ,
260+ //Prevents wiping out the buffer if we yield while evaluating the filter
261+ inner_group_buffered : bool ,
262+
260263 // True when buffer_inner_key_group returned Pending after partially
261264 // filling inner_key_buffer. On re-entry, buffer_inner_key_group
262265 // must skip clear() and resume from poll_next_inner_batch (the
@@ -369,6 +372,8 @@ impl BitwiseSortMergeJoinStream {
369372 matched : BooleanBufferBuilder :: new ( 0 ) ,
370373 inner_key_buffer : vec ! [ ] ,
371374 inner_key_spill : None ,
375+ spill_stream : None ,
376+ inner_group_buffered : false ,
372377 buffering_inner_pending : false ,
373378 pending_boundary : None ,
374379 on_outer,
@@ -466,6 +471,8 @@ impl BitwiseSortMergeJoinStream {
466471 fn clear_inner_key_group ( & mut self ) {
467472 self . inner_key_buffer . clear ( ) ;
468473 self . inner_key_spill = None ;
474+ self . spill_stream = None ;
475+ self . inner_group_buffered = false ;
469476 self . inner_buffer_size = 0 ;
470477 }
471478
@@ -737,7 +744,10 @@ impl BitwiseSortMergeJoinStream {
737744 /// Process a key match with a filter. For each inner row in the buffered
738745 /// key group, evaluates the filter against the outer key group and ORs
739746 /// the results into the matched bitset using u64-chunked bitwise ops.
740- fn process_key_match_with_filter ( & mut self ) -> Result < ( ) > {
747+ fn process_key_match_with_filter (
748+ & mut self ,
749+ cx : & mut Context < ' _ > ,
750+ ) -> Poll < Result < ( ) > > {
741751 self . get_outer_self_cmp ( ) ?;
742752 let filter = self . filter . as_ref ( ) . unwrap ( ) ;
743753 let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
@@ -773,25 +783,40 @@ impl BitwiseSortMergeJoinStream {
773783 )
774784 . count_ones ( ) ;
775785
776- // Process spilled inner batches first (read back from disk).
777- if let Some ( spill_file) = & self . inner_key_spill {
778- let sync_reader = spill_file. open_sync_reader ( ) ?;
779- let file = BufReader :: new ( sync_reader) ;
780- let reader = StreamReader :: try_new ( file, None ) ?;
781- for batch_result in reader {
782- let inner_slice = batch_result?;
783- matched_count = eval_filter_for_inner_slice (
784- self . outer_is_left ,
785- filter,
786- & outer_slice,
787- & inner_slice,
788- & mut self . matched ,
789- self . outer_offset ,
790- outer_group_len,
791- matched_count,
792- ) ?;
793- if matched_count == outer_group_len {
794- break ;
786+ // Process spilled inner batches first asynchronously.
787+ if self . inner_key_spill . is_some ( ) || self . spill_stream . is_some ( ) {
788+ if self . spill_stream . is_none ( )
789+ && let Some ( spill_file) = & self . inner_key_spill
790+ {
791+ let stream = self
792+ . spill_manager
793+ . read_spill_as_stream ( Arc :: clone ( spill_file) , None ) ?;
794+ self . spill_stream = Some ( stream) ;
795+ }
796+
797+ while matched_count < outer_group_len {
798+ let stream = self . spill_stream . as_mut ( ) . unwrap ( ) ;
799+ match ready ! ( stream. poll_next_unpin( cx) ) {
800+ Some ( Ok ( inner_slice) ) => {
801+ matched_count = eval_filter_for_inner_slice (
802+ self . outer_is_left ,
803+ filter,
804+ & outer_slice,
805+ & inner_slice,
806+ & mut self . matched ,
807+ self . outer_offset ,
808+ outer_group_len,
809+ matched_count,
810+ ) ?;
811+ }
812+ Some ( Err ( e) ) => {
813+ self . spill_stream = None ;
814+ return Poll :: Ready ( Err ( e) ) ;
815+ }
816+ None => {
817+ self . spill_stream = None ;
818+ break ;
819+ }
795820 }
796821 }
797822 }
@@ -819,13 +844,16 @@ impl BitwiseSortMergeJoinStream {
819844 }
820845
821846 self . outer_offset = outer_group_end;
822- Ok ( ( ) )
847+
848+ self . spill_stream = None ;
849+
850+ Poll :: Ready ( Ok ( ( ) ) )
823851 }
824852
825853 /// Continue processing an outer key group that spans multiple outer
826854 /// batches. Returns `true` if this outer batch was fully consumed
827855 /// by the key group and the caller should load another.
828- fn resume_boundary ( & mut self ) -> Result < bool > {
856+ fn resume_boundary ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < bool > > {
829857 debug_assert ! (
830858 self . outer_batch. is_some( ) ,
831859 "caller must load outer_batch first"
@@ -847,7 +875,7 @@ impl BitwiseSortMergeJoinStream {
847875 } ) ;
848876 self . emit_outer_batch ( ) ?;
849877 self . outer_batch = None ;
850- return Ok ( true ) ;
878+ return Poll :: Ready ( Ok ( true ) ) ;
851879 }
852880 }
853881 }
@@ -863,22 +891,79 @@ impl BitwiseSortMergeJoinStream {
863891 self . null_equality ,
864892 ) ?;
865893 if same_key {
866- self . process_key_match_with_filter ( ) ?;
894+ match self . process_key_match_with_filter ( cx) {
895+ Poll :: Ready ( Ok ( ( ) ) ) => ( ) ,
896+ Poll :: Ready ( Err ( e) ) => return Poll :: Ready ( Err ( e) ) ,
897+ Poll :: Pending => {
898+ self . pending_boundary =
899+ Some ( PendingBoundary :: Filtered { saved_keys } ) ;
900+ return Poll :: Pending ;
901+ }
902+ }
867903 let num_outer = self . outer_batch . as_ref ( ) . unwrap ( ) . num_rows ( ) ;
868904 if self . outer_offset >= num_outer {
869905 self . pending_boundary = Some ( PendingBoundary :: Filtered {
870906 saved_keys : slice_keys ( & self . outer_key_arrays , num_outer - 1 ) ,
871907 } ) ;
872908 self . emit_outer_batch ( ) ?;
873909 self . outer_batch = None ;
874- return Ok ( true ) ;
910+ return Poll :: Ready ( Ok ( true ) ) ;
875911 }
876912 }
877913 self . clear_inner_key_group ( ) ;
878914 }
879915 None => { }
880916 }
881- Ok ( false )
917+ Poll :: Ready ( Ok ( false ) )
918+ }
919+
920+ /// Helper to process an Equal match across potential outer batch boundaries.
921+ fn process_filtered_match_loop ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
922+ loop {
923+ ready ! ( self . process_key_match_with_filter( cx) ) ?;
924+
925+ let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
926+ if self . outer_offset >= outer_batch. num_rows ( ) {
927+ let saved_keys =
928+ slice_keys ( & self . outer_key_arrays , outer_batch. num_rows ( ) - 1 ) ;
929+
930+ self . emit_outer_batch ( ) ?;
931+ self . pending_boundary = Some ( PendingBoundary :: Filtered { saved_keys } ) ;
932+
933+ // Clear stale batch before polling
934+ self . outer_batch = None ;
935+
936+ match ready ! ( self . poll_next_outer_batch( cx) ) {
937+ Err ( e) => return Poll :: Ready ( Err ( e) ) ,
938+ Ok ( false ) => {
939+ self . pending_boundary = None ;
940+ break ;
941+ }
942+ Ok ( true ) => {
943+ let Some ( PendingBoundary :: Filtered { saved_keys } ) =
944+ self . pending_boundary . take ( )
945+ else {
946+ unreachable ! ( )
947+ } ;
948+ let same = keys_match (
949+ & saved_keys,
950+ & self . outer_key_arrays ,
951+ & self . sort_options ,
952+ self . null_equality ,
953+ ) ?;
954+ if same {
955+ continue ;
956+ }
957+ break ;
958+ }
959+ }
960+ } else {
961+ break ;
962+ }
963+ }
964+
965+ self . clear_inner_key_group ( ) ; // This resets inner_group_buffered to false
966+ Poll :: Ready ( Ok ( ( ) ) )
882967 }
883968
884969 /// Main loop: drive the merge-scan to produce output batches.
@@ -900,14 +985,21 @@ impl BitwiseSortMergeJoinStream {
900985 }
901986 return Poll :: Ready ( Ok ( None ) ) ;
902987 }
903- Ok ( true ) => {
904- if self . resume_boundary ( ) ? {
905- continue ;
906- }
907- }
988+ Ok ( true ) => { } // Loaded batch, move on to checks
908989 }
909990 }
910991
992+ // Handles pausing while fetching a NEW outer batch.
993+ if self . pending_boundary . is_some ( ) && ready ! ( self . resume_boundary( cx) ) ? {
994+ continue ;
995+ }
996+
997+ // Handles pausing while reading the disk stream mid-batch.
998+ if self . inner_group_buffered {
999+ ready ! ( self . process_filtered_match_loop( cx) ) ?;
1000+ continue ;
1001+ }
1002+
9111003 // 2. Ensure we have an inner batch (unless inner is exhausted).
9121004 // Skip this when resuming a pending boundary — inner was already
9131005 // advanced past the key group before the boundary loop started.
@@ -1032,65 +1124,17 @@ impl BitwiseSortMergeJoinStream {
10321124 }
10331125 Ordering :: Equal => {
10341126 if self . filter . is_some ( ) {
1127+ debug_assert ! ( !self . inner_group_buffered) ;
10351128 // Buffer inner key group (may span batches)
10361129 match ready ! ( self . buffer_inner_key_group( cx) ) {
10371130 Err ( e) => return Poll :: Ready ( Err ( e) ) ,
1038- Ok ( _inner_exhausted) => { }
1131+ Ok ( _inner_exhausted) => {
1132+ self . inner_group_buffered = true ;
1133+ }
10391134 }
1040-
10411135 // Process outer rows against buffered inner group
10421136 // (may need to handle outer batch boundary)
1043- loop {
1044- self . process_key_match_with_filter ( ) ?;
1045-
1046- let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
1047- if self . outer_offset >= outer_batch. num_rows ( ) {
1048- let saved_keys = slice_keys (
1049- & self . outer_key_arrays ,
1050- outer_batch. num_rows ( ) - 1 ,
1051- ) ;
1052-
1053- self . emit_outer_batch ( ) ?;
1054- debug_assert ! (
1055- !self . inner_key_buffer. is_empty( )
1056- || self . inner_key_spill. is_some( ) ,
1057- "Filtered pending boundary requires inner key data in buffer or spill"
1058- ) ;
1059- self . pending_boundary =
1060- Some ( PendingBoundary :: Filtered { saved_keys } ) ;
1061-
1062- match ready ! ( self . poll_next_outer_batch( cx) ) {
1063- Err ( e) => return Poll :: Ready ( Err ( e) ) ,
1064- Ok ( false ) => {
1065- self . pending_boundary = None ;
1066- self . outer_batch = None ;
1067- break ;
1068- }
1069- Ok ( true ) => {
1070- let Some ( PendingBoundary :: Filtered {
1071- saved_keys,
1072- } ) = self . pending_boundary . take ( )
1073- else {
1074- unreachable ! ( )
1075- } ;
1076- let same = keys_match (
1077- & saved_keys,
1078- & self . outer_key_arrays ,
1079- & self . sort_options ,
1080- self . null_equality ,
1081- ) ?;
1082- if same {
1083- continue ;
1084- }
1085- break ;
1086- }
1087- }
1088- } else {
1089- break ;
1090- }
1091- }
1092-
1093- self . clear_inner_key_group ( ) ;
1137+ ready ! ( self . process_filtered_match_loop( cx) ) ?;
10941138 } else {
10951139 // No filter: advance inner past key group, then
10961140 // mark all outer rows with this key as matched.
0 commit comments