Skip to content

Commit 42dd427

Browse files
andygrove and claude authored
bench: Add criterion benchmark for sort merge join (#20464)
## Summary

- Adds a criterion micro-benchmark for SortMergeJoinExec that measures join kernel performance in isolation
- Pre-sorted RecordBatches are fed directly into the join operator, avoiding sort/scan overhead
- Data is constructed once and reused across iterations; only the `TestMemoryExec` wrapper is recreated per iteration

## Benchmarks

Five scenarios covering the most common SMJ patterns:

| Benchmark | Join Type | Key Pattern |
|-----------|-----------|-------------|
| `inner_1to1` | Inner | 100K unique keys per side |
| `inner_1to10` | Inner | 10K keys, ~10 rows per key |
| `left_1to1_unmatched` | Left | ~5% unmatched on left side |
| `left_semi_1to10` | Left Semi | 10K keys |
| `left_anti_partial` | Left Anti | Partial key overlap |

## Usage

```bash
cargo bench -p datafusion-physical-plan --features test_utils --bench sort_merge_join
```

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0d63ced commit 42dd427

File tree

2 files changed

+209
-0
lines changed

2 files changed

+209
-0
lines changed

datafusion/physical-plan/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ name = "spill_io"
9898
harness = false
9999
name = "sort_preserving_merge"
100100

101+
[[bench]]
102+
harness = false
103+
name = "sort_merge_join"
104+
required-features = ["test_utils"]
105+
101106
[[bench]]
102107
harness = false
103108
name = "aggregate_vectorized"
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Criterion benchmarks for Sort Merge Join
19+
//!
20+
//! These benchmarks measure the join kernel in isolation by feeding
21+
//! pre-sorted RecordBatches directly into SortMergeJoinExec, avoiding
22+
//! sort / scan overhead.
23+
24+
use std::sync::Arc;
25+
26+
use arrow::array::{Int64Array, RecordBatch, StringArray};
27+
use arrow::compute::SortOptions;
28+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
29+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
30+
use datafusion_common::NullEquality;
31+
use datafusion_execution::TaskContext;
32+
use datafusion_physical_expr::expressions::col;
33+
use datafusion_physical_plan::collect;
34+
use datafusion_physical_plan::joins::{SortMergeJoinExec, utils::JoinOn};
35+
use datafusion_physical_plan::test::TestMemoryExec;
36+
use tokio::runtime::Runtime;
37+
38+
/// Build pre-sorted RecordBatches (split into ~8192-row chunks).
39+
///
40+
/// Schema: (key: Int64, data: Int64, payload: Utf8)
41+
///
42+
/// `key_mod` controls distinct key count: key = row_index % key_mod.
43+
fn build_sorted_batches(
44+
num_rows: usize,
45+
key_mod: usize,
46+
schema: &SchemaRef,
47+
) -> Vec<RecordBatch> {
48+
let mut rows: Vec<(i64, i64)> = (0..num_rows)
49+
.map(|i| ((i % key_mod) as i64, i as i64))
50+
.collect();
51+
rows.sort();
52+
53+
let keys: Vec<i64> = rows.iter().map(|(k, _)| *k).collect();
54+
let data: Vec<i64> = rows.iter().map(|(_, d)| *d).collect();
55+
let payload: Vec<String> = data.iter().map(|d| format!("val_{d}")).collect();
56+
57+
let batch = RecordBatch::try_new(
58+
Arc::clone(schema),
59+
vec![
60+
Arc::new(Int64Array::from(keys)),
61+
Arc::new(Int64Array::from(data)),
62+
Arc::new(StringArray::from(payload)),
63+
],
64+
)
65+
.unwrap();
66+
67+
let batch_size = 8192;
68+
let mut batches = Vec::new();
69+
let mut offset = 0;
70+
while offset < batch.num_rows() {
71+
let len = (batch.num_rows() - offset).min(batch_size);
72+
batches.push(batch.slice(offset, len));
73+
offset += len;
74+
}
75+
batches
76+
}
77+
78+
fn make_exec(
79+
batches: &[RecordBatch],
80+
schema: &SchemaRef,
81+
) -> Arc<dyn datafusion_physical_plan::ExecutionPlan> {
82+
TestMemoryExec::try_new_exec(&[batches.to_vec()], Arc::clone(schema), None).unwrap()
83+
}
84+
85+
fn schema() -> SchemaRef {
86+
Arc::new(Schema::new(vec![
87+
Field::new("key", DataType::Int64, false),
88+
Field::new("data", DataType::Int64, false),
89+
Field::new("payload", DataType::Utf8, false),
90+
]))
91+
}
92+
93+
fn do_join(
94+
left: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
95+
right: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
96+
join_type: datafusion_common::JoinType,
97+
rt: &Runtime,
98+
) -> usize {
99+
let on: JoinOn = vec![(
100+
col("key", &left.schema()).unwrap(),
101+
col("key", &right.schema()).unwrap(),
102+
)];
103+
let join = SortMergeJoinExec::try_new(
104+
left,
105+
right,
106+
on,
107+
None,
108+
join_type,
109+
vec![SortOptions::default()],
110+
NullEquality::NullEqualsNothing,
111+
)
112+
.unwrap();
113+
114+
let task_ctx = Arc::new(TaskContext::default());
115+
rt.block_on(async {
116+
let batches = collect(Arc::new(join), task_ctx).await.unwrap();
117+
batches.iter().map(|b| b.num_rows()).sum()
118+
})
119+
}
120+
121+
fn bench_smj(c: &mut Criterion) {
122+
let rt = Runtime::new().unwrap();
123+
let s = schema();
124+
125+
let mut group = c.benchmark_group("sort_merge_join");
126+
127+
// 1:1 Inner Join — 100K rows each, unique keys
128+
// Best case for contiguous-range optimization: every index array is [0,1,2,...].
129+
{
130+
let n = 100_000;
131+
let left_batches = build_sorted_batches(n, n, &s);
132+
let right_batches = build_sorted_batches(n, n, &s);
133+
group.bench_function(BenchmarkId::new("inner_1to1", n), |b| {
134+
b.iter(|| {
135+
let left = make_exec(&left_batches, &s);
136+
let right = make_exec(&right_batches, &s);
137+
do_join(left, right, datafusion_common::JoinType::Inner, &rt)
138+
})
139+
});
140+
}
141+
142+
// 1:10 Inner Join — 100K left, 100K right, 10K distinct keys
143+
{
144+
let n = 100_000;
145+
let key_mod = 10_000;
146+
let left_batches = build_sorted_batches(n, key_mod, &s);
147+
let right_batches = build_sorted_batches(n, key_mod, &s);
148+
group.bench_function(BenchmarkId::new("inner_1to10", n), |b| {
149+
b.iter(|| {
150+
let left = make_exec(&left_batches, &s);
151+
let right = make_exec(&right_batches, &s);
152+
do_join(left, right, datafusion_common::JoinType::Inner, &rt)
153+
})
154+
});
155+
}
156+
157+
// Left Join — 100K each, ~5% unmatched on left
158+
{
159+
let n = 100_000;
160+
let left_batches = build_sorted_batches(n, n + n / 20, &s);
161+
let right_batches = build_sorted_batches(n, n, &s);
162+
group.bench_function(BenchmarkId::new("left_1to1_unmatched", n), |b| {
163+
b.iter(|| {
164+
let left = make_exec(&left_batches, &s);
165+
let right = make_exec(&right_batches, &s);
166+
do_join(left, right, datafusion_common::JoinType::Left, &rt)
167+
})
168+
});
169+
}
170+
171+
// Left Semi Join — 100K left, 100K right, 10K keys
172+
{
173+
let n = 100_000;
174+
let key_mod = 10_000;
175+
let left_batches = build_sorted_batches(n, key_mod, &s);
176+
let right_batches = build_sorted_batches(n, key_mod, &s);
177+
group.bench_function(BenchmarkId::new("left_semi_1to10", n), |b| {
178+
b.iter(|| {
179+
let left = make_exec(&left_batches, &s);
180+
let right = make_exec(&right_batches, &s);
181+
do_join(left, right, datafusion_common::JoinType::LeftSemi, &rt)
182+
})
183+
});
184+
}
185+
186+
// Left Anti Join — 100K left, 100K right, partial match
187+
{
188+
let n = 100_000;
189+
let left_batches = build_sorted_batches(n, n + n / 5, &s);
190+
let right_batches = build_sorted_batches(n, n, &s);
191+
group.bench_function(BenchmarkId::new("left_anti_partial", n), |b| {
192+
b.iter(|| {
193+
let left = make_exec(&left_batches, &s);
194+
let right = make_exec(&right_batches, &s);
195+
do_join(left, right, datafusion_common::JoinType::LeftAnti, &rt)
196+
})
197+
});
198+
}
199+
200+
group.finish();
201+
}
202+
203+
// Criterion entry point: `criterion_group!` bundles `bench_smj` into the
// `benches` group, and `criterion_main!` generates the `main` function that
// runs that group (the bench target is declared with `harness = false`).
criterion_group!(benches, bench_smj);
204+
criterion_main!(benches);

0 commit comments

Comments
 (0)