119119//! factor than the pair-materialization approach.
120120
121121use std:: cmp:: Ordering ;
122- use std:: fs:: File ;
123- use std:: io:: BufReader ;
124122use std:: pin:: Pin ;
125123use std:: sync:: Arc ;
126124use std:: task:: { Context , Poll } ;
@@ -134,7 +132,6 @@ use crate::{EmptyRecordBatchStream, RecordBatchStream};
134132use arrow:: array:: { Array , ArrayRef , BooleanArray , BooleanBufferBuilder , RecordBatch } ;
135133use arrow:: compute:: { BatchCoalescer , SortOptions , filter_record_batch, not} ;
136134use arrow:: datatypes:: SchemaRef ;
137- use arrow:: ipc:: reader:: StreamReader ;
138135use arrow:: util:: bit_chunk_iterator:: UnalignedBitChunk ;
139136use arrow:: util:: bit_util:: apply_bitwise_binary_op;
140137use datafusion_common:: {
@@ -259,6 +256,13 @@ pub(crate) struct BitwiseSortMergeJoinStream {
259256 inner_key_buffer : Vec < RecordBatch > ,
260257 inner_key_spill : Option < RefCountedTempFile > ,
261258
259+ // Track the active spill_stream
260+ spill_stream : Option < SendableRecordBatchStream > ,
261+ // Whether the active spill stream has produced any batches yet.
262+ spill_stream_has_data : bool ,
263+ // Prevents wiping out the buffer if we yield while evaluating the filter
264+ inner_group_buffered : bool ,
265+
262266 // True when buffer_inner_key_group returned Pending after partially
263267 // filling inner_key_buffer. On re-entry, buffer_inner_key_group
264268 // must skip clear() and resume from poll_next_inner_batch (the
@@ -371,6 +375,9 @@ impl BitwiseSortMergeJoinStream {
371375 matched : BooleanBufferBuilder :: new ( 0 ) ,
372376 inner_key_buffer : vec ! [ ] ,
373377 inner_key_spill : None ,
378+ spill_stream : None ,
379+ spill_stream_has_data : false ,
380+ inner_group_buffered : false ,
374381 buffering_inner_pending : false ,
375382 pending_boundary : None ,
376383 on_outer,
@@ -468,6 +475,9 @@ impl BitwiseSortMergeJoinStream {
468475 fn clear_inner_key_group ( & mut self ) {
469476 self . inner_key_buffer . clear ( ) ;
470477 self . inner_key_spill = None ;
478+ self . spill_stream = None ;
479+ self . spill_stream_has_data = false ;
480+ self . inner_group_buffered = false ;
471481 self . inner_buffer_size = 0 ;
472482 }
473483
@@ -749,7 +759,10 @@ impl BitwiseSortMergeJoinStream {
749759 /// Process a key match with a filter. For each inner row in the buffered
750760 /// key group, evaluates the filter against the outer key group and ORs
751761 /// the results into the matched bitset using u64-chunked bitwise ops.
752- fn process_key_match_with_filter ( & mut self ) -> Result < ( ) > {
762+ fn process_key_match_with_filter (
763+ & mut self ,
764+ cx : & mut Context < ' _ > ,
765+ ) -> Poll < Result < ( ) > > {
753766 self . get_outer_self_cmp ( ) ?;
754767 let filter = self . filter . as_ref ( ) . unwrap ( ) ;
755768 let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
@@ -785,24 +798,47 @@ impl BitwiseSortMergeJoinStream {
785798 )
786799 . count_ones ( ) ;
787800
788- // Process spilled inner batches first (read back from disk).
789- if let Some ( spill_file) = & self . inner_key_spill {
790- let file = BufReader :: new ( File :: open ( spill_file. path ( ) ) ?) ;
791- let reader = StreamReader :: try_new ( file, None ) ?;
792- for batch_result in reader {
793- let inner_slice = batch_result?;
794- matched_count = eval_filter_for_inner_slice (
795- self . outer_is_left ,
796- filter,
797- & outer_slice,
798- & inner_slice,
799- & mut self . matched ,
800- self . outer_offset ,
801- outer_group_len,
802- matched_count,
803- ) ?;
804- if matched_count == outer_group_len {
805- break ;
801+ // Process spilled inner batches first asynchronously.
802+ if matched_count < outer_group_len
803+ && ( self . inner_key_spill . is_some ( ) || self . spill_stream . is_some ( ) )
804+ {
805+ if self . spill_stream . is_none ( )
806+ && let Some ( spill_file) = & self . inner_key_spill
807+ {
808+ let stream = self
809+ . spill_manager
810+ . read_spill_as_stream ( spill_file. clone ( ) , None ) ?;
811+ self . spill_stream = Some ( stream) ;
812+ }
813+
814+ while matched_count < outer_group_len {
815+ let stream = self . spill_stream . as_mut ( ) . unwrap ( ) ;
816+ match ready ! ( stream. poll_next_unpin( cx) ) {
817+ Some ( Ok ( inner_slice) ) => {
818+ self . spill_stream_has_data = true ;
819+ matched_count = eval_filter_for_inner_slice (
820+ self . outer_is_left ,
821+ filter,
822+ & outer_slice,
823+ & inner_slice,
824+ & mut self . matched ,
825+ self . outer_offset ,
826+ outer_group_len,
827+ matched_count,
828+ ) ?;
829+ }
830+ Some ( Err ( e) ) => {
831+ self . spill_stream = None ;
832+ self . spill_stream_has_data = false ;
833+ return Poll :: Ready ( Err ( e) ) ;
834+ }
835+ None => {
836+ self . spill_stream = None ;
837+ if !self . spill_stream_has_data {
838+ return Poll :: Ready ( internal_err ! ( "Spill file was empty" ) ) ;
839+ }
840+ break ;
841+ }
806842 }
807843 }
808844 }
@@ -830,13 +866,17 @@ impl BitwiseSortMergeJoinStream {
830866 }
831867
832868 self . outer_offset = outer_group_end;
833- Ok ( ( ) )
869+
870+ self . spill_stream = None ;
871+ self . spill_stream_has_data = false ;
872+
873+ Poll :: Ready ( Ok ( ( ) ) )
834874 }
835875
836876 /// Continue processing an outer key group that spans multiple outer
837877 /// batches. Returns `true` if this outer batch was fully consumed
838878 /// by the key group and the caller should load another.
839- fn resume_boundary ( & mut self ) -> Result < bool > {
879+ fn resume_boundary ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < bool > > {
840880 debug_assert ! (
841881 self . outer_batch. is_some( ) ,
842882 "caller must load outer_batch first"
@@ -858,7 +898,7 @@ impl BitwiseSortMergeJoinStream {
858898 } ) ;
859899 self . emit_outer_batch ( ) ?;
860900 self . outer_batch = None ;
861- return Ok ( true ) ;
901+ return Poll :: Ready ( Ok ( true ) ) ;
862902 }
863903 }
864904 }
@@ -874,22 +914,79 @@ impl BitwiseSortMergeJoinStream {
874914 self . null_equality ,
875915 ) ?;
876916 if same_key {
877- self . process_key_match_with_filter ( ) ?;
917+ match self . process_key_match_with_filter ( cx) {
918+ Poll :: Ready ( Ok ( ( ) ) ) => ( ) ,
919+ Poll :: Ready ( Err ( e) ) => return Poll :: Ready ( Err ( e) ) ,
920+ Poll :: Pending => {
921+ self . pending_boundary =
922+ Some ( PendingBoundary :: Filtered { saved_keys } ) ;
923+ return Poll :: Pending ;
924+ }
925+ }
878926 let num_outer = self . outer_batch . as_ref ( ) . unwrap ( ) . num_rows ( ) ;
879927 if self . outer_offset >= num_outer {
880928 self . pending_boundary = Some ( PendingBoundary :: Filtered {
881929 saved_keys : slice_keys ( & self . outer_key_arrays , num_outer - 1 ) ,
882930 } ) ;
883931 self . emit_outer_batch ( ) ?;
884932 self . outer_batch = None ;
885- return Ok ( true ) ;
933+ return Poll :: Ready ( Ok ( true ) ) ;
886934 }
887935 }
888936 self . clear_inner_key_group ( ) ;
889937 }
890938 None => { }
891939 }
892- Ok ( false )
940+ Poll :: Ready ( Ok ( false ) )
941+ }
942+
943+ /// Helper to process an Equal match across potential outer batch boundaries.
944+ fn process_filtered_match_loop ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
945+ loop {
946+ ready ! ( self . process_key_match_with_filter( cx) ) ?;
947+
948+ let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
949+ if self . outer_offset >= outer_batch. num_rows ( ) {
950+ let saved_keys =
951+ slice_keys ( & self . outer_key_arrays , outer_batch. num_rows ( ) - 1 ) ;
952+
953+ self . emit_outer_batch ( ) ?;
954+ self . pending_boundary = Some ( PendingBoundary :: Filtered { saved_keys } ) ;
955+
956+ // Clear stale batch before polling
957+ self . outer_batch = None ;
958+
959+ match ready ! ( self . poll_next_outer_batch( cx) ) {
960+ Err ( e) => return Poll :: Ready ( Err ( e) ) ,
961+ Ok ( false ) => {
962+ self . pending_boundary = None ;
963+ break ;
964+ }
965+ Ok ( true ) => {
966+ let Some ( PendingBoundary :: Filtered { saved_keys } ) =
967+ self . pending_boundary . take ( )
968+ else {
969+ unreachable ! ( )
970+ } ;
971+ let same = keys_match (
972+ & saved_keys,
973+ & self . outer_key_arrays ,
974+ & self . sort_options ,
975+ self . null_equality ,
976+ ) ?;
977+ if same {
978+ continue ;
979+ }
980+ break ;
981+ }
982+ }
983+ } else {
984+ break ;
985+ }
986+ }
987+
988+ self . clear_inner_key_group ( ) ; // This resets inner_group_buffered to false
989+ Poll :: Ready ( Ok ( ( ) ) )
893990 }
894991
895992 /// Main loop: drive the merge-scan to produce output batches.
@@ -911,14 +1008,21 @@ impl BitwiseSortMergeJoinStream {
9111008 }
9121009 return Poll :: Ready ( Ok ( None ) ) ;
9131010 }
914- Ok ( true ) => {
915- if self . resume_boundary ( ) ? {
916- continue ;
917- }
918- }
1011+ Ok ( true ) => { } // Loaded batch, move on to checks
9191012 }
9201013 }
9211014
1015+ // Handles pausing while fetching a NEW outer batch.
1016+ if self . pending_boundary . is_some ( ) && ready ! ( self . resume_boundary( cx) ) ? {
1017+ continue ;
1018+ }
1019+
1020+ // Handles pausing while reading the disk stream mid-batch.
1021+ if self . inner_group_buffered {
1022+ ready ! ( self . process_filtered_match_loop( cx) ) ?;
1023+ continue ;
1024+ }
1025+
9221026 // 2. Ensure we have an inner batch (unless inner is exhausted).
9231027 // Skip this when resuming a pending boundary — inner was already
9241028 // advanced past the key group before the boundary loop started.
@@ -1043,65 +1147,17 @@ impl BitwiseSortMergeJoinStream {
10431147 }
10441148 Ordering :: Equal => {
10451149 if self . filter . is_some ( ) {
1150+ debug_assert ! ( !self . inner_group_buffered) ;
10461151 // Buffer inner key group (may span batches)
10471152 match ready ! ( self . buffer_inner_key_group( cx) ) {
10481153 Err ( e) => return Poll :: Ready ( Err ( e) ) ,
1049- Ok ( _inner_exhausted) => { }
1154+ Ok ( _inner_exhausted) => {
1155+ self . inner_group_buffered = true ;
1156+ }
10501157 }
1051-
10521158 // Process outer rows against buffered inner group
10531159 // (may need to handle outer batch boundary)
1054- loop {
1055- self . process_key_match_with_filter ( ) ?;
1056-
1057- let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
1058- if self . outer_offset >= outer_batch. num_rows ( ) {
1059- let saved_keys = slice_keys (
1060- & self . outer_key_arrays ,
1061- outer_batch. num_rows ( ) - 1 ,
1062- ) ;
1063-
1064- self . emit_outer_batch ( ) ?;
1065- debug_assert ! (
1066- !self . inner_key_buffer. is_empty( )
1067- || self . inner_key_spill. is_some( ) ,
1068- "Filtered pending boundary requires inner key data in buffer or spill"
1069- ) ;
1070- self . pending_boundary =
1071- Some ( PendingBoundary :: Filtered { saved_keys } ) ;
1072-
1073- match ready ! ( self . poll_next_outer_batch( cx) ) {
1074- Err ( e) => return Poll :: Ready ( Err ( e) ) ,
1075- Ok ( false ) => {
1076- self . pending_boundary = None ;
1077- self . outer_batch = None ;
1078- break ;
1079- }
1080- Ok ( true ) => {
1081- let Some ( PendingBoundary :: Filtered {
1082- saved_keys,
1083- } ) = self . pending_boundary . take ( )
1084- else {
1085- unreachable ! ( )
1086- } ;
1087- let same = keys_match (
1088- & saved_keys,
1089- & self . outer_key_arrays ,
1090- & self . sort_options ,
1091- self . null_equality ,
1092- ) ?;
1093- if same {
1094- continue ;
1095- }
1096- break ;
1097- }
1098- }
1099- } else {
1100- break ;
1101- }
1102- }
1103-
1104- self . clear_inner_key_group ( ) ;
1160+ ready ! ( self . process_filtered_match_loop( cx) ) ?;
11051161 } else {
11061162 // No filter: advance inner past key group, then
11071163 // mark all outer rows with this key as matched.
0 commit comments