feat: Support RIGHT/FULL joins in NLJ memory-limited execution #21833

Merged
viirya merged 2 commits into apache:main from viirya:nlj-right-full-join-spill
May 5, 2026

Conversation


@viirya viirya commented Apr 24, 2026

Which issue does this PR close?

  • Closes #.

Rationale for this change

What changes are included in this PR?

Previously RIGHT/FULL/RIGHT SEMI/RIGHT ANTI/RIGHT MARK joins were excluded from the memory-limited (multi-pass) fallback path because they need to track which right rows have been matched across all left chunks. They would OOM instead of spilling.

Now all join types support the fallback:

  • A global right bitmap (Vec<BooleanBuffer>, indexed by right batch sequence number) accumulates matches across all left chunk passes. ReplayableStreamSource guarantees consistent batch boundaries across passes, so batch sequence numbers are stable.

  • In memory-limited mode, EmitRightUnmatched merges the current batch's bitmap into the global accumulator (bitwise OR) instead of emitting unmatched rows immediately.

  • After the last left chunk, a new state EmitGlobalRightUnmatched replays the right side one more time and uses the accumulated bitmap to emit unmatched right rows correctly.

Single-pass behavior is unchanged: the global bitmap path is only active when is_memory_limited() is true.

Co-authored-by: Claude Code
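
The accumulation scheme above can be sketched in plain Rust. Names like `GlobalRightBitmap` and `merge_batch` are illustrative, not the PR's actual identifiers; the real code stores Arrow `BooleanBuffer`s and accounts for memory via a reservation:

```rust
/// One slot per right-side batch, indexed by batch sequence number.
/// `None` means no left-chunk pass has produced a bitmap for it yet.
pub struct GlobalRightBitmap {
    per_batch: Vec<Option<Vec<bool>>>,
}

impl GlobalRightBitmap {
    pub fn new() -> Self {
        Self { per_batch: Vec::new() }
    }

    /// Merge the match bitmap produced for right batch `idx` during the
    /// current left-chunk pass: the first sighting stores it, later
    /// sightings bitwise-OR into the stored copy. Correctness depends on
    /// batch boundaries being identical across passes, which
    /// ReplayableStreamSource guarantees.
    pub fn merge_batch(&mut self, idx: usize, current: Vec<bool>) {
        if self.per_batch.len() <= idx {
            self.per_batch.resize(idx + 1, None);
        }
        match self.per_batch[idx].take() {
            Some(mut acc) => {
                assert_eq!(acc.len(), current.len(), "batch boundaries must be stable");
                for (a, b) in acc.iter_mut().zip(&current) {
                    *a |= *b;
                }
                self.per_batch[idx] = Some(acc);
            }
            None => self.per_batch[idx] = Some(current),
        }
    }

    /// Row indices in batch `idx` that stayed unmatched across all passes;
    /// these are what the final emission state would null-pad and emit.
    pub fn unmatched_rows(&self, idx: usize) -> Vec<usize> {
        match &self.per_batch[idx] {
            Some(bits) => bits
                .iter()
                .enumerate()
                .filter(|(_, m)| !**m)
                .map(|(i, _)| i)
                .collect(),
            None => Vec::new(),
        }
    }
}
```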

Are these changes tested?

Unit tests and e2e tests

Are there any user-facing changes?

No

@github-actions bot added the sqllogictest (SQL Logic Tests (.slt)) and physical-plan (Changes to the physical-plan crate) labels Apr 24, 2026
@viirya viirya marked this pull request as draft April 24, 2026 15:52
@viirya viirya force-pushed the nlj-right-full-join-spill branch 4 times, most recently from eb80057 to 57c8cb8 on April 24, 2026 17:41
@viirya viirya marked this pull request as ready for review April 24, 2026 18:07

@kosiew kosiew left a comment


@viirya
Thanks for working on this. I think there are a couple of correctness and test coverage issues that should be addressed before this lands.

&& !need_produce_right_in_final(self.join_type)
{
// Condition: disk manager supports temp files (needed for spilling)
let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled() {
@kosiew (Contributor)

I think we need to keep the core invariant here that memory-limited execution produces the same results as the single-pass NLJ for every enabled join type.

Enabling fallback for FULL joins exposes the memory-limited path to multi-partition right inputs, but each output partition builds its own per-chunk JoinLeftData with AtomicUsize::new(1). That means left-unmatched rows are emitted based only on matches seen by that partition's right-side input.

In the single-pass path, collect_left_input(..., right_partition_count) coordinates left-unmatched emission across all right partitions. The fallback path does not appear to do that yet.

For a FULL JOIN with target_partitions > 1, a left row that matches a row in another right partition can still be emitted as unmatched by this partition, which would produce incorrect duplicate/null-padded rows.

Could we either keep the previous exclusion for FULL until the fallback path has coordinated cross-partition left match state, or make the memory-limited left bitmap/probe completion shared across right partitions for each left chunk?
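
For context, the single-pass coordination referred to here can be sketched roughly as follows. This is a simplified model under stated assumptions, not DataFusion's actual `collect_left_input` code, and the names are invented for the example: the probe counter starts at the right partition count rather than `AtomicUsize::new(1)`, and only the last partition to finish probing emits left-unmatched rows.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Simplified model of single-pass coordination: left match state is
/// shared across all right partitions, and the probe counter starts at
/// `right_partition_count` instead of 1.
pub struct SharedLeftState {
    matched: Mutex<Vec<bool>>,
    probes_remaining: AtomicUsize,
}

impl SharedLeftState {
    pub fn new(left_rows: usize, right_partition_count: usize) -> Arc<Self> {
        Arc::new(Self {
            matched: Mutex::new(vec![false; left_rows]),
            probes_remaining: AtomicUsize::new(right_partition_count),
        })
    }

    /// Any partition that joins a left row marks it in the shared bitmap.
    pub fn record_match(&self, left_row: usize) {
        self.matched.lock().unwrap()[left_row] = true;
    }

    /// Called once per right partition when its probe finishes. Only the
    /// last finisher receives the unmatched left rows, so null-padded
    /// output for FULL/LEFT-style joins is emitted exactly once.
    pub fn finish_probe(&self) -> Option<Vec<usize>> {
        if self.probes_remaining.fetch_sub(1, Ordering::AcqRel) == 1 {
            let matched = self.matched.lock().unwrap();
            Some(
                matched
                    .iter()
                    .enumerate()
                    .filter(|(_, m)| !**m)
                    .map(|(i, _)| i)
                    .collect(),
            )
        } else {
            None
        }
    }
}
```

Without this sharing, each partition's private bitmap would report left rows as unmatched even when another partition matched them, which is the duplicate/null-padding bug described above.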

@viirya (Member Author)

Good catch — you're right that the fallback path doesn't yet coordinate left-bitmap state across right partitions. I've added a guard that disables fallback for FULL JOIN when right_partition_count > 1, falling back to standard OOM behavior in that case. This preserves correctness for the case you flagged.

A note: I noticed the same pre-existing concern applies to LEFT/LEFT SEMI/LEFT ANTI/LEFT MARK in the multi-partition fallback path, which was introduced in Phase 1 of this work (#21448) — not new to this PR. We should fix it. Since it requires a larger refactor (sharing the per-chunk JoinLeftData and probe-thread counter across partitions), I'll address it in a follow-up PR rather than expanding this one's scope.
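
The resulting guard can be modeled as a simple predicate. This is a hedged sketch: the real check lives in the NLJ planning code and its exact signature is an assumption.

```rust
/// Hypothetical guard modeling the fix: the memory-limited fallback is
/// taken only when spilling is possible, and FULL joins are excluded
/// whenever the right side has multiple partitions, since the fallback
/// path has no cross-partition left match state yet.
fn memory_limited_fallback_allowed(
    is_full_join: bool,
    right_partition_count: usize,
    tmp_files_enabled: bool,
) -> bool {
    // Spilling requires temp-file support from the disk manager.
    if !tmp_files_enabled {
        return false;
    }
    // FULL + multi-partition right input would emit incorrect
    // left-unmatched rows, so fall back to standard OOM behavior.
    !(is_full_join && right_partition_count > 1)
}
```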

# we must emit the correct unmatched rows at the end.
query II rowsort
SELECT t1.v1, t2.v2
FROM generate_series(1, 5) AS t1(v1)
@kosiew (Contributor)

This test says it exercises the global right bitmap across multiple left chunks, but generate_series(1, 5) looks too small to trigger the memory-limited fallback under the 150K limit. As a result, it seems to run through the unchanged single-pass path instead.

The larger RIGHT JOIN case above does spill, but all left chunks match the single right row, so it would not catch a bad global OR/accumulation or incorrect final unmatched emission.

Could we add a spilling RIGHT, FULL, RIGHT ANTI, or RIGHT MARK case where different right rows are matched by different left chunks and at least one right row remains unmatched? It would also be good to assert spill_count for that query so we know the fallback path is actually exercised.

@viirya (Member Author)

Good point — the small generate_series(1, 5) cases ran in single-pass and didn't exercise the global bitmap. I've replaced them with a 100K-left × 200-right test using the predicate (t1.v1 + t2.v2) = 2 AND t2.v2 <= 100. This is non-equi (forces NLJ), forces spill (left side ~800KB > 150K limit), and produces exactly 1 matched pair + 199 unmatched right rows — so each right batch has both bits-on and bits-off entries that must be correctly accumulated across passes. There's a corresponding EXPLAIN ANALYZE assertion confirming spill_count=2. Added the same predicate for FULL JOIN too.
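
To see why this predicate yields exactly 1 matched pair and 199 unmatched right rows, here is a small brute-force Rust model of the join condition (illustrative only; the real test runs through sqllogictest):

```rust
/// Brute-force model of the predicate (t1.v1 + t2.v2) = 2 AND t2.v2 <= 100
/// over generate_series(1, left_max) x generate_series(1, right_max).
/// Returns (matched pairs, unmatched right rows).
fn count_matches(left_max: i64, right_max: i64) -> (usize, usize) {
    let mut right_matched = vec![false; right_max as usize];
    let mut pairs = 0usize;
    for v1 in 1..=left_max {
        for v2 in 1..=right_max {
            // Non-equi predicate: only v1 = 1, v2 = 1 satisfies it,
            // since both series start at 1.
            if v1 + v2 == 2 && v2 <= 100 {
                pairs += 1;
                right_matched[(v2 - 1) as usize] = true;
            }
        }
    }
    let unmatched = right_matched.iter().filter(|m| !**m).count();
    (pairs, unmatched)
}
```

With `left_max = 100_000` and `right_max = 200` this gives one matched pair and 199 unmatched right rows, so every right batch carries a mix of set and unset bits that must survive the cross-pass OR.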

let bitmap = std::mem::take(&mut self.current_right_batch_matched)
.expect("right bitmap should be available");
let (values, _nulls) = bitmap.into_parts();

@kosiew (Contributor)

Small readability suggestion: could this bitmap merge/accounting block be extracted into a helper on SpillStateActive, maybe something like merge_current_right_bitmap(idx, values)?

The state machine is already pretty dense, and centralizing the first-seen vs OR-merge behavior would make the global bitmap invariant easier to audit and test.

@viirya (Member Author)

Done — extracted to SpillStateActive::merge_current_right_bitmap(idx, values), which centralizes the first-seen-vs-OR-merge behavior and the reservation accounting. The state-machine call site is now a 2-line invocation.

@viirya viirya force-pushed the nlj-right-full-join-spill branch from 57c8cb8 to 2232ddd on May 1, 2026 07:35
@viirya viirya force-pushed the nlj-right-full-join-spill branch from 2232ddd to 87f64ba on May 1, 2026 07:43
The optimizer pushes `t2.v2 <= 100` into a projection on the right
side, rewriting it as a `join_proj_push_down_*` boolean column. This
inserts an extra ProjectionExec and changes the NLJ filter expression.
Update the expected EXPLAIN ANALYZE output to match.

Co-authored-by: Claude Code

@kosiew kosiew left a comment


@viirya
Thanks for the iteration.

Looks good to me now.


viirya commented May 4, 2026

Thank you @kosiew

@viirya viirya added this pull request to the merge queue May 5, 2026
Merged via the queue into apache:main with commit 36b1927 May 5, 2026
39 checks passed
@viirya viirya deleted the nlj-right-full-join-spill branch May 5, 2026 06:59