Commit 51f13d7

perf: specialized SemiAntiSortMergeJoinStream (#20806)
## Which issue does this PR close?

- N/A.

## Rationale for this change

DataFusion's `SortMergeJoinExec` handles semi/anti joins by materializing `(outer, inner)` row pairs, applying a filter, then deduplicating with a corrected filter mask. Semi/anti joins only need a boolean per outer row, not pairs. The pair-based approach allocates unnecessary intermediate batches and index arrays to materialize output.

Recent PRs have improved SMJ performance within the existing pair-based framework ([#18875](#18875): BatchCoalescer to reduce concatenation overhead; [#20463](#20463): zero-copy slice instead of take for contiguous indices; [#20478](#20478): cached row counts to avoid O(n) recalculation), but the fundamental mismatch remains: semi/anti joins don't need pairs at all. I think we're hitting diminishing returns on filtered semi/anti sort-merge joins (TPC-H Q21) and need a specialized stream.

## What changes are included in this PR?

A new `SemiAntiSortMergeJoinStream` used internally by `SortMergeJoinExec` for `LeftSemi`, `LeftAnti`, `RightSemi`, and `RightAnti` joins. When `SortMergeJoinExec::execute()` encounters a semi/anti join type, it instantiates this stream instead of the existing `SortMergeJoinStream`. This is transparent to the rest of the system: no planner changes, no config flags, no new operators.

Instead of materializing row pairs, the stream maintains a per-outer-batch bitset (`BooleanBufferBuilder`) recording which outer rows have a matching inner row, then emits output via `filter_record_batch`.

**Algorithm:** Merge-scan across two sorted inputs. On key match without filter, set matched bits for the outer key group. With filter, buffer the inner key group and evaluate the filter as outer_slice × inner_scalar, OR-ing results into the bitset with `apply_bitwise_binary_op` (64 bits per iteration). Short-circuit when all outer rows in the group are matched.
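
The algorithm above can be illustrated with a small, standard-library-only sketch. This is not the code from this PR: it assumes rows are plain `(key, data)` tuples already sorted by key, uses a `Vec<bool>` where the real stream uses a `BooleanBufferBuilder` bitset, and the names `mark_matches` and `emit` are hypothetical. It only shows the shape of the idea: one boolean per outer row, a fast path when there is no filter, and a short-circuit once a key group is fully matched.

```rust
use std::cmp::Ordering;

/// Hypothetical row type for illustration: (join key, payload).
type Row = (i32, i32);

/// Merge-scan two key-sorted inputs and return one flag per outer row:
/// true if some inner row has the same key and passes `filter` (if any).
fn mark_matches(
    outer: &[Row],
    inner: &[Row],
    filter: Option<&dyn Fn(i32, i32) -> bool>,
) -> Vec<bool> {
    let mut matched = vec![false; outer.len()];
    let (mut o, mut i) = (0, 0);
    while o < outer.len() && i < inner.len() {
        let key = outer[o].0;
        match key.cmp(&inner[i].0) {
            Ordering::Less => o += 1,
            Ordering::Greater => i += 1,
            Ordering::Equal => {
                // Find the end of the key group on each side.
                let o_end = o + outer[o..].iter().take_while(|r| r.0 == key).count();
                let i_end = i + inner[i..].iter().take_while(|r| r.0 == key).count();
                match filter {
                    // No filter: every outer row in the group matches.
                    None => matched[o..o_end].fill(true),
                    // Filter: evaluate the outer group against one inner row
                    // at a time and OR the results into the flags.
                    Some(f) => {
                        for inner_row in &inner[i..i_end] {
                            for (bit, outer_row) in
                                matched[o..o_end].iter_mut().zip(&outer[o..o_end])
                            {
                                *bit |= f(outer_row.1, inner_row.1);
                            }
                            // Short-circuit once the whole group is matched.
                            if matched[o..o_end].iter().all(|&b| b) {
                                break;
                            }
                        }
                    }
                }
                o = o_end;
                i = i_end;
            }
        }
    }
    matched
}

/// Semi join keeps matched outer rows; anti join keeps unmatched ones.
fn emit(outer: &[Row], matched: &[bool], anti: bool) -> Vec<Row> {
    outer
        .iter()
        .zip(matched)
        .filter(|(_, m)| **m != anti)
        .map(|(row, _)| *row)
        .collect()
}
```

Calling `emit(outer, &matched, false)` gives the semi-join output and `emit(outer, &matched, true)` the anti-join output from the same flags, which is why no `(outer, inner)` pairs ever need to be materialized in this model.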

**Memory management:** The inner key group buffer is tracked via `MemoryReservation` and spilled to disk (via `SpillManager`) when the memory pool limit is exceeded, matching existing `SortMergeJoinExec` behavior. Metrics include `peak_mem_used`, `spill_count`, `spilled_bytes`, and `spilled_rows`.

**Benchmark results** (best of 3, `dfbench smj`):

| Query | Type | Old (ms) | New (ms) | Speedup |
|-------|------|----------|----------|---------|
| Q10 | LEFT SEMI, no filter | 4.79 | 4.27 | 1.12x |
| Q11 | LEFT SEMI, 1% filter | 3.00 | 2.30 | 1.31x |
| Q12 | LEFT SEMI, 50% filter | 38.1 | 5.52 | 6.9x |
| Q13 | LEFT SEMI, 90% filter | 66.9 | 10.2 | 6.6x |
| Q14 | LEFT ANTI, no filter | 4.96 | 4.13 | 1.20x |
| Q15 | LEFT ANTI, partial match | 4.80 | 4.22 | 1.14x |
| Q16 | LEFT ANTI, stress | 1.63 | 1.64 | 1.00x |
| Q18 | LEFT SEMI, 2% filter | 7.61 | 5.34 | 1.42x |
| Q19 | LEFT ANTI, partial match | 24.1 | 21.8 | 1.10x |

Non-semi/anti queries are unaffected (same stream as before).

## Are these changes tested?

- 5 stream-level unit tests covering async re-entry (`PendingStream`), batch boundary handling, filtered joins, and spill-to-disk edge cases
- Fuzz tests (`join_fuzz.rs`) compare `SortMergeJoinExec` output against `HashJoinExec` for all semi/anti join types, with and without filters, across multiple batch sizes and sort key combinations. Ran 1000+ iterations locally with random seeds.
- Existing SMJ unit tests in `sort_merge_join/tests.rs` continue to exercise semi/anti join types through `SortMergeJoinExec`, now hitting the new stream
- Existing `sort_merge_join.slt` sqllogic tests pass (the stream change is transparent to the SQL layer)
- I ported the operator to Comet apache/datafusion-comet#3648 and saw good speedup with TPC-H Q21 (SF10 on my laptop):

**Current operator:**

<img width="1414" height="1158" alt="Screenshot 2026-03-08 at 5 38 17 PM" src="https://github.com/user-attachments/assets/399c97e3-ef90-49c8-9c8d-220427636840" />

**PR #20806:**

<img width="1340" height="890" alt="Screenshot 2026-03-08 at 5 38 10 PM" src="https://github.com/user-attachments/assets/f57d331b-9894-4290-85b3-6526b04a8a61" />

(metrics and spilling were not hooked up in the version I ported to Comet, but this query does not spill anyway)

## Are there any user-facing changes?

No. The new stream is used automatically for semi/anti sort-merge joins. There are no new config flags, no API changes, and no changes to query plans.

**Known limitations to address in follow-up PRs:**

- Remove semi/anti-specific logic from the existing `SortMergeJoinStream` (dead code for those join types now).
- I am keeping an eye on #20455.
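
The memory-management behavior described in the commit message (reserve memory for the buffered inner key group, spill when the pool limit would be exceeded, and record metrics) can be sketched roughly as follows. Everything here is a hypothetical stand-in: `Budget` is not DataFusion's `MemoryReservation`, nothing is actually written to disk, and only the metric names `spill_count` and `spilled_bytes` are taken from the description above.

```rust
/// Hypothetical stand-in for a memory-pool reservation (assumption, not
/// DataFusion's `MemoryReservation`).
struct Budget {
    limit: usize,
    used: usize,
    peak: usize,
}

impl Budget {
    /// Try to reserve `bytes`; returns false if that would exceed the limit.
    fn try_grow(&mut self, bytes: usize) -> bool {
        if self.used + bytes > self.limit {
            return false;
        }
        self.used += bytes;
        self.peak = self.peak.max(self.used);
        true
    }
}

/// Metric names follow the commit message; the struct itself is illustrative.
#[derive(Default)]
struct SpillMetrics {
    spill_count: usize,
    spilled_bytes: usize,
}

/// A buffered inner batch is either held in memory or marked as spilled.
enum Buffered {
    InMemory(Vec<u8>),
    Spilled { bytes: usize },
}

/// Buffer one inner batch: keep it in memory if the reservation succeeds,
/// otherwise count it as spilled (a real operator would write it to disk here).
fn buffer_batch(batch: Vec<u8>, budget: &mut Budget, metrics: &mut SpillMetrics) -> Buffered {
    if budget.try_grow(batch.len()) {
        Buffered::InMemory(batch)
    } else {
        metrics.spill_count += 1;
        metrics.spilled_bytes += batch.len();
        Buffered::Spilled { bytes: batch.len() }
    }
}
```

The point of the sketch is the decision order: the reservation is attempted first, and spilling is the fallback path that also updates the metrics.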
1 parent 9b726bc commit 51f13d7

7 files changed: +2178 -30 lines changed

benchmarks/src/smj.rs

Lines changed: 1 addition & 0 deletions
@@ -277,6 +277,7 @@ const SMJ_QUERIES: &[&str] = &[
 WHERE EXISTS (
 SELECT 1 FROM t2_sorted
 WHERE t2_sorted.key = t1_sorted.key
+AND t2_sorted.data <> t1_sorted.data
 AND t2_sorted.data % 10 <> 0
 )
 "#,

datafusion/physical-plan/src/joins/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ mod cross_join;
 mod hash_join;
 mod nested_loop_join;
 mod piecewise_merge_join;
+pub(crate) mod semi_anti_sort_merge_join;
 mod sort_merge_join;
 mod stream_join_utils;
 mod symmetric_hash_join;

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Specialized Sort Merge Join stream for Semi/Anti joins.
+//!
+//! Used internally by `SortMergeJoinExec` for semi/anti join types.
+
+pub(crate) mod stream;
+
+#[cfg(test)]
+mod tests;
