Skip to content

Commit 6e58d74

Browse files
xiedeyantucompheadCopilot
authored
Add configurable UNION DISTINCT to FILTER rewrite optimization (#21075)
- ## Which issue does this PR close? - Closes #21310. ## Rationale for this change This PR adds a configurable optimizer rewrite for `UNION DISTINCT` queries. The goal is to allow the optimizer to collapse eligible union branches into a single filtered scan when the branches read from the same source and differ only by filter predicates. This optimization can reduce duplicated work and avoid scanning the same input multiple times. Keeping it behind a configuration flag makes the behavior explicit and safe to enable only when desired. ## What changes are included in this PR? - Adds a new optimizer configuration option: `datafusion.optimizer.enable_unions_to_filter`, which is disabled by default. - Enables the `UnionsToFilter` optimization rule in the logical optimizer pipeline. - Adds documentation for the new configuration option, including plan-shape examples. - Extends sqllogictest coverage in `datafusion/sqllogictest/test_files/union.slt` to cover both the disabled and enabled cases. - Verifies that the rewrite only applies to eligible `UNION DISTINCT` queries. ### Example rewrite When the rule is enabled, a query such as: SQL ``` SELECT id, name FROM t1 WHERE id = 1 UNION SELECT id, name FROM t1 WHERE id = 2 ``` may be rewritten into an equivalent plan that scans `t1` once and applies a combined filter such as: SQL ``` SELECT id, name FROM t1 WHERE id = 1 OR id = 2 ``` This keeps the results unchanged while avoiding repeated reads from the same source. ## Are these changes tested? Yes. The new behavior is covered by sqllogictest cases that validate both plan variants: - the original `UNION DISTINCT` execution path when the option is disabled - the rewritten single-scan plan when the option is enabled ## Are there any user-facing changes? Yes. A new configuration option is introduced: - `datafusion.optimizer.enable_unions_to_filter` When enabled, some `UNION DISTINCT` queries may be optimized into a different plan shape. Query results remain the same, but the execution plan may change. --------- Co-authored-by: Oleks V <comphead@users.noreply.github.com> Co-authored-by: Copilot <copilot@github.com>
1 parent d379241 commit 6e58d74

12 files changed

Lines changed: 971 additions & 23 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,6 +1348,13 @@ config_namespace! {
13481348
/// closer to the leaf table scans, and push those projections down
13491349
/// towards the leaf nodes.
13501350
pub enable_leaf_expression_pushdown: bool, default = true
1351+
1352+
/// When set to true, the logical optimizer will rewrite `UNION DISTINCT` branches that
1353+
/// read from the same source and differ only by filter predicates into a single branch
1354+
/// with a combined filter. This optimization is conservative and only applies when the
1355+
/// branches share the same source and compatible wrapper nodes such as identical
1356+
/// projections or aliases.
1357+
pub enable_unions_to_filter: bool, default = false
13511358
}
13521359
}
13531360

datafusion/core/src/optimizer_rule_reference.md

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,28 +39,29 @@ Rule order matters. The default pipeline may change between releases.
3939
| ----- | ----------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- |
4040
| 1 | `rewrite_set_comparison` | Rewrites `ANY` and `ALL` set-comparison subqueries into `EXISTS`-based boolean expressions with correct SQL NULL semantics. |
4141
| 2 | `optimize_unions` | Flattens nested unions and removes unions with a single input. |
42-
| 3 | `simplify_expressions` | Constant-folds and simplifies expressions while preserving output names. |
43-
| 4 | `replace_distinct_aggregate` | Rewrites `DISTINCT` and `DISTINCT ON` operators into aggregate-based plans that later rules can optimize further. |
44-
| 5 | `eliminate_join` | Replaces keyless inner joins with a literal `false` filter by an empty relation. |
45-
| 6 | `decorrelate_predicate_subquery` | Converts eligible `IN` and `EXISTS` predicate subqueries into semi or anti joins. |
46-
| 7 | `scalar_subquery_to_join` | Rewrites eligible scalar subqueries into joins and adds schema-preserving projections. |
47-
| 8 | `decorrelate_lateral_join` | Rewrites eligible lateral joins into regular joins. |
48-
| 9 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. |
49-
| 10 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. |
50-
| 11 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. |
51-
| 12 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. |
52-
| 13 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. |
53-
| 14 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. |
54-
| 15 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. |
55-
| 16 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. |
56-
| 17 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. |
57-
| 18 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. |
58-
| 19 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. |
59-
| 20 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. |
60-
| 21 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. |
61-
| 22 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. |
62-
| 23 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. |
63-
| 24 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. |
42+
| 3 | `unions_to_filter` | Merges `UNION DISTINCT` branches that share the same source into a single filtered branch with a disjunctive predicate. |
43+
| 4 | `simplify_expressions` | Constant-folds and simplifies expressions while preserving output names. |
44+
| 5 | `replace_distinct_aggregate` | Rewrites `DISTINCT` and `DISTINCT ON` operators into aggregate-based plans that later rules can optimize further. |
45+
| 6 | `eliminate_join` | Replaces keyless inner joins with a literal `false` filter by an empty relation. |
46+
| 7 | `decorrelate_predicate_subquery` | Converts eligible `IN` and `EXISTS` predicate subqueries into semi or anti joins. |
47+
| 8 | `scalar_subquery_to_join` | Rewrites eligible scalar subqueries into joins and adds schema-preserving projections. |
48+
| 9 | `decorrelate_lateral_join` | Rewrites eligible lateral joins into regular joins. |
49+
| 10 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. |
50+
| 11 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. |
51+
| 12 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. |
52+
| 13 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. |
53+
| 14 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. |
54+
| 15 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. |
55+
| 16 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. |
56+
| 17 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. |
57+
| 18 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. |
58+
| 19 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. |
59+
| 20 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. |
60+
| 21 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. |
61+
| 22 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. |
62+
| 23 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. |
63+
| 24 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. |
64+
| 25 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. |
6465

