Skip to content

Commit f4eb943

Browse files
committed
add datafusion/optimizer/benches/unions_to_filter.rs
1 parent c62e482 commit f4eb943

2 files changed

Lines changed: 194 additions & 0 deletions

File tree

datafusion/optimizer/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,7 @@ insta = { workspace = true }
7979
[[bench]]
8080
name = "projection_unnecessary"
8181
harness = false
82+
83+
[[bench]]
84+
name = "unions_to_filter"
85+
harness = false
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Microbenchmarks for the [`UnionsToFilter`] optimizer rule.
19+
//!
20+
//! Three scenarios are covered:
21+
//!
22+
//! 1. **merge** – N branches over the *same* table, each with a simple
23+
//! equality filter. All branches should be merged into a single
24+
//! `DISTINCT(Filter(OR …))` plan.
25+
//!
26+
//! 2. **no_merge** – N branches over *different* tables. The rule must
27+
//! recognise that no merge is possible and leave the plan unchanged.
28+
//! This exercises the "cold path" without any rewrite work.
29+
//!
30+
//! 3. **merge_with_projection** – N branches over the same table but each
31+
//! branch wraps the filter in a `Projection`. This exercises the wrapper-
32+
//! peeling and re-wrapping paths in addition to the core merge logic.
33+
//!
34+
//! To generate a flamegraph (requires `cargo-flamegraph`):
35+
//! ```text
36+
//! cargo flamegraph -p datafusion-optimizer --bench unions_to_filter \
37+
//! --flamechart --root --profile profiling --freq 1000 -- --bench
38+
//! ```
39+
40+
use arrow::datatypes::{DataType, Field, Schema};
41+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
42+
use datafusion_common::config::ConfigOptions;
43+
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, logical_plan::table_scan};
44+
use datafusion_expr::{col, lit};
45+
use datafusion_optimizer::OptimizerContext;
46+
use datafusion_optimizer::unions_to_filter::UnionsToFilter;
47+
use datafusion_optimizer::{Optimizer, OptimizerRule};
48+
use std::hint::black_box;
49+
use std::sync::Arc;
50+
51+
// ---------------------------------------------------------------------------
52+
// Helpers
53+
// ---------------------------------------------------------------------------
54+
55+
/// Build a three-column table scan for `name`.
56+
fn scan(name: &str) -> LogicalPlan {
57+
let schema = Schema::new(vec![
58+
Field::new("a", DataType::Int32, false),
59+
Field::new("b", DataType::Int32, false),
60+
Field::new("c", DataType::Int32, false),
61+
]);
62+
table_scan(Some(name), &schema, None)
63+
.unwrap()
64+
.build()
65+
.unwrap()
66+
}
67+
68+
/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches all filter over
69+
/// the *same* table (`t`), so the rule can merge them.
70+
fn build_merge_plan(n: usize) -> LogicalPlan {
71+
assert!(n >= 2);
72+
let mut builder: Option<LogicalPlanBuilder> = None;
73+
for i in 0..n {
74+
let branch = LogicalPlanBuilder::from(scan("t"))
75+
.filter(col("a").eq(lit(i as i32)))
76+
.unwrap()
77+
.build()
78+
.unwrap();
79+
builder = Some(match builder {
80+
None => LogicalPlanBuilder::from(branch),
81+
Some(b) => b.union(branch).unwrap(),
82+
});
83+
}
84+
builder.unwrap().distinct().unwrap().build().unwrap()
85+
}
86+
87+
/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches each filter over a
88+
/// *different* table, so no merge is possible.
89+
fn build_no_merge_plan(n: usize) -> LogicalPlan {
90+
assert!(n >= 2);
91+
let mut builder: Option<LogicalPlanBuilder> = None;
92+
for i in 0..n {
93+
let branch = LogicalPlanBuilder::from(scan(&format!("t{i}")))
94+
.filter(col("a").eq(lit(i as i32)))
95+
.unwrap()
96+
.build()
97+
.unwrap();
98+
builder = Some(match builder {
99+
None => LogicalPlanBuilder::from(branch),
100+
Some(b) => b.union(branch).unwrap(),
101+
});
102+
}
103+
builder.unwrap().distinct().unwrap().build().unwrap()
104+
}
105+
106+
/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches each wrap the
107+
/// filter inside a `Projection` over the *same* table.
108+
fn build_merge_with_projection_plan(n: usize) -> LogicalPlan {
109+
assert!(n >= 2);
110+
let mut builder: Option<LogicalPlanBuilder> = None;
111+
for i in 0..n {
112+
let branch = LogicalPlanBuilder::from(scan("t"))
113+
.filter(col("a").eq(lit(i as i32)))
114+
.unwrap()
115+
.project(vec![col("a"), col("b")])
116+
.unwrap()
117+
.build()
118+
.unwrap();
119+
builder = Some(match builder {
120+
None => LogicalPlanBuilder::from(branch),
121+
Some(b) => b.union(branch).unwrap(),
122+
});
123+
}
124+
builder.unwrap().distinct().unwrap().build().unwrap()
125+
}
126+
127+
/// Run the [`UnionsToFilter`] rule through the full [`Optimizer`] pipeline
128+
/// (single pass, feature flag enabled).
129+
fn run_optimizer(plan: &LogicalPlan, ctx: &OptimizerContext) -> LogicalPlan {
130+
let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> =
131+
vec![Arc::new(UnionsToFilter::new())];
132+
Optimizer::with_rules(rules)
133+
.optimize(plan.clone(), ctx, |_, _| {})
134+
.unwrap()
135+
}
136+
137+
// ---------------------------------------------------------------------------
138+
// Benchmark functions
139+
// ---------------------------------------------------------------------------
140+
141+
fn bench_merge(c: &mut Criterion) {
142+
let mut options = ConfigOptions::default();
143+
options.optimizer.enable_unions_to_filter = true;
144+
let ctx = OptimizerContext::new_with_config_options(Arc::new(options))
145+
.with_max_passes(1);
146+
147+
let mut group = c.benchmark_group("unions_to_filter/merge");
148+
for n in [2, 8, 32, 128] {
149+
let plan = build_merge_plan(n);
150+
group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| {
151+
b.iter(|| black_box(run_optimizer(p, &ctx)));
152+
});
153+
}
154+
group.finish();
155+
}
156+
157+
fn bench_no_merge(c: &mut Criterion) {
158+
let mut options = ConfigOptions::default();
159+
options.optimizer.enable_unions_to_filter = true;
160+
let ctx = OptimizerContext::new_with_config_options(Arc::new(options))
161+
.with_max_passes(1);
162+
163+
let mut group = c.benchmark_group("unions_to_filter/no_merge");
164+
for n in [2, 8, 32, 128] {
165+
let plan = build_no_merge_plan(n);
166+
group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| {
167+
b.iter(|| black_box(run_optimizer(p, &ctx)));
168+
});
169+
}
170+
group.finish();
171+
}
172+
173+
fn bench_merge_with_projection(c: &mut Criterion) {
174+
let mut options = ConfigOptions::default();
175+
options.optimizer.enable_unions_to_filter = true;
176+
let ctx = OptimizerContext::new_with_config_options(Arc::new(options))
177+
.with_max_passes(1);
178+
179+
let mut group = c.benchmark_group("unions_to_filter/merge_with_projection");
180+
for n in [2, 8, 32, 128] {
181+
let plan = build_merge_with_projection_plan(n);
182+
group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| {
183+
b.iter(|| black_box(run_optimizer(p, &ctx)));
184+
});
185+
}
186+
group.finish();
187+
}
188+
189+
criterion_group!(benches, bench_merge, bench_no_merge, bench_merge_with_projection);
190+
criterion_main!(benches);

0 commit comments

Comments
 (0)