Skip to content

Commit 53cb706

Browse files
committed
Replace BoundaryState enum with PendingBoundary to make state machine more clear. Simplify stored state to be a slice.
1 parent 26566c2 commit 53cb706

3 files changed

Lines changed: 41 additions & 82 deletions

File tree

datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/exec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequiremen
112112
///
113113
/// Key groups can span batch boundaries on either side. The stream handles
114114
/// this by detecting when a group extends to the end of a batch, loading the
115-
/// next batch, and continuing if the key matches. The `BoundaryState` enum
115+
/// next batch, and continuing if the key matches. The `PendingBoundary` enum
116116
/// preserves loop context across async `Poll::Pending` re-entries.
117117
///
118118
/// # Memory

datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs

Lines changed: 38 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -127,39 +127,24 @@ fn find_key_group_end(
127127
Ok(lo)
128128
}
129129

130-
/// Tracks whether we're mid-key-group when `poll_next_outer_batch` returns
131-
/// `Poll::Pending` inside the Equal branch's boundary loop.
132-
///
133130
/// When an outer key group spans a batch boundary, the boundary loop emits
134131
/// the current batch, then polls for the next. If that poll returns Pending,
135132
/// `ready!` exits `poll_join` and we re-enter from the top on the next call.
136133
/// Without this state, the new batch would be processed fresh by the
137134
/// merge-scan — but inner already advanced past this key, so the matching
138135
/// outer rows would be skipped via `Ordering::Less` and never marked.
139136
///
140-
/// This enum saves the context needed to resume the boundary loop on
141-
/// re-entry: compare the new batch's first key with the saved key to
142-
/// decide whether to continue marking or move on.
137+
/// This enum carries the last key (as single-row sliced arrays) from the
138+
/// previous batch so we can check whether the next batch continues the same
139+
/// key group. Stored as `Option<PendingBoundary>`: `None` means normal
140+
/// processing.
143141
#[derive(Debug)]
144-
/// When a key group spans an outer batch boundary, we poll for the next
145-
/// outer batch. If that poll returns `Pending`, we must yield back to the
146-
/// executor — but we need to remember where we were so we can resume.
147-
/// This enum carries the last key from the previous batch so we can check
148-
/// whether the next batch continues the same key group.
149-
///
150-
/// Stored as `Option<PendingBoundary>`: `None` means normal processing.
151142
enum PendingBoundary {
152143
/// Resuming a no-filter boundary loop.
153-
NoFilter {
154-
saved_keys: Vec<ArrayRef>,
155-
saved_idx: usize,
156-
},
144+
NoFilter { saved_keys: Vec<ArrayRef> },
157145
/// Resuming a filtered boundary loop. Inner key data remains in the
158146
/// buffer (or spill file) for the resumed loop.
159-
Filtered {
160-
saved_keys: Vec<ArrayRef>,
161-
saved_idx: usize,
162-
},
147+
Filtered { saved_keys: Vec<ArrayRef> },
163148
}
164149

