Skip to content

Commit dbd97e6

Browse files
authored
[branch-54] Gate new ScalarSubqueryExec node behind session property (#22530) (#22690)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change This PR backports #22530 ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent d32964e commit dbd97e6

7 files changed

Lines changed: 243 additions & 27 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,6 +1124,22 @@ config_namespace! {
11241124
/// into the file scan phase.
11251125
pub enable_topk_dynamic_filter_pushdown: bool, default = true
11261126

1127+
/// When set to true, uncorrelated scalar subqueries are
1128+
/// left in the logical plan and executed by `ScalarSubqueryExec` during
1129+
/// physical execution. When set to false, all scalar subqueries
1130+
/// (including uncorrelated ones) are rewritten to left joins by the
1131+
/// `ScalarSubqueryToJoin` optimizer rule.
1132+
///
1133+
/// Note disabling this option is not recommended. It restores
1134+
/// pre <https://github.com/apache/datafusion/pull/21240>
1135+
/// behavior, which silently produces incorrect results for
1136+
/// multi-row subqueries and does not support scalar subqueries in
1137+
/// ORDER BY / JOIN ON / aggregate-function arguments. This option is
1138+
/// intended as a temporary escape hatch for distributed execution
1139+
/// frameworks and is planned to be removed in a future DataFusion
1140+
/// release.
1141+
pub enable_physical_uncorrelated_scalar_subquery: bool, default = true
1142+
11271143
/// When set to true, the optimizer will attempt to push down Join dynamic filters
11281144
/// into the file scan phase.
11291145
pub enable_join_dynamic_filter_pushdown: bool, default = true

datafusion/core/src/physical_planner.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,20 @@ impl DefaultPhysicalPlanner {
437437
session_state: &'a SessionState,
438438
) -> futures::future::BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
439439
Box::pin(async move {
440-
let all_subqueries = Self::collect_scalar_subqueries(logical_plan);
440+
// When `enable_physical_uncorrelated_scalar_subquery` is disabled, the
441+
// `ScalarSubqueryToJoin` optimizer rule rewrites all uncorrelated
442+
// scalar subqueries to joins, so none should reach this point.
443+
// Skip collection in that case to avoid creating a no-op
444+
// `ScalarSubqueryExec` wrapper.
445+
let all_subqueries = if session_state
446+
.config_options()
447+
.optimizer
448+
.enable_physical_uncorrelated_scalar_subquery
449+
{
450+
Self::collect_scalar_subqueries(logical_plan)
451+
} else {
452+
Vec::new()
453+
};
441454
let (links, index_map) = self
442455
.plan_scalar_subqueries(all_subqueries, session_state)
443456
.await?;

datafusion/optimizer/src/scalar_subquery_to_join.rs

Lines changed: 112 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! [`ScalarSubqueryToJoin`] rewriting correlated scalar subquery filters to `JOIN`s
18+
//! [`ScalarSubqueryToJoin`] rewrites scalar subquery filters to `JOIN`s
1919
2020
use std::collections::{BTreeSet, HashMap};
2121
use std::sync::Arc;
@@ -36,9 +36,14 @@ use datafusion_expr::logical_plan::{JoinType, Subquery};
3636
use datafusion_expr::utils::conjunction;
3737
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder, lit, not, when};
3838

39-
/// Optimizer rule that rewrites correlated scalar subquery filters to joins and
40-
/// places an additional projection on top of the filter, to preserve the
41-
/// original schema.
39+
/// Optimizer rule that rewrites scalar subquery filters to joins and places an
40+
/// additional projection on top of the filter to preserve the original schema.
41+
///
42+
/// When [`datafusion_common::config::OptimizerOptions::enable_physical_uncorrelated_scalar_subquery`] is
43+
/// true (the default), only *correlated* scalar subqueries are rewritten here;
44+
/// uncorrelated ones are left for physical execution via `ScalarSubqueryExec`.
45+
/// When the option is false, all scalar subqueries — correlated and
46+
/// uncorrelated — are rewritten to left joins by this rule.
4247
#[derive(Default, Debug)]
4348
pub struct ScalarSubqueryToJoin {}
4449

@@ -63,10 +68,12 @@ impl ScalarSubqueryToJoin {
6368
&self,
6469
predicate: &Expr,
6570
alias_gen: &Arc<AliasGenerator>,
71+
physical_uncorrelated: bool,
6672
) -> Result<(Vec<(Subquery, String)>, Expr)> {
6773
let mut extract = ExtractScalarSubQuery {
6874
sub_query_info: vec![],
6975
alias_gen,
76+
physical_uncorrelated,
7077
};
7178
predicate
7279
.clone()
@@ -88,15 +95,23 @@ impl OptimizerRule for ScalarSubqueryToJoin {
8895
) -> Result<Transformed<LogicalPlan>> {
8996
match plan {
9097
LogicalPlan::Filter(filter) => {
98+
let physical_uncorrelated = config
99+
.options()
100+
.optimizer
101+
.enable_physical_uncorrelated_scalar_subquery;
91102
// Optimization: skip the rest of the rule and its copies if
92-
// there are no scalar subqueries
93-
if !contains_correlated_scalar_subquery(&filter.predicate) {
103+
// there are no scalar subqueries this rule should rewrite
104+
if !contains_scalar_subquery_to_rewrite(
105+
&filter.predicate,
106+
physical_uncorrelated,
107+
) {
94108
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
95109
}
96110

97111
let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs(
98112
&filter.predicate,
99113
config.alias_generator(),
114+
physical_uncorrelated,
100115
)?;
101116

102117
assert_or_internal_err!(
@@ -141,13 +156,15 @@ impl OptimizerRule for ScalarSubqueryToJoin {
141156
Ok(Transformed::yes(new_plan))
142157
}
143158
LogicalPlan::Projection(projection) => {
159+
let physical_uncorrelated = config
160+
.options()
161+
.optimizer
162+
.enable_physical_uncorrelated_scalar_subquery;
144163
// Optimization: skip the rest of the rule and its copies if there
145-
// are no correlated scalar subqueries
146-
if !projection
147-
.expr
148-
.iter()
149-
.any(contains_correlated_scalar_subquery)
150-
{
164+
// are no scalar subqueries this rule should rewrite
165+
if !projection.expr.iter().any(|expr| {
166+
contains_scalar_subquery_to_rewrite(expr, physical_uncorrelated)
167+
}) {
151168
return Ok(Transformed::no(LogicalPlan::Projection(projection)));
152169
}
153170

@@ -156,8 +173,11 @@ impl OptimizerRule for ScalarSubqueryToJoin {
156173
let mut rewrite_exprs: Vec<Expr> =
157174
Vec::with_capacity(projection.expr.len());
158175
for (idx, expr) in projection.expr.iter().enumerate() {
159-
let (subqueries, rewrite_expr) =
160-
self.extract_subquery_exprs(expr, config.alias_generator())?;
176+
let (subqueries, rewrite_expr) = self.extract_subquery_exprs(
177+
expr,
178+
config.alias_generator(),
179+
physical_uncorrelated,
180+
)?;
161181
for (_, alias) in &subqueries {
162182
alias_to_index.insert(alias.clone(), idx);
163183
}
@@ -228,29 +248,42 @@ impl OptimizerRule for ScalarSubqueryToJoin {
228248
}
229249
}
230250

231-
/// Returns true if the expression contains a correlated scalar subquery, false
232-
/// otherwise. Uncorrelated scalar subqueries are handled by the physical
233-
/// planner via `ScalarSubqueryExec` and do not need to be converted to joins.
234-
fn contains_correlated_scalar_subquery(expr: &Expr) -> bool {
251+
/// Returns true if the expression contains a scalar subquery that this rule
252+
/// should rewrite to a join.
253+
///
254+
/// When `enable_physical_uncorrelated_scalar_subquery` is true (the default) only
255+
/// correlated scalar subqueries are rewritten — uncorrelated ones are handled
256+
/// by the physical planner via `ScalarSubqueryExec`. When it is false, all
257+
/// scalar subqueries (correlated and uncorrelated) are rewritten.
258+
fn contains_scalar_subquery_to_rewrite(expr: &Expr, physical_uncorrelated: bool) -> bool {
235259
expr.exists(|expr| {
236-
Ok(matches!(expr, Expr::ScalarSubquery(sq) if !sq.outer_ref_columns.is_empty()))
260+
Ok(matches!(
261+
expr,
262+
Expr::ScalarSubquery(sq)
263+
if !physical_uncorrelated || !sq.outer_ref_columns.is_empty()
264+
))
237265
})
238266
.expect("Inner is always Ok")
239267
}
240268

241269
struct ExtractScalarSubQuery<'a> {
242270
sub_query_info: Vec<(Subquery, String)>,
243271
alias_gen: &'a Arc<AliasGenerator>,
272+
physical_uncorrelated: bool,
244273
}
245274

246275
impl TreeNodeRewriter for ExtractScalarSubQuery<'_> {
247276
type Node = Expr;
248277

249278
fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
250279
match expr {
251-
// Skip uncorrelated scalar subqueries
280+
// Match scalar subqueries this rule should rewrite to a join. When
281+
// `physical_uncorrelated` is true, only correlated subqueries are
282+
// rewritten — uncorrelated ones are handled later by the physical
283+
// planner. When false, both are rewritten.
252284
Expr::ScalarSubquery(ref subquery)
253-
if !subquery.outer_ref_columns.is_empty() =>
285+
if !self.physical_uncorrelated
286+
|| !subquery.outer_ref_columns.is_empty() =>
254287
{
255288
let subquery = subquery.clone();
256289
let scalar_expr = subquery
@@ -288,9 +321,15 @@ impl TreeNodeRewriter for ExtractScalarSubQuery<'_> {
288321
/// where c.balance > o."avg(total)"
289322
/// ```
290323
///
324+
/// When [`datafusion_common::config::OptimizerOptions::enable_physical_uncorrelated_scalar_subquery`] is
325+
/// false, this function also handles uncorrelated scalar subqueries, rewriting
326+
/// them as a `Left Join: Filter: Boolean(true)` instead of leaving them for
327+
/// `ScalarSubqueryExec`.
328+
///
291329
/// # Arguments
292330
///
293-
/// * `subquery` - The correlated scalar subquery to decorrelate.
331+
/// * `subquery` - The scalar subquery to rewrite (correlated, or uncorrelated
332+
/// when `enable_physical_uncorrelated_scalar_subquery` is false).
294333
/// * `outer_input` - The outer plan that the decorrelated subquery is
295334
/// left-joined onto — the input of the `Filter` or `Projection` node
296335
/// that contained the subquery.
@@ -308,10 +347,9 @@ fn build_join(
308347
outer_input: &LogicalPlan,
309348
subquery_alias: &str,
310349
) -> Result<Option<(LogicalPlan, HashMap<Column, Expr>)>> {
311-
assert_or_internal_err!(
312-
!subquery.outer_ref_columns.is_empty(),
313-
"build_join should only be called for correlated subqueries"
314-
);
350+
// `build_join` also handles uncorrelated scalar subqueries (as a left
351+
// join whose filter is `Boolean(true)`) when the
352+
// `enable_physical_uncorrelated_scalar_subquery` option is disabled.
315353
let subquery_plan = subquery.subquery.as_ref();
316354
let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true);
317355
let decorrelated_subquery = subquery_plan.clone().rewrite(&mut pull_up).data()?;
@@ -1159,4 +1197,52 @@ mod tests {
11591197
"
11601198
)
11611199
}
1200+
1201+
#[test]
1202+
fn uncorrelated_scalar_subquery_rewritten_when_flag_off() -> Result<()> {
1203+
use datafusion_common::config::ConfigOptions;
1204+
1205+
let sq = Arc::new(
1206+
LogicalPlanBuilder::from(scan_tpch_table("orders"))
1207+
.aggregate(Vec::<Expr>::new(), vec![max(col("orders.o_custkey"))])?
1208+
.project(vec![max(col("orders.o_custkey"))])?
1209+
.build()?,
1210+
);
1211+
1212+
let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
1213+
.filter(col("customer.c_custkey").eq(scalar_subquery(sq)))?
1214+
.project(vec![col("customer.c_custkey")])?
1215+
.build()?;
1216+
1217+
let mut options = ConfigOptions::default();
1218+
options
1219+
.optimizer
1220+
.enable_physical_uncorrelated_scalar_subquery = false;
1221+
let context = crate::OptimizerContext::new_with_config_options(Arc::new(options));
1222+
1223+
let rule: Arc<dyn OptimizerRule + Send + Sync> =
1224+
Arc::new(ScalarSubqueryToJoin::new());
1225+
let optimizer = crate::Optimizer::with_rules(vec![rule]);
1226+
let optimized_plan = optimizer
1227+
.optimize(plan, &context, |_, _| {})
1228+
.expect("failed to optimize plan");
1229+
let formatted_plan = optimized_plan.display_indent_schema();
1230+
1231+
insta::assert_snapshot!(
1232+
formatted_plan,
1233+
@r"
1234+
Projection: customer.c_custkey [c_custkey:Int64]
1235+
Projection: customer.c_custkey, customer.c_name [c_custkey:Int64, c_name:Utf8]
1236+
Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]
1237+
Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]
1238+
TableScan: customer [c_custkey:Int64, c_name:Utf8]
1239+
SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]
1240+
Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]
1241+
Aggregate: groupBy=[[]], aggr=[[max(orders.o_custkey)]] [max(orders.o_custkey):Int64;N]
1242+
TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
1243+
"
1244+
);
1245+
1246+
Ok(())
1247+
}
11621248
}

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true
303303
datafusion.optimizer.enable_dynamic_filter_pushdown true
304304
datafusion.optimizer.enable_join_dynamic_filter_pushdown true
305305
datafusion.optimizer.enable_leaf_expression_pushdown true
306+
datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery true
306307
datafusion.optimizer.enable_piecewise_merge_join false
307308
datafusion.optimizer.enable_round_robin_repartition true
308309
datafusion.optimizer.enable_sort_pushdown true
@@ -453,6 +454,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to tru
453454
datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
454455
datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase.
455456
datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes.
457+
datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery true When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule. Note disabling this option is not recommended. It restores pre <https://github.com/apache/datafusion/pull/21240> behavior, which silently produces incorrect results for multi-row subqueries and does not support scalar subqueries in ORDER BY / JOIN ON / aggregate-function arguments. This option is intended as a temporary escape hatch for distributed execution frameworks and is planned to be removed in a future DataFusion release.
456458
datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter.
457459
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
458460
datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true

datafusion/sqllogictest/test_files/subquery.slt

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2091,6 +2091,95 @@ SELECT (SELECT v FROM (SELECT 1 AS v UNION ALL SELECT 2) AS t ORDER BY v LIMIT 1
20912091
----
20922092
1
20932093

2094+
#############
2095+
## End-to-end correctness coverage for the flag-off path.
2096+
## When `datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery` is false,
2097+
## uncorrelated scalar subqueries are rewritten to left joins by
2098+
## `ScalarSubqueryToJoin` instead of executed by `ScalarSubqueryExec`. This
2099+
## restores pre-PR-21240 behavior, which has two known shortcomings the
2100+
## physical-execution path was built to fix: multi-row subqueries silently
2101+
## return wrong results, and uncorrelated scalar subqueries do not work in
2102+
## ORDER BY / JOIN ON / aggregate-function arguments. Those cases are
2103+
## intentionally not covered here; the queries below are the ones where both
2104+
## paths agree.
2105+
#############
2106+
2107+
statement ok
2108+
set datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery = false;
2109+
2110+
# Scalar subquery returning exactly one row → success
2111+
query I
2112+
SELECT (SELECT v FROM sq_values LIMIT 1);
2113+
----
2114+
1
2115+
2116+
# Scalar subquery returning exactly one row in WHERE → success
2117+
query I rowsort
2118+
SELECT x FROM sq_main WHERE x > (SELECT v FROM sq_values LIMIT 1);
2119+
----
2120+
10
2121+
20
2122+
2123+
# Scalar subquery returning zero rows → NULL
2124+
query I
2125+
SELECT (SELECT v FROM sq_empty);
2126+
----
2127+
NULL
2128+
2129+
# Scalar subquery returning zero rows in arithmetic → NULL propagation
2130+
query I
2131+
SELECT x + (SELECT v FROM sq_empty) FROM sq_main;
2132+
----
2133+
NULL
2134+
NULL
2135+
2136+
# Scalar subquery returning zero rows in WHERE comparison → no matching rows
2137+
query I
2138+
SELECT x FROM sq_main WHERE x > (SELECT v FROM sq_empty);
2139+
----
2140+
2141+
# Aggregated subquery always returns one row, even on empty input → success
2142+
query I
2143+
SELECT (SELECT count(*) FROM sq_empty);
2144+
----
2145+
0
2146+
2147+
# Aggregated subquery on multi-row table → success
2148+
query I
2149+
SELECT (SELECT max(v) FROM sq_values);
2150+
----
2151+
3
2152+
2153+
# HAVING clause with uncorrelated scalar subquery
2154+
query II rowsort
2155+
SELECT x, count(*) AS cnt FROM sq_main GROUP BY x
2156+
HAVING count(*) > (SELECT min(v) FROM sq_values);
2157+
----
2158+
2159+
# CASE WHEN with uncorrelated scalar subquery as condition
2160+
query T rowsort
2161+
SELECT CASE WHEN x > (SELECT min(v) FROM sq_values)
2162+
THEN 'big' ELSE 'small' END AS label
2163+
FROM sq_main;
2164+
----
2165+
big
2166+
big
2167+
2168+
# Doubly-nested constant subquery
2169+
query I
2170+
SELECT (SELECT (SELECT 42));
2171+
----
2172+
42
2173+
2174+
# NULL comparison semantics through subquery boundary
2175+
query B
2176+
SELECT 1 = (SELECT CAST(NULL AS INT));
2177+
----
2178+
NULL
2179+
2180+
statement ok
2181+
RESET datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery;
2182+
20942183
statement count 0
20952184
DROP TABLE sq_values;
20962185

0 commit comments

Comments
 (0)