119119//! factor than the pair-materialization approach.
120120
121121use std:: cmp:: Ordering ;
122- use std:: io:: BufReader ;
123122use std:: pin:: Pin ;
124123use std:: sync:: Arc ;
125124use std:: task:: { Context , Poll } ;
@@ -133,7 +132,6 @@ use crate::{EmptyRecordBatchStream, RecordBatchStream};
133132use arrow:: array:: { Array , ArrayRef , BooleanArray , BooleanBufferBuilder , RecordBatch } ;
134133use arrow:: compute:: { BatchCoalescer , SortOptions , filter_record_batch, not} ;
135134use arrow:: datatypes:: SchemaRef ;
136- use arrow:: ipc:: reader:: StreamReader ;
137135use arrow:: util:: bit_chunk_iterator:: UnalignedBitChunk ;
138136use arrow:: util:: bit_util:: apply_bitwise_binary_op;
139137use datafusion_common:: {
@@ -257,6 +255,11 @@ pub(crate) struct BitwiseSortMergeJoinStream {
257255 inner_key_buffer : Vec < RecordBatch > ,
258256 inner_key_spill : Option < Arc < dyn SpillFile > > ,
259257
258+ //Track the active spill_stream
259+ spill_stream : Option < SendableRecordBatchStream > ,
260+ //Prevents wiping out the buffer if we yield while evaluating the filter
261+ inner_group_buffered : bool ,
262+
260263 // True when buffer_inner_key_group returned Pending after partially
261264 // filling inner_key_buffer. On re-entry, buffer_inner_key_group
262265 // must skip clear() and resume from poll_next_inner_batch (the
@@ -369,6 +372,8 @@ impl BitwiseSortMergeJoinStream {
369372 matched : BooleanBufferBuilder :: new ( 0 ) ,
370373 inner_key_buffer : vec ! [ ] ,
371374 inner_key_spill : None ,
375+ spill_stream : None ,
376+ inner_group_buffered : false ,
372377 buffering_inner_pending : false ,
373378 pending_boundary : None ,
374379 on_outer,
@@ -466,6 +471,8 @@ impl BitwiseSortMergeJoinStream {
466471 fn clear_inner_key_group ( & mut self ) {
467472 self . inner_key_buffer . clear ( ) ;
468473 self . inner_key_spill = None ;
474+ self . spill_stream = None ;
475+ self . inner_group_buffered = false ;
469476 self . inner_buffer_size = 0 ;
470477 }
471478
@@ -747,7 +754,10 @@ impl BitwiseSortMergeJoinStream {
747754 /// Process a key match with a filter. For each inner row in the buffered
748755 /// key group, evaluates the filter against the outer key group and ORs
749756 /// the results into the matched bitset using u64-chunked bitwise ops.
750- fn process_key_match_with_filter ( & mut self ) -> Result < ( ) > {
757+ fn process_key_match_with_filter (
758+ & mut self ,
759+ cx : & mut Context < ' _ > ,
760+ ) -> Poll < Result < ( ) > > {
751761 self . get_outer_self_cmp ( ) ?;
752762 let filter = self . filter . as_ref ( ) . unwrap ( ) ;
753763 let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
@@ -783,25 +793,40 @@ impl BitwiseSortMergeJoinStream {
783793 )
784794 . count_ones ( ) ;
785795
786- // Process spilled inner batches first (read back from disk).
787- if let Some ( spill_file) = & self . inner_key_spill {
788- let sync_reader = spill_file. open_sync_reader ( ) ?;
789- let file = BufReader :: new ( sync_reader) ;
790- let reader = StreamReader :: try_new ( file, None ) ?;
791- for batch_result in reader {
792- let inner_slice = batch_result?;
793- matched_count = eval_filter_for_inner_slice (
794- self . outer_is_left ,
795- filter,
796- & outer_slice,
797- & inner_slice,
798- & mut self . matched ,
799- self . outer_offset ,
800- outer_group_len,
801- matched_count,
802- ) ?;
803- if matched_count == outer_group_len {
804- break ;
796+ // Process spilled inner batches first asynchronously.
797+ if self . inner_key_spill . is_some ( ) || self . spill_stream . is_some ( ) {
798+ if self . spill_stream . is_none ( )
799+ && let Some ( spill_file) = & self . inner_key_spill
800+ {
801+ let stream = self
802+ . spill_manager
803+ . read_spill_as_stream ( Arc :: clone ( spill_file) , None ) ?;
804+ self . spill_stream = Some ( stream) ;
805+ }
806+
807+ while matched_count < outer_group_len {
808+ let stream = self . spill_stream . as_mut ( ) . unwrap ( ) ;
809+ match ready ! ( stream. poll_next_unpin( cx) ) {
810+ Some ( Ok ( inner_slice) ) => {
811+ matched_count = eval_filter_for_inner_slice (
812+ self . outer_is_left ,
813+ filter,
814+ & outer_slice,
815+ & inner_slice,
816+ & mut self . matched ,
817+ self . outer_offset ,
818+ outer_group_len,
819+ matched_count,
820+ ) ?;
821+ }
822+ Some ( Err ( e) ) => {
823+ self . spill_stream = None ;
824+ return Poll :: Ready ( Err ( e) ) ;
825+ }
826+ None => {
827+ self . spill_stream = None ;
828+ break ;
829+ }
805830 }
806831 }
807832 }
@@ -829,13 +854,16 @@ impl BitwiseSortMergeJoinStream {
829854 }
830855
831856 self . outer_offset = outer_group_end;
832- Ok ( ( ) )
857+
858+ self . spill_stream = None ;
859+
860+ Poll :: Ready ( Ok ( ( ) ) )
833861 }
834862
835863 /// Continue processing an outer key group that spans multiple outer
836864 /// batches. Returns `true` if this outer batch was fully consumed
837865 /// by the key group and the caller should load another.
838- fn resume_boundary ( & mut self ) -> Result < bool > {
866+ fn resume_boundary ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < bool > > {
839867 debug_assert ! (
840868 self . outer_batch. is_some( ) ,
841869 "caller must load outer_batch first"
@@ -857,7 +885,7 @@ impl BitwiseSortMergeJoinStream {
857885 } ) ;
858886 self . emit_outer_batch ( ) ?;
859887 self . outer_batch = None ;
860- return Ok ( true ) ;
888+ return Poll :: Ready ( Ok ( true ) ) ;
861889 }
862890 }
863891 }
@@ -873,22 +901,79 @@ impl BitwiseSortMergeJoinStream {
873901 self . null_equality ,
874902 ) ?;
875903 if same_key {
876- self . process_key_match_with_filter ( ) ?;
904+ match self . process_key_match_with_filter ( cx) {
905+ Poll :: Ready ( Ok ( ( ) ) ) => ( ) ,
906+ Poll :: Ready ( Err ( e) ) => return Poll :: Ready ( Err ( e) ) ,
907+ Poll :: Pending => {
908+ self . pending_boundary =
909+ Some ( PendingBoundary :: Filtered { saved_keys } ) ;
910+ return Poll :: Pending ;
911+ }
912+ }
877913 let num_outer = self . outer_batch . as_ref ( ) . unwrap ( ) . num_rows ( ) ;
878914 if self . outer_offset >= num_outer {
879915 self . pending_boundary = Some ( PendingBoundary :: Filtered {
880916 saved_keys : slice_keys ( & self . outer_key_arrays , num_outer - 1 ) ,
881917 } ) ;
882918 self . emit_outer_batch ( ) ?;
883919 self . outer_batch = None ;
884- return Ok ( true ) ;
920+ return Poll :: Ready ( Ok ( true ) ) ;
885921 }
886922 }
887923 self . clear_inner_key_group ( ) ;
888924 }
889925 None => { }
890926 }
891- Ok ( false )
927+ Poll :: Ready ( Ok ( false ) )
928+ }
929+
930+ /// Helper to process an Equal match across potential outer batch boundaries.
931+ fn process_filtered_match_loop ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
932+ loop {
933+ ready ! ( self . process_key_match_with_filter( cx) ) ?;
934+
935+ let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
936+ if self . outer_offset >= outer_batch. num_rows ( ) {
937+ let saved_keys =
938+ slice_keys ( & self . outer_key_arrays , outer_batch. num_rows ( ) - 1 ) ;
939+
940+ self . emit_outer_batch ( ) ?;
941+ self . pending_boundary = Some ( PendingBoundary :: Filtered { saved_keys } ) ;
942+
943+ // Clear stale batch before polling
944+ self . outer_batch = None ;
945+
946+ match ready ! ( self . poll_next_outer_batch( cx) ) {
947+ Err ( e) => return Poll :: Ready ( Err ( e) ) ,
948+ Ok ( false ) => {
949+ self . pending_boundary = None ;
950+ break ;
951+ }
952+ Ok ( true ) => {
953+ let Some ( PendingBoundary :: Filtered { saved_keys } ) =
954+ self . pending_boundary . take ( )
955+ else {
956+ unreachable ! ( )
957+ } ;
958+ let same = keys_match (
959+ & saved_keys,
960+ & self . outer_key_arrays ,
961+ & self . sort_options ,
962+ self . null_equality ,
963+ ) ?;
964+ if same {
965+ continue ;
966+ }
967+ break ;
968+ }
969+ }
970+ } else {
971+ break ;
972+ }
973+ }
974+
975+ self . clear_inner_key_group ( ) ; // This resets inner_group_buffered to false
976+ Poll :: Ready ( Ok ( ( ) ) )
892977 }
893978
894979 /// Main loop: drive the merge-scan to produce output batches.
@@ -910,14 +995,21 @@ impl BitwiseSortMergeJoinStream {
910995 }
911996 return Poll :: Ready ( Ok ( None ) ) ;
912997 }
913- Ok ( true ) => {
914- if self . resume_boundary ( ) ? {
915- continue ;
916- }
917- }
998+ Ok ( true ) => { } // Loaded batch, move on to checks
918999 }
9191000 }
9201001
1002+ // Handles pausing while fetching a NEW outer batch.
1003+ if self . pending_boundary . is_some ( ) && ready ! ( self . resume_boundary( cx) ) ? {
1004+ continue ;
1005+ }
1006+
1007+ // Handles pausing while reading the disk stream mid-batch.
1008+ if self . inner_group_buffered {
1009+ ready ! ( self . process_filtered_match_loop( cx) ) ?;
1010+ continue ;
1011+ }
1012+
9211013 // 2. Ensure we have an inner batch (unless inner is exhausted).
9221014 // Skip this when resuming a pending boundary — inner was already
9231015 // advanced past the key group before the boundary loop started.
@@ -1042,65 +1134,17 @@ impl BitwiseSortMergeJoinStream {
10421134 }
10431135 Ordering :: Equal => {
10441136 if self . filter . is_some ( ) {
1137+ debug_assert ! ( !self . inner_group_buffered) ;
10451138 // Buffer inner key group (may span batches)
10461139 match ready ! ( self . buffer_inner_key_group( cx) ) {
10471140 Err ( e) => return Poll :: Ready ( Err ( e) ) ,
1048- Ok ( _inner_exhausted) => { }
1141+ Ok ( _inner_exhausted) => {
1142+ self . inner_group_buffered = true ;
1143+ }
10491144 }
1050-
10511145 // Process outer rows against buffered inner group
10521146 // (may need to handle outer batch boundary)
1053- loop {
1054- self . process_key_match_with_filter ( ) ?;
1055-
1056- let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
1057- if self . outer_offset >= outer_batch. num_rows ( ) {
1058- let saved_keys = slice_keys (
1059- & self . outer_key_arrays ,
1060- outer_batch. num_rows ( ) - 1 ,
1061- ) ;
1062-
1063- self . emit_outer_batch ( ) ?;
1064- debug_assert ! (
1065- !self . inner_key_buffer. is_empty( )
1066- || self . inner_key_spill. is_some( ) ,
1067- "Filtered pending boundary requires inner key data in buffer or spill"
1068- ) ;
1069- self . pending_boundary =
1070- Some ( PendingBoundary :: Filtered { saved_keys } ) ;
1071-
1072- match ready ! ( self . poll_next_outer_batch( cx) ) {
1073- Err ( e) => return Poll :: Ready ( Err ( e) ) ,
1074- Ok ( false ) => {
1075- self . pending_boundary = None ;
1076- self . outer_batch = None ;
1077- break ;
1078- }
1079- Ok ( true ) => {
1080- let Some ( PendingBoundary :: Filtered {
1081- saved_keys,
1082- } ) = self . pending_boundary . take ( )
1083- else {
1084- unreachable ! ( )
1085- } ;
1086- let same = keys_match (
1087- & saved_keys,
1088- & self . outer_key_arrays ,
1089- & self . sort_options ,
1090- self . null_equality ,
1091- ) ?;
1092- if same {
1093- continue ;
1094- }
1095- break ;
1096- }
1097- }
1098- } else {
1099- break ;
1100- }
1101- }
1102-
1103- self . clear_inner_key_group ( ) ;
1147+ ready ! ( self . process_filtered_match_loop( cx) ) ?;
11041148 } else {
11051149 // No filter: advance inner past key group, then
11061150 // mark all outer rows with this key as matched.
0 commit comments