|
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
17 | 17 |
|
18 | | -//! Stream implementation for semi/anti sort-merge joins. |
| 18 | +//! Sort-merge join stream specialized for semi/anti joins. |
| 19 | +//! |
| 20 | +//! Instantiated by [`SortMergeJoinExec`](crate::joins::sort_merge_join::SortMergeJoinExec) |
| 21 | +//! when the join type is `LeftSemi`, `LeftAnti`, `RightSemi`, or `RightAnti`. |
| 22 | +//! |
| 23 | +//! # Motivation |
| 24 | +//! |
| 25 | +//! The general-purpose [`SortMergeJoinStream`](crate::joins::sort_merge_join::SortMergeJoinStream) |
| 26 | +//! handles semi/anti joins by materializing `(outer, inner)` row pairs, |
| 27 | +//! applying a filter, then using a "corrected filter mask" to deduplicate. |
| 28 | +//! Semi/anti joins only need a boolean per outer row (does a match exist?), |
| 29 | +//! not pairs. The pair-based approach incurs unnecessary memory allocation |
| 30 | +//! and intermediate batches. |
| 31 | +//! |
| 32 | +//! This stream instead tracks matches with a per-outer-batch bitset, |
| 33 | +//! avoiding all pair materialization. |
| 34 | +//! |
| 35 | +//! # "Outer Side" vs "Inner Side" |
| 36 | +//! |
| 37 | +//! For `Left*` join types, left is outer and right is inner. |
| 38 | +//! For `Right*` join types, right is outer and left is inner. |
| 39 | +//! The output schema always equals the outer side's schema. |
| 40 | +//! |
| 41 | +//! # Algorithm |
| 42 | +//! |
| 43 | +//! Both inputs must be sorted by the join keys. The stream performs a merge |
| 44 | +//! scan across the two sorted inputs: |
| 45 | +//! |
| 46 | +//! ```text |
| 47 | +//! outer cursor ──► [1, 2, 2, 3, 5, 5, 7] |
| 48 | +//! inner cursor ──► [2, 2, 4, 5, 6, 7, 7] |
| 49 | +//! ▲ |
| 50 | +//! compare keys at cursors |
| 51 | +//! ``` |
| 52 | +//! |
| 53 | +//! At each step, the keys at the outer and inner cursors are compared: |
| 54 | +//! |
| 55 | +//! - **outer < inner**: Skip the outer key group (no match exists). |
| 56 | +//! - **outer > inner**: Skip the inner key group. |
| 57 | +//! - **outer == inner**: Process the match (see below). |
| 58 | +//! |
| 59 | +//! Key groups are contiguous runs of equal keys within one side. The scan |
| 60 | +//! advances past entire groups at each step. |
| 61 | +//! |
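For readers following the review, the no-filter merge scan described above can be sketched on plain sorted slices. This is a minimal illustration, not the stream's actual code: `group_len` and `mark_matches` are hypothetical names, keys are assumed to be single `i32` values, and both inputs are assumed to be fully in memory rather than arriving in batches.

```rust
use std::cmp::Ordering;

/// Length of the run of equal keys starting at `start` (one "key group").
/// Hypothetical helper; panics if `start` is out of bounds.
fn group_len(keys: &[i32], start: usize) -> usize {
    let key = keys[start];
    keys[start..].iter().take_while(|&&k| k == key).count()
}

/// Returns one flag per outer row: `true` if the inner side contains its key.
/// Both slices are assumed to be sorted ascending.
fn mark_matches(outer: &[i32], inner: &[i32]) -> Vec<bool> {
    let mut matched = vec![false; outer.len()];
    let (mut o, mut i) = (0, 0);
    while o < outer.len() && i < inner.len() {
        let o_len = group_len(outer, o);
        match outer[o].cmp(&inner[i]) {
            // outer < inner: no match exists, skip the outer key group.
            Ordering::Less => o += o_len,
            // outer > inner: skip the inner key group.
            Ordering::Greater => i += group_len(inner, i),
            // outer == inner: mark the whole outer group, then advance both sides.
            Ordering::Equal => {
                matched[o..o + o_len].fill(true);
                o += o_len;
                i += group_len(inner, i);
            }
        }
    }
    matched
}
```

With the inputs from the diagram, `mark_matches(&[1, 2, 2, 3, 5, 5, 7], &[2, 2, 4, 5, 6, 7, 7])` marks every outer row except the ones with keys 1 and 3.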
| 62 | +//! ## Processing a key match |
| 63 | +//! |
| 64 | +//! **Without filter**: All outer rows in the key group are marked as matched. |
| 65 | +//! |
| 66 | +//! **With filter**: The inner key group is buffered (may span multiple inner |
| 67 | +//! batches). For each buffered inner row, the filter is evaluated against the |
| 68 | +//! outer key group as a batch. Results are OR'd into the matched bitset. A |
| 69 | +//! short-circuit exits early when all outer rows in the group are matched. |
| 70 | +//! |
| 71 | +//! ```text |
| 72 | +//! matched bitset: [0, 0, 1, 0, 0, ...] |
| 73 | +//! ▲── one bit per outer row ──▲ |
| 74 | +//! |
| 75 | +//! On emit: |
| 76 | +//! Semi → filter_record_batch(outer_batch, &matched) |
| 77 | +//! Anti → filter_record_batch(outer_batch, &NOT(matched)) |
| 78 | +//! ``` |
| 79 | +//! |
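The filtered path and the emit step can be sketched the same way. This is an illustration under stated assumptions: rows are plain `i64` values, `Vec<bool>` stands in for the bitset, the closure `filter` stands in for the join filter expression, and `or_filter_matches` and `emit` are hypothetical names. The real stream works on Arrow batches and emits via `filter_record_batch`.

```rust
/// ORs filter results for one matching key group into `matched`
/// (one flag per outer row in the group). Returns how many inner rows
/// were evaluated before the short-circuit triggered.
fn or_filter_matches(
    outer_group: &[i64],
    inner_group: &[i64],
    matched: &mut [bool],
    filter: impl Fn(i64, i64) -> bool,
) -> usize {
    let mut evaluated = 0;
    for &inner_row in inner_group {
        // Short-circuit: every outer row in the group already has a match.
        if matched.iter().all(|&m| m) {
            break;
        }
        evaluated += 1;
        // Evaluate the filter for this inner row against the whole outer group.
        for (m, &outer_row) in matched.iter_mut().zip(outer_group) {
            *m |= filter(outer_row, inner_row);
        }
    }
    evaluated
}

/// Semi join keeps matched rows; anti join keeps unmatched rows.
fn emit(outer: &[i64], matched: &[bool], anti: bool) -> Vec<i64> {
    outer
        .iter()
        .zip(matched)
        .filter_map(|(&row, &m)| (m != anti).then_some(row))
        .collect()
}
```

For an outer group `[10, 20, 30]`, an inner group `[15, 25]`, and the filter `outer < inner`, the flags end up as `[true, true, false]`, so the semi output is `[10, 20]` and the anti output is `[30]`.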
| 80 | +//! ## Batch boundaries |
| 81 | +//! |
| 82 | +//! Key groups can span batch boundaries on either side. The stream handles |
| 83 | +//! this by detecting when a group extends to the end of a batch, loading the |
| 84 | +//! next batch, and continuing if the key matches. The [`PendingBoundary`] enum |
| 85 | +//! preserves loop context across async `Poll::Pending` re-entries. |
| 86 | +//! |
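The boundary handling can be pictured with a synchronous sketch. It assumes all batches are already available as `Vec<i32>` key columns, and `group_len_across_batches` is a hypothetical name. The actual stream loads batches asynchronously and uses `PendingBoundary` to resume, which this sketch does not model.

```rust
/// Counts the rows in the key group that starts at (`batch`, `row`),
/// following the group into later batches while the key continues.
fn group_len_across_batches(batches: &[Vec<i32>], batch: usize, row: usize) -> usize {
    let key = batches[batch][row];
    let mut len = 0;
    let mut start = row;
    for b in &batches[batch..] {
        let run = b[start..].iter().take_while(|&&k| k == key).count();
        len += run;
        // The group ended inside this batch, so no further batch is needed.
        if start + run < b.len() {
            break;
        }
        // The group reached the end of the batch: continue at the start of the next one.
        start = 0;
    }
    len
}
```

For batches `[1, 2, 2]`, `[2, 2]`, `[2, 3]`, the group starting at batch 0, row 1 spans all three batches and has length 5.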
| 87 | +//! # Memory |
| 88 | +//! |
| 89 | +//! Aside from the inner key group buffer, memory is independent of input size:
| 90 | +//! - One outer batch at a time (not tracked by reservation — single batch, |
| 91 | +//! cannot be spilled since it's needed for filter evaluation) |
| 92 | +//! - One inner batch at a time (streaming) |
| 93 | +//! - `matched` bitset: one bit per outer row, re-allocated per batch |
| 94 | +//! - Inner key group buffer: only for filtered joins, one key group at a time. |
| 95 | +//! Tracked via `MemoryReservation`; spilled to disk when the memory pool |
| 96 | +//! limit is exceeded. |
| 97 | +//! - `BatchCoalescer`: output buffering to target batch size |
| 98 | +//! |
| 99 | +//! # Degenerate cases |
| 100 | +//! |
| 101 | +//! **Highly skewed key (filtered joins only):** When a filter is present, |
| 102 | +//! the inner key group is buffered so each inner row can be evaluated |
| 103 | +//! against the outer group. If one join key has N inner rows, all N rows |
| 104 | +//! are held in memory simultaneously (or spilled to disk if the memory |
| 105 | +//! pool limit is reached). With a uniform key distribution the group is
| 106 | +//! small (about inner_rows / num_distinct_keys rows), but a hot key can buffer
| 107 | +//! arbitrarily many rows. The no-filter path does not buffer inner |
| 108 | +//! rows — it only advances the cursor — so it is unaffected. |
| 109 | +//! |
| 110 | +//! **Scalar broadcast during filter evaluation:** Each inner row is |
| 111 | +//! broadcast to match the outer group length for filter evaluation, |
| 112 | +//! allocating one array per inner row × filter column. This is inherent |
| 113 | +//! to the `PhysicalExpr::evaluate(RecordBatch)` API, which does not |
| 114 | +//! support scalar inputs directly. The total work is |
| 115 | +//! O(inner_group × outer_group) per key, but with a much lower constant
| 116 | +//! factor than the pair-materialization approach. |
19 | 117 |
|
20 | 118 | use std::cmp::Ordering; |
21 | 119 | use std::fs::File; |
|