Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
974d305
Remove dead code.
mbutrovich Mar 25, 2026
3de5c0c
More cleanup.
mbutrovich Mar 25, 2026
a570c6c
move mark logic
mbutrovich Mar 25, 2026
e4fcdd1
move mark logic
mbutrovich Mar 25, 2026
e66a44b
add benchmark, optimize remaining smj stream
mbutrovich Mar 26, 2026
640dddc
clean up, debug_asserts
mbutrovich Mar 26, 2026
f71e161
add a new test
mbutrovich Mar 26, 2026
cd799a6
scale benchmark
mbutrovich Mar 26, 2026
d922b9b
Batch deferred filtering for outer joins with unique keys
mbutrovich Mar 26, 2026
14d9653
add comments
mbutrovich Mar 26, 2026
5779054
Merge branch 'main' into simplify_smj_full_opt
mbutrovich Mar 26, 2026
1c1bec5
clippy fix.
mbutrovich Mar 26, 2026
9fc21a0
add clarifying comment
mbutrovich Mar 26, 2026
66632ef
remove booleans is_semi is_mark and just use JoinType enum.
mbutrovich Mar 26, 2026
60127a7
clean up redundant comment next to already-verbose unreachable! macro.
mbutrovich Mar 26, 2026
481753a
clearer debug_assert messages
mbutrovich Mar 26, 2026
eeafcd0
Merge branch 'main' into simplify_smj_full_opt
mbutrovich Mar 26, 2026
2d2758b
add 3 spilling SMJ unit tests and 1 spilling SMJ fuzz test
mbutrovich Mar 26, 2026
e051e23
Merge branch 'main' into simplify_smj_full_opt
mbutrovich Mar 27, 2026
335db42
add sqllogictest test for #21197.
mbutrovich Mar 27, 2026
802a121
get scaled benchmarks and new mark join benchmarks
mbutrovich Mar 27, 2026
768529e
Merge branch 'main' into simplify_smj_full_opt
mbutrovich Mar 27, 2026
99a8642
Merge branch 'main' into simplify_smj_full_opt
mbutrovich Mar 27, 2026
3be0029
address some PR feedback. Rename SMJ streams, collapse the bitwise (S…
mbutrovich Mar 27, 2026
0c39df3
add renamed files.
mbutrovich Mar 27, 2026
e077499
Merge branch 'main' into simplify_smj_full_opt
mbutrovich Mar 27, 2026
fa4e99d
combine test files.
mbutrovich Mar 27, 2026
2ca0b09
Merge branch 'main' into simplify_smj_full_opt
mbutrovich Mar 30, 2026
f1d40b9
Merge branch 'main' into simplify_smj_full_opt
mbutrovich Mar 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion benchmarks/src/smj.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use futures::StreamExt;
#[derive(Debug, Args, Clone)]
#[command(verbatim_doc_comment)]
pub struct RunOpt {
/// Query number (between 1 and 20). If not specified, runs all queries
/// Query number (between 1 and 23). If not specified, runs all queries
#[arg(short, long)]
query: Option<usize>,

Expand Down Expand Up @@ -410,6 +410,52 @@ const SMJ_QUERIES: &[&str] = &[
FROM t1_sorted JOIN t2_sorted ON t1_sorted.key = t2_sorted.key
GROUP BY t1_sorted.key
"#,
// Q21: INNER 10M x 10M | unique keys (1:1) | 50% join filter
r#"
WITH t1_sorted AS (
SELECT value as key, value as data
FROM range(10000000) ORDER BY value
),
t2_sorted AS (
SELECT value as key, value as data
FROM range(10000000) ORDER BY value
)
SELECT t1_sorted.key, t1_sorted.data as d1, t2_sorted.data as d2
FROM t1_sorted JOIN t2_sorted
ON t1_sorted.key = t2_sorted.key
AND t1_sorted.data + t2_sorted.data < 10000000
"#,
// Q22: LEFT 10M x 10M | unique keys (1:1) | 50% join filter
r#"
WITH t1_sorted AS (
SELECT value as key, value as data
FROM range(10000000) ORDER BY value
),
t2_sorted AS (
SELECT value as key, value as data
FROM range(10000000) ORDER BY value
)
SELECT t1_sorted.key, t1_sorted.data as d1, t2_sorted.data as d2
FROM t1_sorted LEFT JOIN t2_sorted
ON t1_sorted.key = t2_sorted.key
AND t1_sorted.data + t2_sorted.data < 10000000
"#,
// Q23: FULL 10M x 10M | unique keys (1:1) | 50% join filter
r#"
WITH t1_sorted AS (
SELECT value as key, value as data
FROM range(10000000) ORDER BY value
),
t2_sorted AS (
SELECT value as key, value as data
FROM range(10000000) ORDER BY value
)
SELECT t1_sorted.key as k1, t1_sorted.data as d1,
t2_sorted.key as k2, t2_sorted.data as d2
FROM t1_sorted FULL JOIN t2_sorted
ON t1_sorted.key = t2_sorted.key
AND t1_sorted.data + t2_sorted.data < 10000000
"#,
];

impl RunOpt {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ mod cross_join;
mod hash_join;
mod nested_loop_join;
mod piecewise_merge_join;
pub(crate) mod semi_anti_sort_merge_join;
pub(crate) mod semi_anti_mark_sort_merge_join;
mod sort_merge_join;
mod stream_join_utils;
mod symmetric_hash_join;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.

//! Specialized Sort Merge Join stream for Semi/Anti joins.
//! Specialized Sort Merge Join stream for Semi/Anti/Mark joins.
//!
//! Used internally by `SortMergeJoinExec` for semi/anti join types.
//! Used internally by `SortMergeJoinExec` for semi/anti/mark join types.

pub(crate) mod stream;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.

//! Sort-merge join stream specialized for semi/anti joins.
//! Sort-merge join stream specialized for semi/anti/mark joins.
//!
//! Instantiated by [`SortMergeJoinExec`](crate::joins::sort_merge_join::SortMergeJoinExec)
//! when the join type is `LeftSemi`, `LeftAnti`, `RightSemi`, or `RightAnti`.
//! when the join type is `LeftSemi`, `LeftAnti`, `RightSemi`, `RightAnti`,
//! `LeftMark`, or `RightMark`.
//!
//! # Motivation
//!
Expand All @@ -36,7 +37,8 @@
//!
//! For `Left*` join types, left is outer and right is inner.
//! For `Right*` join types, right is outer and left is inner.
//! The output schema always equals the outer side's schema.
//! The output schema always equals the outer side's schema (for semi/anti)
//! or the outer side's schema plus a boolean mark column (for mark joins).
//!
//! # Algorithm
//!
Expand Down Expand Up @@ -75,6 +77,7 @@
//! On emit:
//! Semi → filter_record_batch(outer_batch, &matched)
//! Anti → filter_record_batch(outer_batch, &NOT(matched))
//! Mark → outer_batch + matched as boolean column
//! ```
//!
//! ## Batch boundaries
Expand Down Expand Up @@ -245,10 +248,13 @@ enum PendingBoundary {
Filtered { saved_keys: Vec<ArrayRef> },
}

pub(crate) struct SemiAntiSortMergeJoinStream {
// Decomposed from JoinType to avoid matching on 4 variants in hot paths.
pub(crate) struct SemiAntiMarkSortMergeJoinStream {
// Decomposed from JoinType to avoid matching on 6 variants in hot paths.
// true for semi (emit matched), false for anti (emit unmatched).
// Ignored when is_mark is true.
is_semi: bool,
// true for mark joins (emit all rows with match boolean column).
is_mark: bool,

// Input streams — in the nested-loop model that sort-merge join
// implements, "outer" is the driving loop and "inner" is probed for
Expand Down Expand Up @@ -330,7 +336,7 @@ pub(crate) struct SemiAntiSortMergeJoinStream {
batch_emitted: bool,
}

impl SemiAntiSortMergeJoinStream {
impl SemiAntiMarkSortMergeJoinStream {
#[expect(clippy::too_many_arguments)]
pub fn try_new(
schema: SchemaRef,
Expand All @@ -350,8 +356,24 @@ impl SemiAntiSortMergeJoinStream {
spill_manager: SpillManager,
runtime_env: Arc<datafusion_execution::runtime_env::RuntimeEnv>,
) -> Result<Self> {
debug_assert!(
matches!(
join_type,
JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
| JoinType::RightMark
),
"SemiAntiMarkSortMergeJoinStream does not handle {join_type:?}"
);
let is_semi = matches!(join_type, JoinType::LeftSemi | JoinType::RightSemi);
let outer_is_left = matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti);
let is_mark = matches!(join_type, JoinType::LeftMark | JoinType::RightMark);
let outer_is_left = matches!(
join_type,
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark
);

let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);
let input_batches =
Expand All @@ -361,6 +383,7 @@ impl SemiAntiSortMergeJoinStream {

Ok(Self {
is_semi,
is_mark,
outer,
inner,
outer_batch: None,
Expand Down Expand Up @@ -492,17 +515,33 @@ impl SemiAntiSortMergeJoinStream {

// finish() converts the bit-packed builder directly to a
// BooleanBuffer — no iteration or repacking needed.
let selection = BooleanArray::new(self.matched.finish(), None);

let selection = if self.is_semi {
selection
let matched_buf = self.matched.finish();

if self.is_mark {
// Mark joins emit ALL outer rows with a boolean match column appended.
debug_assert_eq!(
self.schema.fields().len(),
batch.num_columns() + 1,
"Mark join output schema should be outer schema + 1 mark column"
);
let mark_col = Arc::new(BooleanArray::new(matched_buf, None)) as ArrayRef;
let mut columns = batch.columns().to_vec();
columns.push(mark_col);
let output = RecordBatch::try_new(Arc::clone(&self.schema), columns)?;
self.coalescer.push_batch(output)?;
} else {
not(&selection)?
};
let selection = BooleanArray::new(matched_buf, None);

let selection = if self.is_semi {
selection
} else {
not(&selection)?
};

let filtered = filter_record_batch(batch, &selection)?;
if filtered.num_rows() > 0 {
self.coalescer.push_batch(filtered)?;
let filtered = filter_record_batch(batch, &selection)?;
if filtered.num_rows() > 0 {
self.coalescer.push_batch(filtered)?;
}
}
Ok(())
}
Expand Down Expand Up @@ -1184,7 +1223,7 @@ fn keys_match(

/// Evaluate the join filter for one inner row against a slice of outer rows.
///
/// Free function (not a method on SemiAntiSortMergeJoinStream) so that Rust
/// Free function (not a method on SemiAntiMarkSortMergeJoinStream) so that Rust
/// can split the struct borrow in process_key_match_with_filter: the caller
/// holds &mut self.matched and &self.inner_key_buffer simultaneously, which
/// is impossible if this borrows all of &self.
Expand Down Expand Up @@ -1257,7 +1296,7 @@ fn evaluate_filter_for_inner_row(
}
}

impl Stream for SemiAntiSortMergeJoinStream {
impl Stream for SemiAntiMarkSortMergeJoinStream {
Comment thread
mbutrovich marked this conversation as resolved.
Outdated
type Item = Result<RecordBatch>;

fn poll_next(
Expand All @@ -1269,7 +1308,7 @@ impl Stream for SemiAntiSortMergeJoinStream {
}
}

impl RecordBatchStream for SemiAntiSortMergeJoinStream {
impl RecordBatchStream for SemiAntiMarkSortMergeJoinStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::stream::SemiAntiSortMergeJoinStream;
use super::stream::SemiAntiMarkSortMergeJoinStream;
use crate::ExecutionPlan;
use crate::RecordBatchStream;
use crate::common;
Expand Down Expand Up @@ -149,8 +149,10 @@ impl RecordBatchStream for PendingStream {
}
}

/// Helper: collect all output from a SemiAntiSortMergeJoinStream.
async fn collect_stream(stream: SemiAntiSortMergeJoinStream) -> Result<Vec<RecordBatch>> {
/// Helper: collect all output from a SemiAntiMarkSortMergeJoinStream.
async fn collect_stream(
stream: SemiAntiMarkSortMergeJoinStream,
) -> Result<Vec<RecordBatch>> {
common::collect(Box::pin(stream)).await
}

Expand Down Expand Up @@ -259,7 +261,7 @@ async fn filter_buffer_pending_loses_inner_rows() -> Result<()> {
let inner_schema = inner.schema();
let (reservation, peak_mem_used, spill_manager, runtime_env) =
test_stream_resources(inner_schema, &metrics);
let stream = SemiAntiSortMergeJoinStream::try_new(
let stream = SemiAntiMarkSortMergeJoinStream::try_new(
left_schema, // output schema = outer schema for semi
vec![SortOptions::default()],
NullEquality::NullEqualsNothing,
Expand Down Expand Up @@ -359,7 +361,7 @@ async fn no_filter_boundary_pending_loses_outer_rows() -> Result<()> {
let inner_schema = inner.schema();
let (reservation, peak_mem_used, spill_manager, runtime_env) =
test_stream_resources(inner_schema, &metrics);
let stream = SemiAntiSortMergeJoinStream::try_new(
let stream = SemiAntiMarkSortMergeJoinStream::try_new(
left_schema,
vec![SortOptions::default()],
NullEquality::NullEqualsNothing,
Expand Down Expand Up @@ -473,7 +475,7 @@ async fn filtered_boundary_pending_outer_rows() -> Result<()> {
let inner_schema = inner.schema();
let (reservation, peak_mem_used, spill_manager, runtime_env) =
test_stream_resources(inner_schema, &metrics);
let stream = SemiAntiSortMergeJoinStream::try_new(
let stream = SemiAntiMarkSortMergeJoinStream::try_new(
left_schema,
vec![SortOptions::default()],
NullEquality::NullEqualsNothing,
Expand Down Expand Up @@ -756,7 +758,7 @@ async fn spill_filtered_boundary_loses_outer_rows() -> Result<()> {
Arc::clone(&right_schema),
);

let stream = SemiAntiSortMergeJoinStream::try_new(
let stream = SemiAntiMarkSortMergeJoinStream::try_new(
Arc::clone(&left_schema),
vec![SortOptions::default()],
NullEquality::NullEqualsNothing,
Expand Down
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;

use crate::execution_plan::{EmissionType, boundedness_from_children};
use crate::expressions::PhysicalSortExpr;
use crate::joins::semi_anti_sort_merge_join::stream::SemiAntiSortMergeJoinStream;
use crate::joins::semi_anti_mark_sort_merge_join::stream::SemiAntiMarkSortMergeJoinStream;
use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics;
use crate::joins::sort_merge_join::stream::SortMergeJoinStream;
use crate::joins::utils::{
Expand Down Expand Up @@ -337,6 +337,8 @@ impl SortMergeJoinExec {
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
| JoinType::RightMark
) {
Ok(Arc::new(new_join))
} else {
Expand Down Expand Up @@ -534,6 +536,8 @@ impl ExecutionPlan for SortMergeJoinExec {
| JoinType::LeftAnti
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::LeftMark
| JoinType::RightMark
) {
let peak_mem_used =
MetricBuilder::new(&self.metrics).gauge("peak_mem_used", partition);
Expand All @@ -544,7 +548,7 @@ impl ExecutionPlan for SortMergeJoinExec {
)
.with_compression_type(context.session_config().spill_compression());

Ok(Box::pin(SemiAntiSortMergeJoinStream::try_new(
Ok(Box::pin(SemiAntiMarkSortMergeJoinStream::try_new(
Arc::clone(&self.schema),
self.sort_options.clone(),
self.null_equality,
Expand Down
Loading
Loading