Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_query::interpreters::InterpreterFactory;
use databend_query::sql::Planner;
use databend_query::test_kits::TestFixture;
use futures_util::TryStreamExt;

async fn execute_query_rows(sql: &str) -> Result<usize> {
let fixture = TestFixture::setup().await?;
let ctx = fixture.new_query_ctx().await?;

let mut planner = Planner::new(ctx.clone());
let (plan, _) = planner.plan_sql(sql).await?;
let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;
let stream = interpreter.execute(ctx).await?;
let blocks: Vec<DataBlock> = stream.try_collect().await?;
Ok(DataBlock::concat(&blocks)?.num_rows())
}

#[tokio::test(flavor = "multi_thread")]
async fn correlated_exists_subquery_over_union_regression() -> anyhow::Result<()> {
let sql = r"
SELECT *
FROM (VALUES (1)) t(f1)
WHERE EXISTS (
SELECT 1
UNION
SELECT 2 WHERE f1 = 1
);
";

let rows = execute_query_rows(sql).await?;
assert_eq!(rows, 1);

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn correlated_exists_subquery_over_union_all_regression() -> anyhow::Result<()> {
let sql = r"
SELECT *
FROM (VALUES (1)) t(f1)
WHERE EXISTS (
SELECT 1
UNION ALL
SELECT 2 WHERE f1 = 1
);
";

let rows = execute_query_rows(sql).await?;
assert_eq!(rows, 1);

Ok(())
}
1 change: 1 addition & 0 deletions src/query/service/tests/it/sql/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,5 @@ pub async fn test_snapshot_consistency() -> anyhow::Result<()> {
Ok(())
}

mod correlated_subquery_regression;
mod get_table_bind_test;
Original file line number Diff line number Diff line change
Expand Up @@ -855,13 +855,21 @@ impl SubqueryDecorrelatorOptimizer {
need_cross_join = true;
}

let rel_expr = RelExpr::with_s_expr(subquery);
let left_prop = rel_expr.derive_relational_prop_child(0)?;
let right_prop = rel_expr.derive_relational_prop_child(1)?;
let left_need_cross_join =
need_cross_join || !correlated_columns.is_subset(&left_prop.outer_columns);
let right_need_cross_join =
need_cross_join || !correlated_columns.is_subset(&right_prop.outer_columns);

let mut union_all = union_all.clone();
let left_flatten_plan = self.flatten_plan(
outer,
subquery.left_child(),
correlated_columns,
flatten_info,
need_cross_join,
left_need_cross_join,
)?;

union_all.left_outputs = union_all
Expand All @@ -877,7 +885,7 @@ impl SubqueryDecorrelatorOptimizer {
Ok((new, expr))
})
.chain(correlated_columns.iter().copied().map(|old| {
let new = *self.derived_columns.get(&old).unwrap();
let new = self.get_derived(old)?;
Ok((new, None))
}))
.collect::<Result<_>>()?;
Expand All @@ -888,7 +896,7 @@ impl SubqueryDecorrelatorOptimizer {
subquery.right_child(),
correlated_columns,
flatten_info,
need_cross_join,
right_need_cross_join,
)?;
union_all.right_outputs = union_all
.right_outputs
Expand All @@ -902,8 +910,8 @@ impl SubqueryDecorrelatorOptimizer {
};
Ok((new, expr))
})
.chain(correlated_columns.iter().map(|old| {
let new = *self.derived_columns.get(old).unwrap();
.chain(correlated_columns.iter().copied().map(|old| {
let new = self.get_derived(old)?;
Ok((new, None))
}))
.collect::<Result<_>>()?;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: "19574_correlated_exists_union"
description: "Correlated EXISTS over UNION should decorrelate without panicking"

sql: |
SELECT *
FROM (VALUES (1)) t(f1)
WHERE EXISTS (
SELECT 1
UNION
SELECT 2 WHERE f1 = 1
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: "19574_correlated_exists_union_all"
description: "Correlated EXISTS over UNION ALL should decorrelate without panicking"

sql: |
SELECT *
FROM (VALUES (1)) t(f1)
WHERE EXISTS (
SELECT 1
UNION ALL
SELECT 2 WHERE f1 = 1
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
EvalScalar
├── scalars: [f1 (#0) AS (#0), marker (#7) AS (#8)]
└── Filter
├── filters: [is_true(marker (#7))]
└── Join(LeftMark)
├── build keys: [f1 (#0)]
├── probe keys: [f1 (#6)]
├── other filters: []
├── Exchange(Merge)
│ └── ConstantTableScan
│ ├── columns: [f1 (#0)]
│ └── num_rows: [1]
└── UnionAll
├── output: [1 (#3), f1 (#6)]
├── left: [1 (#1), f1 (#4)]
├── right: [2 (#2), f1 (#5)]
├── cte_scan_names: []
├── logical_recursive_cte_id: None
├── Join(Cross)
│ ├── build keys: []
│ ├── probe keys: []
│ ├── other filters: []
│ ├── EvalScalar
│ │ ├── scalars: [1 AS (#1)]
│ │ └── DummyTableScan(DummyTableScan { source_table_indexes: [] })
│ └── Exchange(Merge)
│ └── Aggregate(Final)
│ ├── group items: [f1 (#4) AS (#4)]
│ ├── aggregate functions: []
│ └── Aggregate(Partial)
│ ├── group items: [f1 (#4) AS (#4)]
│ ├── aggregate functions: []
│ └── Exchange(Hash)
│ ├── Exchange(Hash): keys: [f1 (#4)]
│ └── ConstantTableScan
│ ├── columns: [f1 (#4)]
│ └── num_rows: [1]
└── EvalScalar
├── scalars: [2 AS (#2), f1 (#5) AS (#5)]
└── Join(Cross)
├── build keys: []
├── probe keys: []
├── other filters: []
├── DummyTableScan(DummyTableScan { source_table_indexes: [] })
└── Exchange(Merge)
└── Aggregate(Final)
├── group items: [f1 (#5) AS (#5)]
├── aggregate functions: []
└── Aggregate(Partial)
├── group items: [f1 (#5) AS (#5)]
├── aggregate functions: []
└── Exchange(Hash)
├── Exchange(Hash): keys: [f1 (#5)]
└── Filter
├── filters: [eq(f1 (#5), 1)]
└── ConstantTableScan
├── columns: [f1 (#5)]
└── num_rows: [1]

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
EvalScalar
├── scalars: [f1 (#0) AS (#0)]
└── Filter
├── filters: [SUBQUERY AS (#3)]
├── subquerys
│ └── Subquery (Exists)
│ ├── output_column: 1 (#3)
│ └── UnionAll
│ ├── output: [1 (#3)]
│ ├── left: [1 (#1)]
│ ├── right: [2 (#2)]
│ ├── cte_scan_names: []
│ ├── logical_recursive_cte_id: None
│ ├── EvalScalar
│ │ ├── scalars: [1 AS (#1)]
│ │ └── DummyTableScan(DummyTableScan { source_table_indexes: [] })
│ └── EvalScalar
│ ├── scalars: [2 AS (#2)]
│ └── Filter
│ ├── filters: [eq(f1 (#0), 1)]
│ └── DummyTableScan(DummyTableScan { source_table_indexes: [] })
└── ConstantTableScan
├── columns: [f1 (#0)]
└── num_rows: [1]

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
EvalScalar
├── scalars: [f1 (#0) AS (#0), marker (#7) AS (#8)]
└── Filter
├── filters: [is_true(marker (#7))]
└── Join(RightMark)
├── build keys: [f1 (#6)]
├── probe keys: [f1 (#0)]
├── other filters: []
├── Aggregate(Final)
│ ├── group items: [1 (#3) AS (#3), f1 (#6) AS (#6)]
│ ├── aggregate functions: []
│ └── Aggregate(Partial)
│ ├── group items: [1 (#3) AS (#3), f1 (#6) AS (#6)]
│ ├── aggregate functions: []
│ └── UnionAll
│ ├── output: [1 (#3), f1 (#6)]
│ ├── left: [1 (#1), f1 (#4)]
│ ├── right: [2 (#2), f1 (#5)]
│ ├── cte_scan_names: []
│ ├── logical_recursive_cte_id: None
│ ├── Join(Cross)
│ │ ├── build keys: []
│ │ ├── probe keys: []
│ │ ├── other filters: []
│ │ ├── EvalScalar
│ │ │ ├── scalars: [1 AS (#1)]
│ │ │ └── DummyTableScan(DummyTableScan { source_table_indexes: [] })
│ │ └── Exchange(Merge)
│ │ └── Aggregate(Final)
│ │ ├── group items: [f1 (#4) AS (#4)]
│ │ ├── aggregate functions: []
│ │ └── Aggregate(Partial)
│ │ ├── group items: [f1 (#4) AS (#4)]
│ │ ├── aggregate functions: []
│ │ └── Exchange(Hash)
│ │ ├── Exchange(Hash): keys: [f1 (#4)]
│ │ └── ConstantTableScan
│ │ ├── columns: [f1 (#4)]
│ │ └── num_rows: [1]
│ └── EvalScalar
│ ├── scalars: [2 AS (#2), f1 (#5) AS (#5)]
│ └── Join(Cross)
│ ├── build keys: []
│ ├── probe keys: []
│ ├── other filters: []
│ ├── DummyTableScan(DummyTableScan { source_table_indexes: [] })
│ └── Exchange(Merge)
│ └── Aggregate(Final)
│ ├── group items: [f1 (#5) AS (#5)]
│ ├── aggregate functions: []
│ └── Aggregate(Partial)
│ ├── group items: [f1 (#5) AS (#5)]
│ ├── aggregate functions: []
│ └── Exchange(Hash)
│ ├── Exchange(Hash): keys: [f1 (#5)]
│ └── Filter
│ ├── filters: [eq(f1 (#5), 1)]
│ └── ConstantTableScan
│ ├── columns: [f1 (#5)]
│ └── num_rows: [1]
└── Exchange(Merge)
└── ConstantTableScan
├── columns: [f1 (#0)]
└── num_rows: [1]

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
EvalScalar
├── scalars: [f1 (#0) AS (#0)]
└── Filter
├── filters: [SUBQUERY AS (#3)]
├── subquerys
│ └── Subquery (Exists)
│ ├── output_column: 1 (#3)
│ └── Aggregate(Initial)
│ ├── group items: [1 (#3) AS (#3)]
│ ├── aggregate functions: []
│ └── UnionAll
│ ├── output: [1 (#3)]
│ ├── left: [1 (#1)]
│ ├── right: [2 (#2)]
│ ├── cte_scan_names: []
│ ├── logical_recursive_cte_id: None
│ ├── EvalScalar
│ │ ├── scalars: [1 AS (#1)]
│ │ └── DummyTableScan(DummyTableScan { source_table_indexes: [] })
│ └── EvalScalar
│ ├── scalars: [2 AS (#2)]
│ └── Filter
│ ├── filters: [eq(f1 (#0), 1)]
│ └── DummyTableScan(DummyTableScan { source_table_indexes: [] })
└── ConstantTableScan
├── columns: [f1 (#0)]
└── num_rows: [1]

12 changes: 12 additions & 0 deletions src/query/sql/tests/it/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ const LITE_REPLAY_CASE_SPECS: &[LiteReplayCaseSpec] = &[
optimizer_skip_list: &[],
default_node_num: 1,
},
LiteReplayCaseSpec {
name: "19574_correlated_exists_union",
warehouse_distribution: false,
optimizer_skip_list: &[],
default_node_num: 1,
},
LiteReplayCaseSpec {
name: "19574_correlated_exists_union_all",
warehouse_distribution: false,
optimizer_skip_list: &[],
default_node_num: 1,
},
];

impl TestCaseRunner for LiteRunner {
Expand Down
Loading