diff --git a/src/query/service/tests/it/sql/exec/correlated_subquery_regression.rs b/src/query/service/tests/it/sql/exec/correlated_subquery_regression.rs new file mode 100644 index 0000000000000..157672f770b57 --- /dev/null +++ b/src/query/service/tests/it/sql/exec/correlated_subquery_regression.rs @@ -0,0 +1,68 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_query::interpreters::InterpreterFactory; +use databend_query::sql::Planner; +use databend_query::test_kits::TestFixture; +use futures_util::TryStreamExt; + +async fn execute_query_rows(sql: &str) -> Result { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + + let mut planner = Planner::new(ctx.clone()); + let (plan, _) = planner.plan_sql(sql).await?; + let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; + let stream = interpreter.execute(ctx).await?; + let blocks: Vec = stream.try_collect().await?; + Ok(DataBlock::concat(&blocks)?.num_rows()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn correlated_exists_subquery_over_union_regression() -> anyhow::Result<()> { + let sql = r" + SELECT * + FROM (VALUES (1)) t(f1) + WHERE EXISTS ( + SELECT 1 + UNION + SELECT 2 WHERE f1 = 1 + ); + "; + + let rows = execute_query_rows(sql).await?; + assert_eq!(rows, 1); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn correlated_exists_subquery_over_union_all_regression() -> anyhow::Result<()> { + let sql = r" + SELECT * + FROM (VALUES (1)) t(f1) + WHERE EXISTS ( + SELECT 1 + UNION ALL + SELECT 2 WHERE f1 = 1 + ); + "; + + let rows = execute_query_rows(sql).await?; + assert_eq!(rows, 1); + + Ok(()) +} diff --git a/src/query/service/tests/it/sql/exec/mod.rs b/src/query/service/tests/it/sql/exec/mod.rs index 89e8aac925425..da7a58600e380 100644 --- a/src/query/service/tests/it/sql/exec/mod.rs +++ b/src/query/service/tests/it/sql/exec/mod.rs @@ -167,4 +167,5 @@ pub async fn test_snapshot_consistency() -> anyhow::Result<()> { Ok(()) } +mod correlated_subquery_regression; mod get_table_bind_test; diff --git a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs index cec2e1b4fa876..c331a0c5aa3b7 100644 --- a/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs +++ b/src/query/sql/src/planner/optimizer/optimizers/operator/decorrelate/flatten_plan.rs @@ -855,13 +855,21 @@ impl SubqueryDecorrelatorOptimizer { need_cross_join = true; } + let rel_expr = RelExpr::with_s_expr(subquery); + let left_prop = rel_expr.derive_relational_prop_child(0)?; + let right_prop = rel_expr.derive_relational_prop_child(1)?; + let left_need_cross_join = + need_cross_join || !correlated_columns.is_subset(&left_prop.outer_columns); + let right_need_cross_join = + need_cross_join || !correlated_columns.is_subset(&right_prop.outer_columns); + let mut union_all = union_all.clone(); let left_flatten_plan = self.flatten_plan( outer, subquery.left_child(), correlated_columns, flatten_info, - need_cross_join, + left_need_cross_join, )?; union_all.left_outputs = union_all @@ -877,7 +885,7 @@ impl SubqueryDecorrelatorOptimizer { Ok((new, expr)) }) .chain(correlated_columns.iter().copied().map(|old| { - let new = *self.derived_columns.get(&old).unwrap(); + let new = self.get_derived(old)?; Ok((new, None)) })) .collect::>()?; @@ -888,7 +896,7 @@ impl SubqueryDecorrelatorOptimizer { subquery.right_child(), correlated_columns, flatten_info, - need_cross_join, + right_need_cross_join, )?; union_all.right_outputs = union_all .right_outputs @@ -902,8 +910,8 @@ impl SubqueryDecorrelatorOptimizer { }; Ok((new, expr)) }) - .chain(correlated_columns.iter().map(|old| { - let new = *self.derived_columns.get(old).unwrap(); + .chain(correlated_columns.iter().copied().map(|old| { + let new = self.get_derived(old)?; Ok((new, None)) })) .collect::>()?; diff --git a/src/query/sql/test-support/data/cases/regressions/19574_correlated_exists_union.yaml b/src/query/sql/test-support/data/cases/regressions/19574_correlated_exists_union.yaml new file mode 100644 index 0000000000000..83fb72627d91c --- /dev/null +++ b/src/query/sql/test-support/data/cases/regressions/19574_correlated_exists_union.yaml @@ -0,0 +1,11 @@ +name: "19574_correlated_exists_union" +description: "Correlated EXISTS over UNION should decorrelate without panicking" + +sql: | + SELECT * + FROM (VALUES (1)) t(f1) + WHERE EXISTS ( + SELECT 1 + UNION + SELECT 2 WHERE f1 = 1 + ); diff --git a/src/query/sql/test-support/data/cases/regressions/19574_correlated_exists_union_all.yaml b/src/query/sql/test-support/data/cases/regressions/19574_correlated_exists_union_all.yaml new file mode 100644 index 0000000000000..a14d6d4ff7f60 --- /dev/null +++ b/src/query/sql/test-support/data/cases/regressions/19574_correlated_exists_union_all.yaml @@ -0,0 +1,11 @@ +name: "19574_correlated_exists_union_all" +description: "Correlated EXISTS over UNION ALL should decorrelate without panicking" + +sql: | + SELECT * + FROM (VALUES (1)) t(f1) + WHERE EXISTS ( + SELECT 1 + UNION ALL + SELECT 2 WHERE f1 = 1 + ); diff --git a/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_all_optimized.txt b/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_all_optimized.txt new file mode 100644 index 0000000000000..519844c59718d --- /dev/null +++ b/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_all_optimized.txt @@ -0,0 +1,59 @@ +EvalScalar +├── scalars: [f1 (#0) AS (#0), marker (#7) AS (#8)] +└── Filter + ├── filters: [is_true(marker (#7))] + └── Join(LeftMark) + ├── build keys: [f1 (#0)] + ├── probe keys: [f1 (#6)] + ├── other filters: [] + ├── Exchange(Merge) + │ └── ConstantTableScan + │ ├── columns: [f1 (#0)] + │ └── num_rows: [1] + └── UnionAll + ├── output: [1 (#3), f1 (#6)] + ├── left: [1 (#1), f1 (#4)] + ├── right: [2 (#2), f1 (#5)] + ├── cte_scan_names: [] + ├── logical_recursive_cte_id: None + ├── Join(Cross) + │ ├── build keys: [] + │ ├── probe keys: [] + │ ├── other filters: [] + │ ├── EvalScalar + │ │ ├── scalars: [1 AS (#1)] + │ │ └── DummyTableScan(DummyTableScan { source_table_indexes: [] }) + │ └── Exchange(Merge) + │ └── Aggregate(Final) + │ ├── group items: [f1 (#4) AS (#4)] + │ ├── aggregate functions: [] + │ └── Aggregate(Partial) + │ ├── group items: [f1 (#4) AS (#4)] + │ ├── aggregate functions: [] + │ └── Exchange(Hash) + │ ├── Exchange(Hash): keys: [f1 (#4)] + │ └── ConstantTableScan + │ ├── columns: [f1 (#4)] + │ └── num_rows: [1] + └── EvalScalar + ├── scalars: [2 AS (#2), f1 (#5) AS (#5)] + └── Join(Cross) + ├── build keys: [] + ├── probe keys: [] + ├── other filters: [] + ├── DummyTableScan(DummyTableScan { source_table_indexes: [] }) + └── Exchange(Merge) + └── Aggregate(Final) + ├── group items: [f1 (#5) AS (#5)] + ├── aggregate functions: [] + └── Aggregate(Partial) + ├── group items: [f1 (#5) AS (#5)] + ├── aggregate functions: [] + └── Exchange(Hash) + ├── Exchange(Hash): keys: [f1 (#5)] + └── Filter + ├── filters: [eq(f1 (#5), 1)] + └── ConstantTableScan + ├── columns: [f1 (#5)] + └── num_rows: [1] + diff --git a/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_all_raw.txt b/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_all_raw.txt new file mode 100644 index 0000000000000..299ff84a9e05b --- /dev/null +++ b/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_all_raw.txt @@ -0,0 +1,25 @@ +EvalScalar +├── scalars: [f1 (#0) AS (#0)] +└── Filter + ├── filters: [SUBQUERY AS (#3)] + ├── subquerys + │ └── Subquery (Exists) + │ ├── output_column: 1 (#3) + │ └── UnionAll + │ ├── output: [1 (#3)] + │ ├── left: [1 (#1)] + │ ├── right: [2 (#2)] + │ ├── cte_scan_names: [] + │ ├── logical_recursive_cte_id: None + │ ├── EvalScalar + │ │ ├── scalars: [1 AS (#1)] + │ │ └── DummyTableScan(DummyTableScan { source_table_indexes: [] }) + │ └── EvalScalar + │ ├── scalars: [2 AS (#2)] + │ └── Filter + │ ├── filters: [eq(f1 (#0), 1)] + │ └── DummyTableScan(DummyTableScan { source_table_indexes: [] }) + └── ConstantTableScan + ├── columns: [f1 (#0)] + └── num_rows: [1] + diff --git a/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_optimized.txt b/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_optimized.txt new file mode 100644 index 0000000000000..4e94aeea7f641 --- /dev/null +++ b/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_optimized.txt @@ -0,0 +1,65 @@ +EvalScalar +├── scalars: [f1 (#0) AS (#0), marker (#7) AS (#8)] +└── Filter + ├── filters: [is_true(marker (#7))] + └── Join(RightMark) + ├── build keys: [f1 (#6)] + ├── probe keys: [f1 (#0)] + ├── other filters: [] + ├── Aggregate(Final) + │ ├── group items: [1 (#3) AS (#3), f1 (#6) AS (#6)] + │ ├── aggregate functions: [] + │ └── Aggregate(Partial) + │ ├── group items: [1 (#3) AS (#3), f1 (#6) AS (#6)] + │ ├── aggregate functions: [] + │ └── UnionAll + │ ├── output: [1 (#3), f1 (#6)] + │ ├── left: [1 (#1), f1 (#4)] + │ ├── right: [2 (#2), f1 (#5)] + │ ├── cte_scan_names: [] + │ ├── logical_recursive_cte_id: None + │ ├── Join(Cross) + │ │ ├── build keys: [] + │ │ ├── probe keys: [] + │ │ ├── other filters: [] + │ │ ├── EvalScalar + │ │ │ ├── scalars: [1 AS (#1)] + │ │ │ └── DummyTableScan(DummyTableScan { source_table_indexes: [] }) + │ │ └── Exchange(Merge) + │ │ └── Aggregate(Final) + │ │ ├── group items: [f1 (#4) AS (#4)] + │ │ ├── aggregate functions: [] + │ │ └── Aggregate(Partial) + │ │ ├── group items: [f1 (#4) AS (#4)] + │ │ ├── aggregate functions: [] + │ │ └── Exchange(Hash) + │ │ ├── Exchange(Hash): keys: [f1 (#4)] + │ │ └── ConstantTableScan + │ │ ├── columns: [f1 (#4)] + │ │ └── num_rows: [1] + │ └── EvalScalar + │ ├── scalars: [2 AS (#2), f1 (#5) AS (#5)] + │ └── Join(Cross) + │ ├── build keys: [] + │ ├── probe keys: [] + │ ├── other filters: [] + │ ├── DummyTableScan(DummyTableScan { source_table_indexes: [] }) + │ └── Exchange(Merge) + │ └── Aggregate(Final) + │ ├── group items: [f1 (#5) AS (#5)] + │ ├── aggregate functions: [] + │ └── Aggregate(Partial) + │ ├── group items: [f1 (#5) AS (#5)] + │ ├── aggregate functions: [] + │ └── Exchange(Hash) + │ ├── Exchange(Hash): keys: [f1 (#5)] + │ └── Filter + │ ├── filters: [eq(f1 (#5), 1)] + │ └── ConstantTableScan + │ ├── columns: [f1 (#5)] + │ └── num_rows: [1] + └── Exchange(Merge) + └── ConstantTableScan + ├── columns: [f1 (#0)] + └── num_rows: [1] + diff --git a/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_raw.txt b/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_raw.txt new file mode 100644 index 0000000000000..4954085ff48cd --- /dev/null +++ b/src/query/sql/test-support/data/results/regressions/19574_correlated_exists_union_raw.txt @@ -0,0 +1,28 @@ +EvalScalar +├── scalars: [f1 (#0) AS (#0)] +└── Filter + ├── filters: [SUBQUERY AS (#3)] + ├── subquerys + │ └── Subquery (Exists) + │ ├── output_column: 1 (#3) + │ └── Aggregate(Initial) + │ ├── group items: [1 (#3) AS (#3)] + │ ├── aggregate functions: [] + │ └── UnionAll + │ ├── output: [1 (#3)] + │ ├── left: [1 (#1)] + │ ├── right: [2 (#2)] + │ ├── cte_scan_names: [] + │ ├── logical_recursive_cte_id: None + │ ├── EvalScalar + │ │ ├── scalars: [1 AS (#1)] + │ │ └── DummyTableScan(DummyTableScan { source_table_indexes: [] }) + │ └── EvalScalar + │ ├── scalars: [2 AS (#2)] + │ └── Filter + │ ├── filters: [eq(f1 (#0), 1)] + │ └── DummyTableScan(DummyTableScan { source_table_indexes: [] }) + └── ConstantTableScan + ├── columns: [f1 (#0)] + └── num_rows: [1] + diff --git a/src/query/sql/tests/it/planner.rs b/src/query/sql/tests/it/planner.rs index 1df4cf2012d30..1311befa9135a 100644 --- a/src/query/sql/tests/it/planner.rs +++ b/src/query/sql/tests/it/planner.rs @@ -108,6 +108,18 @@ const LITE_REPLAY_CASE_SPECS: &[LiteReplayCaseSpec] = &[ optimizer_skip_list: &[], default_node_num: 1, }, + LiteReplayCaseSpec { + name: "19574_correlated_exists_union", + warehouse_distribution: false, + optimizer_skip_list: &[], + default_node_num: 1, + }, + LiteReplayCaseSpec { + name: "19574_correlated_exists_union_all", + warehouse_distribution: false, + optimizer_skip_list: &[], + default_node_num: 1, + }, ]; impl TestCaseRunner for LiteRunner {