Skip to content

Commit 26566c2

Browse files
committed
Replace the BoundaryState enum with PendingBoundary to make the state machine clearer.
1 parent 628d0f5 commit 26566c2

1 file changed

Lines changed: 38 additions & 51 deletions

File tree

  • datafusion/physical-plan/src/joins/semi_anti_sort_merge_join

datafusion/physical-plan/src/joins/semi_anti_sort_merge_join/stream.rs

Lines changed: 38 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -141,22 +141,22 @@ fn find_key_group_end(
141141
/// re-entry: compare the new batch's first key with the saved key to
142142
/// decide whether to continue marking or move on.
143143
#[derive(Debug)]
144-
enum BoundaryState {
145-
/// Normal processing — not inside a boundary poll.
146-
Normal,
147-
/// The no-filter boundary loop's `poll_next_outer_batch` returned
148-
/// Pending. Carries the key arrays and index from the last emitted
149-
/// batch so we can compare with the next batch's first key.
150-
NoFilterPending {
144+
/// When a key group spans an outer batch boundary, we poll for the next
145+
/// outer batch. If that poll returns `Pending`, we must yield back to the
146+
/// executor — but we need to remember where we were so we can resume.
147+
/// This enum carries the last key from the previous batch so we can check
148+
/// whether the next batch continues the same key group.
149+
///
150+
/// Stored as `Option<PendingBoundary>`: `None` means normal processing.
151+
enum PendingBoundary {
152+
/// Resuming a no-filter boundary loop.
153+
NoFilter {
151154
saved_keys: Vec<ArrayRef>,
152155
saved_idx: usize,
153156
},
154-
/// The filtered boundary loop's `poll_next_outer_batch` returned
155-
/// Pending. Carries the key arrays and index from the last emitted
156-
/// outer batch so we can compare with the next batch's first key
157-
/// without reading from the inner key buffer (which may have been
158-
/// spilled to disk).
159-
FilteredPending {
157+
/// Resuming a filtered boundary loop. Inner key data remains in the
158+
/// buffer (or spill file) for the resumed loop.
159+
Filtered {
160160
saved_keys: Vec<ArrayRef>,
161161
saved_idx: usize,
162162
},
@@ -199,8 +199,8 @@ pub(super) struct SemiAntiSortMergeJoinStream {
199199
// current inner_batch was already sliced and pushed before Pending).
200200
buffering_inner_pending: bool,
201201

202-
// Boundary re-entry state — see BoundaryState doc comment.
203-
boundary_state: BoundaryState,
202+
// Boundary re-entry state — see PendingBoundary doc comment.
203+
pending_boundary: Option<PendingBoundary>,
204204

205205
// Join condition
206206
on_outer: Vec<PhysicalExprRef>,
@@ -283,7 +283,7 @@ impl SemiAntiSortMergeJoinStream {
283283
inner_key_buffer: vec![],
284284
inner_key_spill: None,
285285
buffering_inner_pending: false,
286-
boundary_state: BoundaryState::Normal,
286+
pending_boundary: None,
287287
on_outer,
288288
on_inner,
289289
filter,
@@ -684,11 +684,11 @@ impl SemiAntiSortMergeJoinStream {
684684
self.outer_batch.is_some(),
685685
"caller must load outer_batch first"
686686
);
687-
match std::mem::replace(&mut self.boundary_state, BoundaryState::Normal) {
688-
BoundaryState::NoFilterPending {
687+
match self.pending_boundary.take() {
688+
Some(PendingBoundary::NoFilter {
689689
saved_keys,
690690
saved_idx,
691-
} => {
691+
}) => {
692692
let same_key = keys_match(
693693
&saved_keys,
694694
saved_idx,
@@ -703,20 +703,20 @@ impl SemiAntiSortMergeJoinStream {
703703
if self.outer_offset >= num_outer {
704704
let new_saved = self.outer_key_arrays.clone();
705705
let new_idx = num_outer - 1;
706-
self.boundary_state = BoundaryState::NoFilterPending {
706+
self.pending_boundary = Some(PendingBoundary::NoFilter {
707707
saved_keys: new_saved,
708708
saved_idx: new_idx,
709-
};
709+
});
710710
self.emit_outer_batch()?;
711711
self.outer_batch = None;
712712
return Ok(true);
713713
}
714714
}
715715
}
716-
BoundaryState::FilteredPending {
716+
Some(PendingBoundary::Filtered {
717717
saved_keys,
718718
saved_idx,
719-
} => {
719+
}) => {
720720
debug_assert!(
721721
!self.inner_key_buffer.is_empty() || self.inner_key_spill.is_some(),
722722
"FilteredPending entered but no inner key data exists"
@@ -735,18 +735,18 @@ impl SemiAntiSortMergeJoinStream {
735735
if self.outer_offset >= num_outer {
736736
let new_saved = self.outer_key_arrays.clone();
737737
let new_idx = num_outer - 1;
738-
self.boundary_state = BoundaryState::FilteredPending {
738+
self.pending_boundary = Some(PendingBoundary::Filtered {
739739
saved_keys: new_saved,
740740
saved_idx: new_idx,
741-
};
741+
});
742742
self.emit_outer_batch()?;
743743
self.outer_batch = None;
744744
return Ok(true);
745745
}
746746
}
747747
self.clear_inner_key_group();
748748
}
749-
BoundaryState::Normal => {}
749+
None => {}
750750
}
751751
Ok(false)
752752
}
@@ -763,7 +763,7 @@ impl SemiAntiSortMergeJoinStream {
763763
Err(e) => return Poll::Ready(Err(e)),
764764
Ok(false) => {
765765
// Outer exhausted — flush coalescer
766-
self.boundary_state = BoundaryState::Normal;
766+
self.pending_boundary = None;
767767
self.coalescer.finish_buffered_batch()?;
768768
if let Some(batch) = self.coalescer.next_completed_batch() {
769769
return Poll::Ready(Ok(Some(batch)));
@@ -781,9 +781,7 @@ impl SemiAntiSortMergeJoinStream {
781781
// 2. Ensure we have an inner batch (unless inner is exhausted).
782782
// Skip this when in a boundary state — inner was already
783783
// advanced past the key group before the boundary loop started.
784-
if self.inner_batch.is_none()
785-
&& matches!(self.boundary_state, BoundaryState::Normal)
786-
{
784+
if self.inner_batch.is_none() && self.pending_boundary.is_none() {
787785
match ready!(self.poll_next_inner_batch(cx)) {
788786
Err(e) => return Poll::Ready(Err(e)),
789787
Ok(false) => {
@@ -932,26 +930,23 @@ impl SemiAntiSortMergeJoinStream {
932930
|| self.inner_key_spill.is_some(),
933931
"FilteredPending requires inner key data in buffer or spill"
934932
);
935-
self.boundary_state = BoundaryState::FilteredPending {
933+
self.pending_boundary = Some(PendingBoundary::Filtered {
936934
saved_keys,
937935
saved_idx,
938-
};
936+
});
939937

940938
match ready!(self.poll_next_outer_batch(cx)) {
941939
Err(e) => return Poll::Ready(Err(e)),
942940
Ok(false) => {
943-
self.boundary_state = BoundaryState::Normal;
941+
self.pending_boundary = None;
944942
self.outer_batch = None;
945943
break;
946944
}
947945
Ok(true) => {
948-
let BoundaryState::FilteredPending {
946+
let Some(PendingBoundary::Filtered {
949947
saved_keys,
950948
saved_idx,
951-
} = std::mem::replace(
952-
&mut self.boundary_state,
953-
BoundaryState::Normal,
954-
)
949+
}) = self.pending_boundary.take()
955950
else {
956951
unreachable!()
957952
};
@@ -992,31 +987,23 @@ impl SemiAntiSortMergeJoinStream {
992987
let saved_idx = num_outer - 1;
993988

994989
self.emit_outer_batch()?;
995-
// Save boundary state before polling. If
996-
// poll returns Pending, poll_join exits and
997-
// re-enters from the top. The saved state
998-
// lets step 1 resume the boundary loop.
999-
self.boundary_state = BoundaryState::NoFilterPending {
990+
self.pending_boundary = Some(PendingBoundary::NoFilter {
1000991
saved_keys,
1001992
saved_idx,
1002-
};
993+
});
1003994

1004995
match ready!(self.poll_next_outer_batch(cx)) {
1005996
Err(e) => return Poll::Ready(Err(e)),
1006997
Ok(false) => {
1007-
self.boundary_state = BoundaryState::Normal;
998+
self.pending_boundary = None;
1008999
self.outer_batch = None;
10091000
break;
10101001
}
10111002
Ok(true) => {
1012-
// Recover saved_keys from boundary state
1013-
let BoundaryState::NoFilterPending {
1003+
let Some(PendingBoundary::NoFilter {
10141004
saved_keys,
10151005
saved_idx,
1016-
} = std::mem::replace(
1017-
&mut self.boundary_state,
1018-
BoundaryState::Normal,
1019-
)
1006+
}) = self.pending_boundary.take()
10201007
else {
10211008
unreachable!()
10221009
};

0 commit comments

Comments
 (0)