119119//! factor than the pair-materialization approach.
120120
121121use std:: cmp:: Ordering ;
122+ use std:: io:: BufReader ;
122123use std:: pin:: Pin ;
123124use std:: sync:: Arc ;
124125use std:: task:: { Context , Poll } ;
@@ -132,6 +133,7 @@ use crate::{EmptyRecordBatchStream, RecordBatchStream};
132133use arrow:: array:: { Array , ArrayRef , BooleanArray , BooleanBufferBuilder , RecordBatch } ;
133134use arrow:: compute:: { BatchCoalescer , SortOptions , filter_record_batch, not} ;
134135use arrow:: datatypes:: SchemaRef ;
136+ use arrow:: ipc:: reader:: StreamReader ;
135137use arrow:: util:: bit_chunk_iterator:: UnalignedBitChunk ;
136138use arrow:: util:: bit_util:: apply_bitwise_binary_op;
137139use datafusion_common:: {
@@ -255,11 +257,6 @@ pub(crate) struct BitwiseSortMergeJoinStream {
255257 inner_key_buffer : Vec < RecordBatch > ,
256258 inner_key_spill : Option < Arc < dyn SpillFile > > ,
257259
258- //Track the active spill_stream
259- spill_stream : Option < SendableRecordBatchStream > ,
260- //Prevents wiping out the buffer if we yield while evaluating the filter
261- inner_group_buffered : bool ,
262-
263260 // True when buffer_inner_key_group returned Pending after partially
264261 // filling inner_key_buffer. On re-entry, buffer_inner_key_group
265262 // must skip clear() and resume from poll_next_inner_batch (the
@@ -372,8 +369,6 @@ impl BitwiseSortMergeJoinStream {
372369 matched : BooleanBufferBuilder :: new ( 0 ) ,
373370 inner_key_buffer : vec ! [ ] ,
374371 inner_key_spill : None ,
375- spill_stream : None ,
376- inner_group_buffered : false ,
377372 buffering_inner_pending : false ,
378373 pending_boundary : None ,
379374 on_outer,
@@ -471,8 +466,6 @@ impl BitwiseSortMergeJoinStream {
471466 fn clear_inner_key_group ( & mut self ) {
472467 self . inner_key_buffer . clear ( ) ;
473468 self . inner_key_spill = None ;
474- self . spill_stream = None ;
475- self . inner_group_buffered = false ;
476469 self . inner_buffer_size = 0 ;
477470 }
478471
@@ -754,10 +747,7 @@ impl BitwiseSortMergeJoinStream {
754747 /// Process a key match with a filter. For each inner row in the buffered
755748 /// key group, evaluates the filter against the outer key group and ORs
756749 /// the results into the matched bitset using u64-chunked bitwise ops.
757- fn process_key_match_with_filter (
758- & mut self ,
759- cx : & mut Context < ' _ > ,
760- ) -> Poll < Result < ( ) > > {
750+ fn process_key_match_with_filter ( & mut self ) -> Result < ( ) > {
761751 self . get_outer_self_cmp ( ) ?;
762752 let filter = self . filter . as_ref ( ) . unwrap ( ) ;
763753 let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
@@ -793,40 +783,25 @@ impl BitwiseSortMergeJoinStream {
793783 )
794784 . count_ones ( ) ;
795785
796- // Process spilled inner batches first asynchronously.
797- if self . inner_key_spill . is_some ( ) || self . spill_stream . is_some ( ) {
798- if self . spill_stream . is_none ( )
799- && let Some ( spill_file) = & self . inner_key_spill
800- {
801- let stream = self
802- . spill_manager
803- . read_spill_as_stream ( Arc :: clone ( spill_file) , None ) ?;
804- self . spill_stream = Some ( stream) ;
805- }
806-
807- while matched_count < outer_group_len {
808- let stream = self . spill_stream . as_mut ( ) . unwrap ( ) ;
809- match ready ! ( stream. poll_next_unpin( cx) ) {
810- Some ( Ok ( inner_slice) ) => {
811- matched_count = eval_filter_for_inner_slice (
812- self . outer_is_left ,
813- filter,
814- & outer_slice,
815- & inner_slice,
816- & mut self . matched ,
817- self . outer_offset ,
818- outer_group_len,
819- matched_count,
820- ) ?;
821- }
822- Some ( Err ( e) ) => {
823- self . spill_stream = None ;
824- return Poll :: Ready ( Err ( e) ) ;
825- }
826- None => {
827- self . spill_stream = None ;
828- break ;
829- }
786+ // Process spilled inner batches first (read back from disk).
787+ if let Some ( spill_file) = & self . inner_key_spill {
788+ let sync_reader = spill_file. open_sync_reader ( ) ?;
789+ let file = BufReader :: new ( sync_reader) ;
790+ let reader = StreamReader :: try_new ( file, None ) ?;
791+ for batch_result in reader {
792+ let inner_slice = batch_result?;
793+ matched_count = eval_filter_for_inner_slice (
794+ self . outer_is_left ,
795+ filter,
796+ & outer_slice,
797+ & inner_slice,
798+ & mut self . matched ,
799+ self . outer_offset ,
800+ outer_group_len,
801+ matched_count,
802+ ) ?;
803+ if matched_count == outer_group_len {
804+ break ;
830805 }
831806 }
832807 }
@@ -854,16 +829,13 @@ impl BitwiseSortMergeJoinStream {
854829 }
855830
856831 self . outer_offset = outer_group_end;
857-
858- self . spill_stream = None ;
859-
860- Poll :: Ready ( Ok ( ( ) ) )
832+ Ok ( ( ) )
861833 }
862834
863835 /// Continue processing an outer key group that spans multiple outer
864836 /// batches. Returns `true` if this outer batch was fully consumed
865837 /// by the key group and the caller should load another.
866- fn resume_boundary ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < bool > > {
838+ fn resume_boundary ( & mut self ) -> Result < bool > {
867839 debug_assert ! (
868840 self . outer_batch. is_some( ) ,
869841 "caller must load outer_batch first"
@@ -885,7 +857,7 @@ impl BitwiseSortMergeJoinStream {
885857 } ) ;
886858 self . emit_outer_batch ( ) ?;
887859 self . outer_batch = None ;
888- return Poll :: Ready ( Ok ( true ) ) ;
860+ return Ok ( true ) ;
889861 }
890862 }
891863 }
@@ -901,79 +873,22 @@ impl BitwiseSortMergeJoinStream {
901873 self . null_equality ,
902874 ) ?;
903875 if same_key {
904- match self . process_key_match_with_filter ( cx) {
905- Poll :: Ready ( Ok ( ( ) ) ) => ( ) ,
906- Poll :: Ready ( Err ( e) ) => return Poll :: Ready ( Err ( e) ) ,
907- Poll :: Pending => {
908- self . pending_boundary =
909- Some ( PendingBoundary :: Filtered { saved_keys } ) ;
910- return Poll :: Pending ;
911- }
912- }
876+ self . process_key_match_with_filter ( ) ?;
913877 let num_outer = self . outer_batch . as_ref ( ) . unwrap ( ) . num_rows ( ) ;
914878 if self . outer_offset >= num_outer {
915879 self . pending_boundary = Some ( PendingBoundary :: Filtered {
916880 saved_keys : slice_keys ( & self . outer_key_arrays , num_outer - 1 ) ,
917881 } ) ;
918882 self . emit_outer_batch ( ) ?;
919883 self . outer_batch = None ;
920- return Poll :: Ready ( Ok ( true ) ) ;
884+ return Ok ( true ) ;
921885 }
922886 }
923887 self . clear_inner_key_group ( ) ;
924888 }
925889 None => { }
926890 }
927- Poll :: Ready ( Ok ( false ) )
928- }
929-
930- /// Helper to process an Equal match across potential outer batch boundaries.
931- fn process_filtered_match_loop ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
932- loop {
933- ready ! ( self . process_key_match_with_filter( cx) ) ?;
934-
935- let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
936- if self . outer_offset >= outer_batch. num_rows ( ) {
937- let saved_keys =
938- slice_keys ( & self . outer_key_arrays , outer_batch. num_rows ( ) - 1 ) ;
939-
940- self . emit_outer_batch ( ) ?;
941- self . pending_boundary = Some ( PendingBoundary :: Filtered { saved_keys } ) ;
942-
943- // Clear stale batch before polling
944- self . outer_batch = None ;
945-
946- match ready ! ( self . poll_next_outer_batch( cx) ) {
947- Err ( e) => return Poll :: Ready ( Err ( e) ) ,
948- Ok ( false ) => {
949- self . pending_boundary = None ;
950- break ;
951- }
952- Ok ( true ) => {
953- let Some ( PendingBoundary :: Filtered { saved_keys } ) =
954- self . pending_boundary . take ( )
955- else {
956- unreachable ! ( )
957- } ;
958- let same = keys_match (
959- & saved_keys,
960- & self . outer_key_arrays ,
961- & self . sort_options ,
962- self . null_equality ,
963- ) ?;
964- if same {
965- continue ;
966- }
967- break ;
968- }
969- }
970- } else {
971- break ;
972- }
973- }
974-
975- self . clear_inner_key_group ( ) ; // This resets inner_group_buffered to false
976- Poll :: Ready ( Ok ( ( ) ) )
891+ Ok ( false )
977892 }
978893
979894 /// Main loop: drive the merge-scan to produce output batches.
@@ -995,21 +910,14 @@ impl BitwiseSortMergeJoinStream {
995910 }
996911 return Poll :: Ready ( Ok ( None ) ) ;
997912 }
998- Ok ( true ) => { } // Loaded batch, move on to checks
913+ Ok ( true ) => {
914+ if self . resume_boundary ( ) ? {
915+ continue ;
916+ }
917+ }
999918 }
1000919 }
1001920
1002- // Handles pausing while fetching a NEW outer batch.
1003- if self . pending_boundary . is_some ( ) && ready ! ( self . resume_boundary( cx) ) ? {
1004- continue ;
1005- }
1006-
1007- // Handles pausing while reading the disk stream mid-batch.
1008- if self . inner_group_buffered {
1009- ready ! ( self . process_filtered_match_loop( cx) ) ?;
1010- continue ;
1011- }
1012-
1013921 // 2. Ensure we have an inner batch (unless inner is exhausted).
1014922 // Skip this when resuming a pending boundary — inner was already
1015923 // advanced past the key group before the boundary loop started.
@@ -1134,17 +1042,65 @@ impl BitwiseSortMergeJoinStream {
11341042 }
11351043 Ordering :: Equal => {
11361044 if self . filter . is_some ( ) {
1137- debug_assert ! ( !self . inner_group_buffered) ;
11381045 // Buffer inner key group (may span batches)
11391046 match ready ! ( self . buffer_inner_key_group( cx) ) {
11401047 Err ( e) => return Poll :: Ready ( Err ( e) ) ,
1141- Ok ( _inner_exhausted) => {
1142- self . inner_group_buffered = true ;
1143- }
1048+ Ok ( _inner_exhausted) => { }
11441049 }
1050+
11451051 // Process outer rows against buffered inner group
11461052 // (may need to handle outer batch boundary)
1147- ready ! ( self . process_filtered_match_loop( cx) ) ?;
1053+ loop {
1054+ self . process_key_match_with_filter ( ) ?;
1055+
1056+ let outer_batch = self . outer_batch . as_ref ( ) . unwrap ( ) ;
1057+ if self . outer_offset >= outer_batch. num_rows ( ) {
1058+ let saved_keys = slice_keys (
1059+ & self . outer_key_arrays ,
1060+ outer_batch. num_rows ( ) - 1 ,
1061+ ) ;
1062+
1063+ self . emit_outer_batch ( ) ?;
1064+ debug_assert ! (
1065+ !self . inner_key_buffer. is_empty( )
1066+ || self . inner_key_spill. is_some( ) ,
1067+ "Filtered pending boundary requires inner key data in buffer or spill"
1068+ ) ;
1069+ self . pending_boundary =
1070+ Some ( PendingBoundary :: Filtered { saved_keys } ) ;
1071+
1072+ match ready ! ( self . poll_next_outer_batch( cx) ) {
1073+ Err ( e) => return Poll :: Ready ( Err ( e) ) ,
1074+ Ok ( false ) => {
1075+ self . pending_boundary = None ;
1076+ self . outer_batch = None ;
1077+ break ;
1078+ }
1079+ Ok ( true ) => {
1080+ let Some ( PendingBoundary :: Filtered {
1081+ saved_keys,
1082+ } ) = self . pending_boundary . take ( )
1083+ else {
1084+ unreachable ! ( )
1085+ } ;
1086+ let same = keys_match (
1087+ & saved_keys,
1088+ & self . outer_key_arrays ,
1089+ & self . sort_options ,
1090+ self . null_equality ,
1091+ ) ?;
1092+ if same {
1093+ continue ;
1094+ }
1095+ break ;
1096+ }
1097+ }
1098+ } else {
1099+ break ;
1100+ }
1101+ }
1102+
1103+ self . clear_inner_key_group ( ) ;
11481104 } else {
11491105 // No filter: advance inner past key group, then
11501106 // mark all outer rows with this key as matched.
0 commit comments