165150
pub(super) struct SemiAntiSortMergeJoinStream {
@@ -465,8 +450,7 @@ impl SemiAntiSortMergeJoinStream {
465450
}
466451

467452
// Key group extends to end of batch — need to check next batch
468-
let last_key_idx = num_inner - 1;
469-
let saved_inner_keys = self.inner_key_arrays.clone();
453+
let saved_inner_keys = slice_keys(&self.inner_key_arrays, num_inner - 1);
470454

471455
match ready!(self.poll_next_inner_batch(cx)) {
472456
Err(e) => return Poll::Ready(Err(e)),
@@ -476,9 +460,7 @@ impl SemiAntiSortMergeJoinStream {
476460
Ok(true) => {
477461
if keys_match(
478462
&saved_inner_keys,
479-
last_key_idx,
480463
&self.inner_key_arrays,
481-
0,
482464
&self.sort_options,
483465
self.null_equality,
484466
)? {
@@ -554,8 +536,7 @@ impl SemiAntiSortMergeJoinStream {
554536
resume_from_poll = false;
555537

556538
// Key group extends to end of batch — check next
557-
let last_key_idx = num_inner - 1;
558-
let saved_inner_keys = self.inner_key_arrays.clone();
539+
let saved_inner_keys = slice_keys(&self.inner_key_arrays, num_inner - 1);
559540

560541
// If poll returns Pending, the current batch is already
561542
// in inner_key_buffer.
@@ -573,9 +554,7 @@ impl SemiAntiSortMergeJoinStream {
573554
self.buffering_inner_pending = false;
574555
if keys_match(
575556
&saved_inner_keys,
576-
last_key_idx,
577557
&self.inner_key_arrays,
578-
0,
579558
&self.sort_options,
580559
self.null_equality,
581560
)? {
@@ -685,59 +664,43 @@ impl SemiAntiSortMergeJoinStream {
685664
"caller must load outer_batch first"
686665
);
687666
match self.pending_boundary.take() {
688-
Some(PendingBoundary::NoFilter {
689-
saved_keys,
690-
saved_idx,
691-
}) => {
667+
Some(PendingBoundary::NoFilter { saved_keys }) => {
692668
let same_key = keys_match(
693669
&saved_keys,
694-
saved_idx,
695670
&self.outer_key_arrays,
696-
0,
697671
&self.sort_options,
698672
self.null_equality,
699673
)?;
700674
if same_key {
701675
self.process_key_match_no_filter()?;
702676
let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
703677
if self.outer_offset >= num_outer {
704-
let new_saved = self.outer_key_arrays.clone();
705-
let new_idx = num_outer - 1;
706678
self.pending_boundary = Some(PendingBoundary::NoFilter {
707-
saved_keys: new_saved,
708-
saved_idx: new_idx,
679+
saved_keys: slice_keys(&self.outer_key_arrays, num_outer - 1),
709680
});
710681
self.emit_outer_batch()?;
711682
self.outer_batch = None;
712683
return Ok(true);
713684
}
714685
}
715686
}
716-
Some(PendingBoundary::Filtered {
717-
saved_keys,
718-
saved_idx,
719-
}) => {
687+
Some(PendingBoundary::Filtered { saved_keys }) => {
720688
debug_assert!(
721689
!self.inner_key_buffer.is_empty() || self.inner_key_spill.is_some(),
722-
"FilteredPending entered but no inner key data exists"
690+
"Filtered pending boundary entered but no inner key data exists"
723691
);
724692
let same_key = keys_match(
725693
&saved_keys,
726-
saved_idx,
727694
&self.outer_key_arrays,
728-
0,
729695
&self.sort_options,
730696
self.null_equality,
731697
)?;
732698
if same_key {
733699
self.process_key_match_with_filter()?;
734700
let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
735701
if self.outer_offset >= num_outer {
736-
let new_saved = self.outer_key_arrays.clone();
737-
let new_idx = num_outer - 1;
738702
self.pending_boundary = Some(PendingBoundary::Filtered {
739-
saved_keys: new_saved,
740-
saved_idx: new_idx,
703+
saved_keys: slice_keys(&self.outer_key_arrays, num_outer - 1),
741704
});
742705
self.emit_outer_batch()?;
743706
self.outer_batch = None;
@@ -779,7 +742,7 @@ impl SemiAntiSortMergeJoinStream {
779742
}
780743

781744
// 2. Ensure we have an inner batch (unless inner is exhausted).
782-
// Skip this when in a boundary state — inner was already
745+
// Skip this when resuming a pending boundary — inner was already
783746
// advanced past the key group before the boundary loop started.
784747
if self.inner_batch.is_none() && self.pending_boundary.is_none() {
785748
match ready!(self.poll_next_inner_batch(cx)) {
@@ -877,8 +840,8 @@ impl SemiAntiSortMergeJoinStream {
877840
self.null_equality,
878841
)?;
879842
if group_end >= num_inner {
880-
let saved_keys = self.inner_key_arrays.clone();
881-
let saved_idx = num_inner - 1;
843+
let saved_keys =
844+
slice_keys(&self.inner_key_arrays, num_inner - 1);
882845
match ready!(self.poll_next_inner_batch(cx)) {
883846
Err(e) => return Poll::Ready(Err(e)),
884847
Ok(false) => {
@@ -888,9 +851,7 @@ impl SemiAntiSortMergeJoinStream {
888851
Ok(true) => {
889852
if keys_match(
890853
&saved_keys,
891-
saved_idx,
892854
&self.inner_key_arrays,
893-
0,
894855
&self.sort_options,
895856
self.null_equality,
896857
)? {
@@ -921,19 +882,19 @@ impl SemiAntiSortMergeJoinStream {
921882

922883
let outer_batch = self.outer_batch.as_ref().unwrap();
923884
if self.outer_offset >= outer_batch.num_rows() {
924-
let saved_keys = self.outer_key_arrays.clone();
925-
let saved_idx = outer_batch.num_rows() - 1;
885+
let saved_keys = slice_keys(
886+
&self.outer_key_arrays,
887+
outer_batch.num_rows() - 1,
888+
);
926889

927890
self.emit_outer_batch()?;
928891
debug_assert!(
929892
!self.inner_key_buffer.is_empty()
930893
|| self.inner_key_spill.is_some(),
931-
"FilteredPending requires inner key data in buffer or spill"
894+
"Filtered pending boundary requires inner key data in buffer or spill"
932895
);
933-
self.pending_boundary = Some(PendingBoundary::Filtered {
934-
saved_keys,
935-
saved_idx,
936-
});
896+
self.pending_boundary =
897+
Some(PendingBoundary::Filtered { saved_keys });
937898

938899
match ready!(self.poll_next_outer_batch(cx)) {
939900
Err(e) => return Poll::Ready(Err(e)),
@@ -945,16 +906,13 @@ impl SemiAntiSortMergeJoinStream {
945906
Ok(true) => {
946907
let Some(PendingBoundary::Filtered {
947908
saved_keys,
948-
saved_idx,
949909
}) = self.pending_boundary.take()
950910
else {
951911
unreachable!()
952912
};
953913
let same = keys_match(
954914
&saved_keys,
955-
saved_idx,
956915
&self.outer_key_arrays,
957-
0,
958916
&self.sort_options,
959917
self.null_equality,
960918
)?;
@@ -983,14 +941,12 @@ impl SemiAntiSortMergeJoinStream {
983941

984942
let num_outer = self.outer_batch.as_ref().unwrap().num_rows();
985943
if self.outer_offset >= num_outer {
986-
let saved_keys = self.outer_key_arrays.clone();
987-
let saved_idx = num_outer - 1;
944+
let saved_keys =
945+
slice_keys(&self.outer_key_arrays, num_outer - 1);
988946

989947
self.emit_outer_batch()?;
990-
self.pending_boundary = Some(PendingBoundary::NoFilter {
991-
saved_keys,
992-
saved_idx,
993-
});
948+
self.pending_boundary =
949+
Some(PendingBoundary::NoFilter { saved_keys });
994950

995951
match ready!(self.poll_next_outer_batch(cx)) {
996952
Err(e) => return Poll::Ready(Err(e)),
@@ -1002,16 +958,13 @@ impl SemiAntiSortMergeJoinStream {
1002958
Ok(true) => {
1003959
let Some(PendingBoundary::NoFilter {
1004960
saved_keys,
1005-
saved_idx,
1006961
}) = self.pending_boundary.take()
1007962
else {
1008963
unreachable!()
1009964
};
1010965
let same_key = keys_match(
1011966
&saved_keys,
1012-
saved_idx,
1013967
&self.outer_key_arrays,
1014-
0,
1015968
&self.sort_options,
1016969
self.null_equality,
1017970
)?;
@@ -1091,20 +1044,26 @@ fn eval_filter_for_inner_slice(
10911044
Ok(matched_count)
10921045
}
10931046

1094-
/// Compare two key rows using the sort options to determine equality.
1047+
/// Slice each key array to a single row at `idx`.
1048+
fn slice_keys(keys: &[ArrayRef], idx: usize) -> Vec<ArrayRef> {
1049+
keys.iter().map(|a| a.slice(idx, 1)).collect()
1050+
}
1051+
1052+
/// Compare the first row of two key arrays using sort options to determine
1053+
/// equality. The left side is expected to be single-row slices (from
1054+
/// `slice_keys`); the right side can be any length (row 0 is compared).
10951055
fn keys_match(
10961056
left_arrays: &[ArrayRef],
1097-
left_idx: usize,
10981057
right_arrays: &[ArrayRef],
1099-
right_idx: usize,
11001058
sort_options: &[SortOptions],
11011059
null_equality: NullEquality,
11021060
) -> Result<bool> {
1061+
debug_assert!(left_arrays.iter().all(|a| a.len() == 1));
11031062
let cmp = compare_join_arrays(
11041063
left_arrays,
1105-
left_idx,
1064+
0,
11061065
right_arrays,
1107-
right_idx,
1066+
0,
11081067
sort_options,
11091068
null_equality,
11101069
)?;

datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,7 +1131,7 @@ async fn filter_buffer_pending_loses_inner_rows() -> Result<()> {
11311131
/// When an outer key group spans a batch boundary, the no-filter path
11321132
/// emits the current batch, then polls for the next outer batch. If
11331133
/// poll returns Pending, poll_join exits. On re-entry, without the
1134-
/// BoundaryState fix, the new batch is processed fresh by the
1134+
/// PendingBoundary fix, the new batch is processed fresh by the
11351135
/// merge-scan. Since inner already advanced past this key, the outer
11361136
/// rows with the matching key are skipped via Ordering::Less.
11371137
///
@@ -1474,7 +1474,7 @@ async fn spill_with_filter() -> Result<()> {
14741474
Ok(())
14751475
}
14761476

1477-
/// Reproduces a bug where `resume_boundary` for the FilteredPending case
1477+
/// Reproduces a bug where `resume_boundary` for the Filtered pending case
14781478
/// only checks `inner_key_buffer.is_empty()` but ignores `inner_key_spill`.
14791479
/// After spilling, the in-memory buffer is cleared while the spill file
14801480
/// holds the data. If the outer key group spans a batch boundary, the

0 commit comments

Comments
 (0)