Skip to content

Commit 1e5aa10

Browse files
committed
refactor(physical-plan): Update SortMergeJoin to use async spill abstractions
1 parent c8b784a commit 1e5aa10

2 files changed

Lines changed: 289 additions & 184 deletions

File tree

datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs

Lines changed: 130 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,6 @@
119119
//! factor than the pair-materialization approach.
120120
121121
use std::cmp::Ordering;
122-
use std::fs::File;
123-
use std::io::BufReader;
124122
use std::pin::Pin;
125123
use std::sync::Arc;
126124
use std::task::{Context, Poll};
@@ -134,7 +132,6 @@ use crate::{EmptyRecordBatchStream, RecordBatchStream};
134132
use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch};
135133
use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not};
136134
use arrow::datatypes::SchemaRef;
137-
use arrow::ipc::reader::StreamReader;
138135
use arrow::util::bit_chunk_iterator::UnalignedBitChunk;
139136
use arrow::util::bit_util::apply_bitwise_binary_op;
140137
use datafusion_common::{
@@ -259,6 +256,11 @@ pub(crate) struct BitwiseSortMergeJoinStream {
259256
inner_key_buffer: Vec<RecordBatch>,
260257
inner_key_spill: Option<RefCountedTempFile>,
261258

259+
//Track the active spill_stream
260+
spill_stream: Option<SendableRecordBatchStream>,
261+
//Prevents wiping out the buffer if we yield while evaluating the filter
262+
inner_group_buffered: bool,
263+
262264
// True when buffer_inner_key_group returned Pending after partially
263265
// filling inner_key_buffer. On re-entry, buffer_inner_key_group
264266
// must skip clear() and resume from poll_next_inner_batch (the
@@ -371,6 +373,8 @@ impl BitwiseSortMergeJoinStream {
371373
matched: BooleanBufferBuilder::new(0),
372374
inner_key_buffer: vec![],
373375
inner_key_spill: None,
376+
spill_stream: None,
377+
inner_group_buffered: false,
374378
buffering_inner_pending: false,
375379
pending_boundary: None,
376380
on_outer,
@@ -468,6 +472,8 @@ impl BitwiseSortMergeJoinStream {
468472
fn clear_inner_key_group(&mut self) {
469473
self.inner_key_buffer.clear();
470474
self.inner_key_spill = None;
475+
self.spill_stream = None;
476+
self.inner_group_buffered = false;
471477
self.inner_buffer_size = 0;
472478
}
473479

@@ -749,7 +755,10 @@ impl BitwiseSortMergeJoinStream {
749755
/// Process a key match with a filter. For each inner row in the buffered
750756
/// key group, evaluates the filter against the outer key group and ORs
751757
/// the results into the matched bitset using u64-chunked bitwise ops.
752-
fn process_key_match_with_filter(&mut self) -> Result<()> {
758+
fn process_key_match_with_filter(
759+
&mut self,
760+
cx: &mut Context<'_>,
761+
) -> Poll<Result<()>> {
753762
self.get_outer_self_cmp()?;
754763
let filter = self.filter.as_ref().unwrap();
755764
let outer_batch = self.outer_batch.as_ref().unwrap();
@@ -785,24 +794,40 @@ impl BitwiseSortMergeJoinStream {
785794
)
786795
.count_ones();
787796

788-
// Process spilled inner batches first (read back from disk).
789-
if let Some(spill_file) = &self.inner_key_spill {
790-
let file = BufReader::new(File::open(spill_file.path())?);
791-
let reader = StreamReader::try_new(file, None)?;
792-
for batch_result in reader {
793-
let inner_slice = batch_result?;
794-
matched_count = eval_filter_for_inner_slice(
795-
self.outer_is_left,
796-
filter,
797-
&outer_slice,
798-
&inner_slice,
799-
&mut self.matched,
800-
self.outer_offset,
801-
outer_group_len,
802-
matched_count,
803-
)?;
804-
if matched_count == outer_group_len {
805-
break;
797+
// Process spilled inner batches first asynchronously.
798+
if self.inner_key_spill.is_some() || self.spill_stream.is_some() {
799+
if self.spill_stream.is_none()
800+
&& let Some(spill_file) = &self.inner_key_spill
801+
{
802+
let stream = self
803+
.spill_manager
804+
.read_spill_as_stream(spill_file.clone(), None)?;
805+
self.spill_stream = Some(stream);
806+
}
807+
808+
while matched_count < outer_group_len {
809+
let stream = self.spill_stream.as_mut().unwrap();
810+
match ready!(stream.poll_next_unpin(cx)) {
811+
Some(Ok(inner_slice)) => {
812+
matched_count = eval_filter_for_inner_slice(
813+
self.outer_is_left,
814+
filter,
815+
&outer_slice,
816+
&inner_slice,
817+
&mut self.matched,
818+
self.outer_offset,
819+
outer_group_len,
820+
matched_count,
821+
)?;
822+
}
823+
Some(Err(e)) => {
824+
self.spill_stream = None;
825+
return Poll::Ready(Err(e));
826+
}
827+
None => {
828+
self.spill_stream = None;
829+
break;
830+
}
806831
}
807832
}
808833
}
@@ -830,13 +855,16 @@ impl BitwiseSortMergeJoinStream {
830855
}
831856

832857
self.outer_offset = outer_group_end;
833-
Ok(())
858+
859+
self.spill_stream = None;
860+
861+
Poll::Ready(Ok(()))
834862
}
835863

836864
/// Continue processing an outer key group that spans multiple outer
837865
/// batches. Returns `true` if this outer batch was fully consumed
838866
/// by the key group and the caller should load another.
839-
fn resume_boundary(&mut self) -> Result<bool> {
867+
fn resume_boundary(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
840868
debug_assert!(
841869
self.outer_batch.is_some(),
842870
"caller must load outer_batch first"
@@ -858,7 +886,7 @@ impl BitwiseSortMergeJoinStream {
858886
});
859887
self.emit_outer_batch()?;
860888
self.outer_batch = None;
861-
return Ok(true);
889+
return Poll::Ready(Ok(true));
862890
}
863891
}
864892
}
@@ -874,22 +902,79 @@ impl BitwiseSortMergeJoinStream {
874902
self.null_equality,
875903
)?;
876904
if same_key {
877-
self.process_key_match_with_filter()?;
905+
match self.process_key_match_with_filter(cx) {
906+
Poll::Ready(Ok(())) => (),
907+
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
908+
Poll::Pending => {
909+
self.pending_boundary =
910+
Some(PendingBoundary::Filtered { saved_keys });
911+
return Poll::Pending;
912+
}
913+
}
878914
let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
879915
if self.outer_offset >= num_outer {
880916
self.pending_boundary = Some(PendingBoundary::Filtered {
881917
saved_keys: slice_keys(&self.outer_key_arrays, num_outer - 1),
882918
});
883919
self.emit_outer_batch()?;
884920
self.outer_batch = None;
885-
return Ok(true);
921+
return Poll::Ready(Ok(true));
886922
}
887923
}
888924
self.clear_inner_key_group();
889925
}
890926
None => {}
891927
}
892-
Ok(false)
928+
Poll::Ready(Ok(false))
929+
}
930+
931+
/// Helper to process an Equal match across potential outer batch boundaries.
932+
fn process_filtered_match_loop(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
933+
loop {
934+
ready!(self.process_key_match_with_filter(cx))?;
935+
936+
let outer_batch = self.outer_batch.as_ref().unwrap();
937+
if self.outer_offset >= outer_batch.num_rows() {
938+
let saved_keys =
939+
slice_keys(&self.outer_key_arrays, outer_batch.num_rows() - 1);
940+
941+
self.emit_outer_batch()?;
942+
self.pending_boundary = Some(PendingBoundary::Filtered { saved_keys });
943+
944+
// Clear stale batch before polling
945+
self.outer_batch = None;
946+
947+
match ready!(self.poll_next_outer_batch(cx)) {
948+
Err(e) => return Poll::Ready(Err(e)),
949+
Ok(false) => {
950+
self.pending_boundary = None;
951+
break;
952+
}
953+
Ok(true) => {
954+
let Some(PendingBoundary::Filtered { saved_keys }) =
955+
self.pending_boundary.take()
956+
else {
957+
unreachable!()
958+
};
959+
let same = keys_match(
960+
&saved_keys,
961+
&self.outer_key_arrays,
962+
&self.sort_options,
963+
self.null_equality,
964+
)?;
965+
if same {
966+
continue;
967+
}
968+
break;
969+
}
970+
}
971+
} else {
972+
break;
973+
}
974+
}
975+
976+
self.clear_inner_key_group(); // This resets inner_group_buffered to false
977+
Poll::Ready(Ok(()))
893978
}
894979

895980
/// Main loop: drive the merge-scan to produce output batches.
@@ -911,14 +996,21 @@ impl BitwiseSortMergeJoinStream {
911996
}
912997
return Poll::Ready(Ok(None));
913998
}
914-
Ok(true) => {
915-
if self.resume_boundary()? {
916-
continue;
917-
}
918-
}
999+
Ok(true) => {} // Loaded batch, move on to checks
9191000
}
9201001
}
9211002

1003+
// Handles pausing while fetching a NEW outer batch.
1004+
if self.pending_boundary.is_some() && ready!(self.resume_boundary(cx))? {
1005+
continue;
1006+
}
1007+
1008+
// Handles pausing while reading the disk stream mid-batch.
1009+
if self.inner_group_buffered {
1010+
ready!(self.process_filtered_match_loop(cx))?;
1011+
continue;
1012+
}
1013+
9221014
// 2. Ensure we have an inner batch (unless inner is exhausted).
9231015
// Skip this when resuming a pending boundary — inner was already
9241016
// advanced past the key group before the boundary loop started.
@@ -1043,65 +1135,17 @@ impl BitwiseSortMergeJoinStream {
10431135
}
10441136
Ordering::Equal => {
10451137
if self.filter.is_some() {
1138+
debug_assert!(!self.inner_group_buffered);
10461139
// Buffer inner key group (may span batches)
10471140
match ready!(self.buffer_inner_key_group(cx)) {
10481141
Err(e) => return Poll::Ready(Err(e)),
1049-
Ok(_inner_exhausted) => {}
1142+
Ok(_inner_exhausted) => {
1143+
self.inner_group_buffered = true;
1144+
}
10501145
}
1051-
10521146
// Process outer rows against buffered inner group
10531147
// (may need to handle outer batch boundary)
1054-
loop {
1055-
self.process_key_match_with_filter()?;
1056-
1057-
let outer_batch = self.outer_batch.as_ref().unwrap();
1058-
if self.outer_offset >= outer_batch.num_rows() {
1059-
let saved_keys = slice_keys(
1060-
&self.outer_key_arrays,
1061-
outer_batch.num_rows() - 1,
1062-
);
1063-
1064-
self.emit_outer_batch()?;
1065-
debug_assert!(
1066-
!self.inner_key_buffer.is_empty()
1067-
|| self.inner_key_spill.is_some(),
1068-
"Filtered pending boundary requires inner key data in buffer or spill"
1069-
);
1070-
self.pending_boundary =
1071-
Some(PendingBoundary::Filtered { saved_keys });
1072-
1073-
match ready!(self.poll_next_outer_batch(cx)) {
1074-
Err(e) => return Poll::Ready(Err(e)),
1075-
Ok(false) => {
1076-
self.pending_boundary = None;
1077-
self.outer_batch = None;
1078-
break;
1079-
}
1080-
Ok(true) => {
1081-
let Some(PendingBoundary::Filtered {
1082-
saved_keys,
1083-
}) = self.pending_boundary.take()
1084-
else {
1085-
unreachable!()
1086-
};
1087-
let same = keys_match(
1088-
&saved_keys,
1089-
&self.outer_key_arrays,
1090-
&self.sort_options,
1091-
self.null_equality,
1092-
)?;
1093-
if same {
1094-
continue;
1095-
}
1096-
break;
1097-
}
1098-
}
1099-
} else {
1100-
break;
1101-
}
1102-
}
1103-
1104-
self.clear_inner_key_group();
1148+
ready!(self.process_filtered_match_loop(cx))?;
11051149
} else {
11061150
// No filter: advance inner past key group, then
11071151
// mark all outer rows with this key as matched.

0 commit comments

Comments
 (0)