Skip to content

Commit 0bf9def

Browse files
authored
fix: Fix three bugs in query decorrelation (apache#21208)
## Which issue does this PR close? - Closes apache#21205. - Closes apache#21206. - Closes apache#20315. ## Rationale for this change This PR fixes three separate bugs in query decorrelation: 1. When removing duplicate filters as part of pulling up `IN` subqueries, an operator precedence error meant that we would consider two filters to be duplicates even if they involved different operators (e.g., `=` and `>`). 2. When generating the `CASE` used to implement "count bug" handling, we referenced the subquery output column without qualifying it by the subquery alias. This could result in name collisions with unrelated identifiers in the parent query. 3. After generating the `CASE` used for "count bug" handling, we rewrote the parent query to replace references to the subquery output column with the generated `CASE` expression. This rewrite matched on the unqualified column name only, which meant that unrelated parent query identifiers that happened to share the same column name as subquery aggregate aliases could be rewritten by mistake. The first and third issues could result in incorrect query results; the second would only cause spurious errors, as far as I can see. ## What changes are included in this PR? * Fix all three bugs * Add SLT tests * Add an `assert_or_internal_err!` check to document and enforce that `remove_duplicated_filter` is called with a commutative operator ## Are these changes tested? Yes. ## Are there any user-facing changes? No, except in the sense that they fix user-visible bugs.
1 parent 2d9e268 commit 0bf9def

File tree

3 files changed

+108
-25
lines changed

3 files changed

+108
-25
lines changed

datafusion/optimizer/src/decorrelate.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use crate::simplify_expressions::ExprSimplifier;
2626
use datafusion_common::tree_node::{
2727
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
2828
};
29-
use datafusion_common::{Column, DFSchemaRef, HashMap, Result, ScalarValue, plan_err};
29+
use datafusion_common::{
30+
Column, DFSchemaRef, HashMap, Result, ScalarValue, assert_or_internal_err, plan_err,
31+
};
3032
use datafusion_expr::expr::Alias;
3133
use datafusion_expr::simplify::SimplifyContext;
3234
use datafusion_expr::utils::{
@@ -179,7 +181,7 @@ impl TreeNodeRewriter for PullUpCorrelatedExpr {
179181
find_join_exprs(subquery_filter_exprs)?;
180182
if let Some(in_predicate) = &self.in_predicate_opt {
181183
// in_predicate may be already included in the join filters, remove it from the join filters first.
182-
join_filters = remove_duplicated_filter(join_filters, in_predicate);
184+
join_filters = remove_duplicated_filter(join_filters, in_predicate)?;
183185
}
184186
let correlated_subquery_cols =
185187
collect_subquery_cols(&join_filters, subquery_schema)?;
@@ -460,25 +462,39 @@ fn collect_local_correlated_cols(
460462
}
461463
}
462464

463-
fn remove_duplicated_filter(filters: Vec<Expr>, in_predicate: &Expr) -> Vec<Expr> {
464-
filters
465+
fn remove_duplicated_filter(
466+
filters: Vec<Expr>,
467+
in_predicate: &Expr,
468+
) -> Result<Vec<Expr>> {
469+
// We assume below that swapping the order of operands to an operator does
470+
// not change behavior, which is only true if the operator is commutative.
471+
assert_or_internal_err!(
472+
match in_predicate {
473+
Expr::BinaryExpr(b) => b.op.swap() == Some(b.op),
474+
_ => true,
475+
},
476+
"remove_duplicated_filter: in_predicate must use a commutative operator"
477+
);
478+
479+
Ok(filters
465480
.into_iter()
466481
.filter(|filter| {
467482
if filter == in_predicate {
468483
return false;
469484
}
470485

471-
// ignore the binary order
486+
// Treat swapped operand order to a binary operator as equivalent
472487
!match (filter, in_predicate) {
473488
(Expr::BinaryExpr(a_expr), Expr::BinaryExpr(b_expr)) => {
474-
(a_expr.op == b_expr.op)
475-
&& (a_expr.left == b_expr.left && a_expr.right == b_expr.right)
476-
|| (a_expr.left == b_expr.right && a_expr.right == b_expr.left)
489+
a_expr.op == b_expr.op
490+
&& ((a_expr.left == b_expr.left && a_expr.right == b_expr.right)
491+
|| (a_expr.left == b_expr.right
492+
&& a_expr.right == b_expr.left))
477493
}
478494
_ => false,
479495
}
480496
})
481-
.collect::<Vec<_>>()
497+
.collect::<Vec<_>>())
482498
}
483499

484500
fn agg_exprs_evaluation_result_on_empty_batch(

datafusion/optimizer/src/scalar_subquery_to_join.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
111111
// replace column references with entry in map, if it exists
112112
if let Some(map_expr) = expr
113113
.try_as_col()
114-
.and_then(|col| expr_check_map.get(&col.name))
114+
.and_then(|col| expr_check_map.get(col))
115115
{
116116
Ok(Transformed::yes(map_expr.clone()))
117117
} else {
@@ -176,7 +176,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
176176
// replace column references with entry in map, if it exists
177177
if let Some(map_expr) = expr
178178
.try_as_col()
179-
.and_then(|col| expr_check_map.get(&col.name))
179+
.and_then(|col| expr_check_map.get(col))
180180
{
181181
Ok(Transformed::yes(map_expr.clone()))
182182
} else {
@@ -301,7 +301,7 @@ fn build_join(
301301
subquery: &Subquery,
302302
filter_input: &LogicalPlan,
303303
subquery_alias: &str,
304-
) -> Result<Option<(LogicalPlan, HashMap<String, Expr>)>> {
304+
) -> Result<Option<(LogicalPlan, HashMap<Column, Expr>)>> {
305305
let subquery_plan = subquery.subquery.as_ref();
306306
let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true);
307307
let new_plan = subquery_plan.clone().rewrite(&mut pull_up).data()?;
@@ -358,44 +358,43 @@ fn build_join(
358358
// If expr always returns null when column is null, skip processing
359359
continue;
360360
}
361+
362+
let indicator_col =
363+
Column::new(Some(subquery_alias), UN_MATCHED_ROW_INDICATOR);
364+
// Qualify with the subquery alias to avoid ambiguity when the
365+
// outer table has a column with the same name as the aggregate.
366+
let value_col = Column::new(Some(subquery_alias), name.clone());
367+
361368
let computer_expr = if let Some(filter) = &pull_up.pull_up_having_expr {
362369
Expr::Case(expr::Case {
363370
expr: None,
364371
when_then_expr: vec![
365372
(
366-
Box::new(Expr::IsNull(Box::new(Expr::Column(
367-
Column::new_unqualified(UN_MATCHED_ROW_INDICATOR),
368-
)))),
373+
Box::new(Expr::IsNull(Box::new(Expr::Column(indicator_col)))),
369374
Box::new(result),
370375
),
371376
(
372377
Box::new(Expr::Not(Box::new(filter.clone()))),
373378
Box::new(Expr::Literal(ScalarValue::Null, None)),
374379
),
375380
],
376-
else_expr: Some(Box::new(Expr::Column(Column::new_unqualified(
377-
name.clone(),
378-
)))),
381+
else_expr: Some(Box::new(Expr::Column(value_col.clone()))),
379382
})
380383
} else {
381384
Expr::Case(expr::Case {
382385
expr: None,
383386
when_then_expr: vec![(
384-
Box::new(Expr::IsNull(Box::new(Expr::Column(
385-
Column::new_unqualified(UN_MATCHED_ROW_INDICATOR),
386-
)))),
387+
Box::new(Expr::IsNull(Box::new(Expr::Column(indicator_col)))),
387388
Box::new(result),
388389
)],
389-
else_expr: Some(Box::new(Expr::Column(Column::new_unqualified(
390-
name.clone(),
391-
)))),
390+
else_expr: Some(Box::new(Expr::Column(value_col.clone()))),
392391
})
393392
};
394393
let mut expr_rewrite = TypeCoercionRewriter {
395394
schema: new_plan.schema(),
396395
};
397396
computation_project_expr
398-
.insert(name, computer_expr.rewrite(&mut expr_rewrite).data()?);
397+
.insert(value_col, computer_expr.rewrite(&mut expr_rewrite).data()?);
399398
}
400399
}
401400

datafusion/sqllogictest/test_files/subquery.slt

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1671,3 +1671,71 @@ drop table employees;
16711671

16721672
statement count 0
16731673
drop table project_assignments;
1674+
1675+
# https://github.com/apache/datafusion/issues/21205
1676+
statement ok
1677+
CREATE TABLE dup_filter_t1(id INTEGER) AS VALUES (1), (2), (3);
1678+
1679+
statement ok
1680+
CREATE TABLE dup_filter_t2(id INTEGER) AS VALUES (1), (2), (3);
1681+
1682+
query I
1683+
SELECT * FROM dup_filter_t1 WHERE dup_filter_t1.id IN (
1684+
SELECT dup_filter_t2.id FROM dup_filter_t2 WHERE dup_filter_t2.id > dup_filter_t1.id
1685+
);
1686+
----
1687+
1688+
statement ok
1689+
DROP TABLE dup_filter_t1;
1690+
1691+
statement ok
1692+
DROP TABLE dup_filter_t2;
1693+
1694+
# https://github.com/apache/datafusion/issues/21206
1695+
statement ok
1696+
CREATE TABLE sq_name_t1(id INTEGER) AS VALUES (1), (2), (3);
1697+
1698+
statement ok
1699+
CREATE TABLE sq_name_t2(id INTEGER, outer_id INTEGER) AS VALUES (10, 1), (20, 1), (30, 2);
1700+
1701+
query II
1702+
SELECT sq_name_t1.id,
1703+
(SELECT count(*) AS id FROM sq_name_t2 WHERE sq_name_t2.outer_id = sq_name_t1.id) AS cnt
1704+
FROM sq_name_t1
1705+
ORDER BY sq_name_t1.id;
1706+
----
1707+
1 2
1708+
2 1
1709+
3 0
1710+
1711+
query I
1712+
SELECT sq_name_t1.id
1713+
FROM sq_name_t1
1714+
WHERE sq_name_t1.id > (
1715+
SELECT count(*) AS id
1716+
FROM sq_name_t2
1717+
WHERE sq_name_t2.outer_id = sq_name_t1.id
1718+
)
1719+
ORDER BY sq_name_t1.id;
1720+
----
1721+
2
1722+
3
1723+
1724+
query I
1725+
SELECT sq_name_t1.id * 10 + (
1726+
SELECT count(*) AS id
1727+
FROM sq_name_t2
1728+
WHERE sq_name_t2.outer_id = sq_name_t1.id
1729+
) AS total
1730+
FROM sq_name_t1
1731+
ORDER BY sq_name_t1.id;
1732+
----
1733+
12
1734+
21
1735+
30
1736+
1737+
statement ok
1738+
DROP TABLE sq_name_t1;
1739+
1740+
statement ok
1741+
DROP TABLE sq_name_t2;

0 commit comments

Comments
 (0)