Skip to content

Commit a8847e1

Browse files
authored
fix: Support NOT <field> IN (<subquery>) via anti join (#10936)
* fix: rewriting NOT IN (<subquery>) to anti join * add wrapped_not_not_in_subquery test
1 parent f373a86 commit a8847e1

2 files changed

Lines changed: 94 additions & 5 deletions

File tree

datafusion/optimizer/src/decorrelate_predicate_subquery.rs

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
3333
use datafusion_expr::logical_plan::{JoinType, Subquery};
3434
use datafusion_expr::utils::{conjunction, split_conjunction, split_conjunction_owned};
3535
use datafusion_expr::{
36-
exists, in_subquery, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
36+
exists, in_subquery, not, not_exists, not_in_subquery, BinaryExpr, Expr, Filter,
3737
LogicalPlan, LogicalPlanBuilder, Operator,
3838
};
3939

@@ -79,6 +79,25 @@ impl DecorrelatePredicateSubquery {
7979
let mut others = vec![];
8080
for it in filters.into_iter() {
8181
match it {
82+
Expr::Not(not_expr) => match *not_expr {
83+
Expr::InSubquery(InSubquery {
84+
expr,
85+
subquery,
86+
negated,
87+
}) => {
88+
let new_subquery = self.rewrite_subquery(subquery, config)?;
89+
subqueries.push(SubqueryInfo::new_with_in_expr(
90+
new_subquery,
91+
*expr,
92+
!negated,
93+
));
94+
}
95+
Expr::Exists(Exists { subquery, negated }) => {
96+
let new_subquery = self.rewrite_subquery(subquery, config)?;
97+
subqueries.push(SubqueryInfo::new(new_subquery, !negated));
98+
}
99+
expr => others.push(not(expr)),
100+
},
82101
Expr::InSubquery(InSubquery {
83102
expr,
84103
subquery,
@@ -126,9 +145,17 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
126145
};
127146

128147
// if there are no subqueries in the predicate, return the original plan
129-
let has_subqueries = split_conjunction(&filter.predicate)
130-
.iter()
131-
.any(|expr| matches!(expr, Expr::InSubquery(_) | Expr::Exists(_)));
148+
let has_subqueries =
149+
split_conjunction(&filter.predicate)
150+
.iter()
151+
.any(|expr| match expr {
152+
Expr::Not(not_expr) => {
153+
matches!(not_expr.as_ref(), Expr::InSubquery(_) | Expr::Exists(_))
154+
}
155+
Expr::InSubquery(_) | Expr::Exists(_) => true,
156+
_ => false,
157+
});
158+
132159
if !has_subqueries {
133160
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
134161
}
@@ -351,7 +378,7 @@ mod tests {
351378
use crate::test::*;
352379

353380
use arrow::datatypes::DataType;
354-
use datafusion_expr::{and, binary_expr, col, lit, or, out_ref_col};
381+
use datafusion_expr::{and, binary_expr, col, lit, not, or, out_ref_col};
355382

356383
fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
357384
assert_optimized_plan_eq_display_indent(
@@ -1099,6 +1126,55 @@ mod tests {
10991126
Ok(())
11001127
}
11011128

1129+
#[test]
1130+
fn wrapped_not_in_subquery() -> Result<()> {
1131+
let table_scan = test_table_scan()?;
1132+
let plan = LogicalPlanBuilder::from(table_scan)
1133+
.filter(not(in_subquery(col("c"), test_subquery_with_name("sq")?)))?
1134+
.project(vec![col("test.b")])?
1135+
.build()?;
1136+
1137+
let expected = "Projection: test.b [b:UInt32]\
1138+
\n LeftAnti Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
1139+
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1140+
\n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
1141+
\n Projection: sq.c [c:UInt32]\
1142+
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
1143+
1144+
assert_optimized_plan_eq_display_indent(
1145+
Arc::new(DecorrelatePredicateSubquery::new()),
1146+
plan,
1147+
expected,
1148+
);
1149+
Ok(())
1150+
}
1151+
1152+
#[test]
1153+
fn wrapped_not_not_in_subquery() -> Result<()> {
1154+
let table_scan = test_table_scan()?;
1155+
let plan = LogicalPlanBuilder::from(table_scan)
1156+
.filter(not(not_in_subquery(
1157+
col("c"),
1158+
test_subquery_with_name("sq")?,
1159+
)))?
1160+
.project(vec![col("test.b")])?
1161+
.build()?;
1162+
1163+
let expected = "Projection: test.b [b:UInt32]\
1164+
\n LeftSemi Join: Filter: test.c = __correlated_sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
1165+
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
1166+
\n SubqueryAlias: __correlated_sq_1 [c:UInt32]\
1167+
\n Projection: sq.c [c:UInt32]\
1168+
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
1169+
1170+
assert_optimized_plan_eq_display_indent(
1171+
Arc::new(DecorrelatePredicateSubquery::new()),
1172+
plan,
1173+
expected,
1174+
);
1175+
Ok(())
1176+
}
1177+
11021178
#[test]
11031179
fn in_subquery_both_side_expr() -> Result<()> {
11041180
let table_scan = test_table_scan()?;

datafusion/sqllogictest/test_files/subquery.slt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,19 @@ where t1.t1_id + 12 not in (
127127
----
128128
22 b 2
129129

130+
# wrapped_not_in_subquery_to_join_with_correlated_outer_filter
131+
query ITI rowsort
132+
select t1.t1_id,
133+
t1.t1_name,
134+
t1.t1_int
135+
from t1
136+
where not t1.t1_id + 12 in (
137+
select t2.t2_id + 1 from t2 where t1.t1_int > 0
138+
)
139+
----
140+
22 b 2
141+
142+
130143
# in subquery with two parentheses, see #5529
131144
query ITI rowsort
132145
select t1.t1_id,

0 commit comments

Comments
 (0)