6566
### Physical Optimizer Rules
6667

datafusion/optimizer/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,7 @@ harness = false
8383
[[bench]]
8484
name = "optimize_projections"
8585
harness = false
86+
87+
[[bench]]
88+
name = "unions_to_filter"
89+
harness = false
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Microbenchmarks for the [`UnionsToFilter`] optimizer rule.
19+
//!
20+
//! Three scenarios are covered:
21+
//!
22+
//! 1. **merge** – N branches over the *same* table, each with a simple
23+
//! equality filter. All branches should be merged into a single
24+
//! `DISTINCT(Filter(OR …))` plan.
25+
//!
26+
//! 2. **no_merge** – N branches over *different* tables. The rule must
27+
//! recognise that no merge is possible and leave the plan unchanged.
28+
//! This exercises the "cold path" without any rewrite work.
29+
//!
30+
//! 3. **merge_with_projection** – N branches over the same table but each
31+
//! branch wraps the filter in a `Projection`. This exercises the wrapper-
32+
//! peeling and re-wrapping paths in addition to the core merge logic.
33+
//!
34+
//! To generate a flamegraph (requires `cargo-flamegraph`):
35+
//! ```text
36+
//! cargo flamegraph -p datafusion-optimizer --bench unions_to_filter \
37+
//! --flamechart --root --profile profiling --freq 1000 -- --bench
38+
//! ```
39+
40+
use arrow::datatypes::{DataType, Field, Schema};
41+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
42+
use datafusion_common::config::ConfigOptions;
43+
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, logical_plan::table_scan};
44+
use datafusion_expr::{col, lit};
45+
use datafusion_optimizer::OptimizerContext;
46+
use datafusion_optimizer::unions_to_filter::UnionsToFilter;
47+
use datafusion_optimizer::{Optimizer, OptimizerRule};
48+
use std::hint::black_box;
49+
use std::sync::Arc;
50+
51+
// ---------------------------------------------------------------------------
52+
// Helpers
53+
// ---------------------------------------------------------------------------
54+
55+
/// Build a three-column table scan for `name`.
56+
fn scan(name: &str) -> LogicalPlan {
57+
let schema = Schema::new(vec![
58+
Field::new("a", DataType::Int32, false),
59+
Field::new("b", DataType::Int32, false),
60+
Field::new("c", DataType::Int32, false),
61+
]);
62+
table_scan(Some(name), &schema, None)
63+
.unwrap()
64+
.build()
65+
.unwrap()
66+
}
67+
68+
/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches all filter over
69+
/// the *same* table (`t`), so the rule can merge them.
70+
fn build_merge_plan(n: usize) -> LogicalPlan {
71+
assert!(n >= 2);
72+
let mut builder: Option<LogicalPlanBuilder> = None;
73+
for i in 0..n {
74+
let branch = LogicalPlanBuilder::from(scan("t"))
75+
.filter(col("a").eq(lit(i as i32)))
76+
.unwrap()
77+
.build()
78+
.unwrap();
79+
builder = Some(match builder {
80+
None => LogicalPlanBuilder::from(branch),
81+
Some(b) => b.union(branch).unwrap(),
82+
});
83+
}
84+
builder.unwrap().distinct().unwrap().build().unwrap()
85+
}
86+
87+
/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches each filter over a
88+
/// *different* table, so no merge is possible.
89+
fn build_no_merge_plan(n: usize) -> LogicalPlan {
90+
assert!(n >= 2);
91+
let mut builder: Option<LogicalPlanBuilder> = None;
92+
for i in 0..n {
93+
let branch = LogicalPlanBuilder::from(scan(&format!("t{i}")))
94+
.filter(col("a").eq(lit(i as i32)))
95+
.unwrap()
96+
.build()
97+
.unwrap();
98+
builder = Some(match builder {
99+
None => LogicalPlanBuilder::from(branch),
100+
Some(b) => b.union(branch).unwrap(),
101+
});
102+
}
103+
builder.unwrap().distinct().unwrap().build().unwrap()
104+
}
105+
106+
/// Build a `DISTINCT (UNION ALL …)` plan whose `n` branches each wrap the
107+
/// filter inside a `Projection` over the *same* table.
108+
fn build_merge_with_projection_plan(n: usize) -> LogicalPlan {
109+
assert!(n >= 2);
110+
let mut builder: Option<LogicalPlanBuilder> = None;
111+
for i in 0..n {
112+
let branch = LogicalPlanBuilder::from(scan("t"))
113+
.filter(col("a").eq(lit(i as i32)))
114+
.unwrap()
115+
.project(vec![col("a"), col("b")])
116+
.unwrap()
117+
.build()
118+
.unwrap();
119+
builder = Some(match builder {
120+
None => LogicalPlanBuilder::from(branch),
121+
Some(b) => b.union(branch).unwrap(),
122+
});
123+
}
124+
builder.unwrap().distinct().unwrap().build().unwrap()
125+
}
126+
127+
/// Run the [`UnionsToFilter`] rule through the full [`Optimizer`] pipeline
128+
/// (single pass, feature flag enabled).
129+
fn run_optimizer(plan: &LogicalPlan, ctx: &OptimizerContext) -> LogicalPlan {
130+
let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> =
131+
vec![Arc::new(UnionsToFilter::new())];
132+
Optimizer::with_rules(rules)
133+
.optimize(plan.clone(), ctx, |_, _| {})
134+
.unwrap()
135+
}
136+
137+
// ---------------------------------------------------------------------------
138+
// Benchmark functions
139+
// ---------------------------------------------------------------------------
140+
141+
fn bench_merge(c: &mut Criterion) {
142+
let mut options = ConfigOptions::default();
143+
options.optimizer.enable_unions_to_filter = true;
144+
let ctx =
145+
OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1);
146+
147+
let mut group = c.benchmark_group("unions_to_filter/merge");
148+
for n in [2, 8, 32, 128] {
149+
let plan = build_merge_plan(n);
150+
group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| {
151+
b.iter(|| black_box(run_optimizer(p, &ctx)));
152+
});
153+
}
154+
group.finish();
155+
}
156+
157+
fn bench_no_merge(c: &mut Criterion) {
158+
let mut options = ConfigOptions::default();
159+
options.optimizer.enable_unions_to_filter = true;
160+
let ctx =
161+
OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1);
162+
163+
let mut group = c.benchmark_group("unions_to_filter/no_merge");
164+
for n in [2, 8, 32, 128] {
165+
let plan = build_no_merge_plan(n);
166+
group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| {
167+
b.iter(|| black_box(run_optimizer(p, &ctx)));
168+
});
169+
}
170+
group.finish();
171+
}
172+
173+
fn bench_merge_with_projection(c: &mut Criterion) {
174+
let mut options = ConfigOptions::default();
175+
options.optimizer.enable_unions_to_filter = true;
176+
let ctx =
177+
OptimizerContext::new_with_config_options(Arc::new(options)).with_max_passes(1);
178+
179+
let mut group = c.benchmark_group("unions_to_filter/merge_with_projection");
180+
for n in [2, 8, 32, 128] {
181+
let plan = build_merge_with_projection_plan(n);
182+
group.bench_with_input(BenchmarkId::from_parameter(n), &plan, |b, p| {
183+
b.iter(|| black_box(run_optimizer(p, &ctx)));
184+
});
185+
}
186+
group.finish();
187+
}
188+
189+
criterion_group!(
190+
benches,
191+
bench_merge,
192+
bench_no_merge,
193+
bench_merge_with_projection
194+
);
195+
criterion_main!(benches);

datafusion/optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ pub mod rewrite_set_comparison;
7070
pub mod scalar_subquery_to_join;
7171
pub mod simplify_expressions;
7272
pub mod single_distinct_to_groupby;
73+
pub mod unions_to_filter;
7374
pub mod utils;
7475

7576
#[cfg(test)]

datafusion/optimizer/src/optimizer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use crate::rewrite_set_comparison::RewriteSetComparison;
5656
use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
5757
use crate::simplify_expressions::SimplifyExpressions;
5858
use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
59+
use crate::unions_to_filter::UnionsToFilter;
5960
use crate::utils::log_plan;
6061

6162
/// Transforms one [`LogicalPlan`] into another which computes the same results,
@@ -280,6 +281,7 @@ impl Optimizer {
280281
let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
281282
Arc::new(RewriteSetComparison::new()),
282283
Arc::new(OptimizeUnions::new()),
284+
Arc::new(UnionsToFilter::new()),
283285
Arc::new(SimplifyExpressions::new()),
284286
Arc::new(ReplaceDistinctWithAggregate::new()),
285287
Arc::new(EliminateJoin::new()),

0 commit comments

Comments
 (0)