Fix push_down_filter for children with non-empty fetch fields #21057
```diff
@@ -832,6 +832,13 @@ impl OptimizerRule for PushDownFilter {
                 insert_below(LogicalPlan::Distinct(distinct), new_filter)
             }
             LogicalPlan::Sort(sort) => {
+                // If the sort has a fetch (limit), pushing a filter below
+                // it would change semantics: the limit should apply before
+                // the filter, not after.
+                if sort.fetch.is_some() {
+                    filter.input = Arc::new(LogicalPlan::Sort(sort));
+                    return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+                }
                 let new_filter =
                     Filter::try_new(filter.predicate, Arc::clone(&sort.input))
                         .map(LogicalPlan::Filter)?;
```
```diff
@@ -1130,6 +1137,13 @@ impl OptimizerRule for PushDownFilter {
             }
             LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
             LogicalPlan::TableScan(scan) => {
+                // If the scan has a fetch (limit), pushing filters into it
+                // would change semantics: the limit should apply before the
+                // filter, not after.
+                if scan.fetch.is_some() {
+                    filter.input = Arc::new(LogicalPlan::TableScan(scan));
+                    return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+                }
                 let filter_predicates = split_conjunction(&filter.predicate);

                 let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =
```

Contributor: @alamb: Curious about your thoughts on this part of the change? After `PushDownLimit` folds the limit, we get `FILTER -> TABLE_SCAN(fetch=50)`. This PR prevents pushing the filter into `scan.filters` when `fetch` is set. Since there is no ordering specified, the `LIMIT` is essentially non-deterministic in the rows it returns, so the filter can be moved past it or run after it; both are semantically correct, in which case we could remove this part of the change. But if the table scan has an underlying implicit ordering (due to the layout of the data or similar), then pushing the filter around may be incorrect.

Contributor: This is an excellent point. I am not sure they are both semantically correct: I think the limit is (or should be) applied after filters in table providers, and there are some places where that already happens. In the Parquet data source, I know the limit is applied after filters as well. However, that does not appear to be explicitly documented anywhere I could find; I will make a PR to clarify the documentation.

Contributor: Thank you @alamb, I think keeping this check for `TableScan` makes sense.
|
|
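The hazard discussed in the thread above can be shown without any DataFusion machinery. This is a minimal, self-contained Rust sketch (plain `Vec` operations on made-up data, not the optimizer itself) of how limit-then-filter and filter-then-limit diverge:

```rust
fn main() {
    let rows = vec![100, 200, 300, 400, 500];

    // Plan as written: ORDER BY value LIMIT 3, then WHERE value > 200.
    let mut sorted = rows.clone();
    sorted.sort();
    let limit_then_filter: Vec<i32> =
        sorted.iter().take(3).copied().filter(|v| *v > 200).collect();

    // Incorrectly pushed-down plan: filter first, then apply the limit.
    let mut filtered: Vec<i32> = rows.iter().copied().filter(|v| *v > 200).collect();
    filtered.sort();
    let filter_then_limit: Vec<i32> = filtered.into_iter().take(3).collect();

    // The two orders of operations give different answers, so the
    // rewrite is not semantics-preserving when a fetch is present.
    assert_eq!(limit_then_filter, vec![300]);
    assert_eq!(filter_then_limit, vec![300, 400, 500]);
    println!("limit-then-filter: {limit_then_filter:?}");
    println!("filter-then-limit: {filter_then_limit:?}");
}
```

This mirrors the `.slt` cases added later in this PR: the first three sorted values are 100, 200, 300, so filtering `> 200` after the limit leaves one row, while filtering before the limit leaves three.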
```diff
@@ -4315,4 +4329,63 @@ mod tests {
             "
         )
     }
+
+    #[test]
+    fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
+        let scan = test_table_scan()?;
+        let scan_with_fetch = match scan {
+            LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
+                fetch: Some(10),
+                ..scan
+            }),
+            _ => unreachable!(),
+        };
+        let plan = LogicalPlanBuilder::from(scan_with_fetch)
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter must NOT be pushed into the table scan when it has a fetch (limit)
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+            Filter: test.a > Int64(10)
+              TableScan: test, fetch=10
+            "
+        )
+    }
+
+    #[test]
+    fn filter_push_down_through_sort_without_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort(vec![col("a").sort(true, true)])?
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter should be pushed below the sort
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+            Sort: test.a ASC NULLS FIRST
+              TableScan: test, full_filters=[test.a > Int64(10)]
+            "
+        )
+    }
+
+    #[test]
+    fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter must NOT be pushed below the sort when it has a fetch (limit),
+        // because the limit should apply before the filter.
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+            Filter: test.a > Int64(10)
+              Sort: test.a ASC NULLS FIRST, fetch=5
+                TableScan: test
+            "
+        )
+    }
 }
```
```diff
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for filter pushdown behavior with Sort + LIMIT (fetch).
+
+statement ok
+CREATE TABLE t(id INT, value INT) AS VALUES
+  (1, 100),
+  (2, 200),
+  (3, 300),
+  (4, 400),
+  (5, 500);
+
+# Take the 3 smallest values (100, 200, 300), then filter value > 200.
+query II
+SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
+----
+3 300
+
+# Take the 3 largest values (500, 400, 300), then filter value < 400.
+query II
+SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
+----
+3 300
+
+# The filter stays above the sort+fetch in the plan.
+query TT
+EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
+----
+logical_plan
+01)SubqueryAlias: sub
+02)--Filter: t.value > Int32(200)
+03)----Sort: t.value ASC NULLS LAST, fetch=3
+04)------TableScan: t projection=[id, value]
+physical_plan
+01)FilterExec: value@1 > 200
+02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+DROP TABLE t;
```

Contributor: I recommend adding this to an existing test rather than an entirely new `.slt` test, to make it easier to discover in the future.

Contributor (author): This fails without our fixes.
```diff
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
 ----
 logical_plan
 01)Sort: rn1 ASC NULLS LAST
-02)--Sort: rn1 ASC NULLS LAST, fetch=5
-03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
+02)--Filter: rn1 < UInt64(50)
+03)----Sort: rn1 ASC NULLS LAST, fetch=5
+04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
 05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
 06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
 physical_plan
-01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
-02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
-03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
-04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
+01)FilterExec: rn1@5 < 50
+02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
+03)----GlobalLimitExec: skip=0, fetch=5
+04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]

 # Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
 # global order. The existing sort is for the second-term lexicographical ordering requirement, which is being
```

Contributor: It definitely had the filter too low 👍

Contributor: `ExecutionPlan` has `fetch` on it, so I wonder if we need to do this more generically?

Contributor: That's physical nodes; this is the logical plan optimizer.

Contributor: For logical nodes, as far as I can see only `Sort`, `Limit`, and `TableScan` have `fetch`.

Contributor: Ah sorry, yes, that sounds right.

Contributor: I was just trying to think about how we can prevent this kind of bug happening in the future.

Contributor: Do we need to modify `gather_filters_for_pushdown` in `SortExec`?

Contributor: Yes, we will need to check the physical plan too.

Contributor: I think it would be reasonable to add a `fn fetch(&self) -> Option<usize>` to `LogicalPlan`. We could make the match exhaustive with no fall-through, so that anyone adding a new variant has to update it. At least then it will be in one place and also closer to the change, which makes it easier for both humans and machines to pick it up.

Contributor: I think that would be nice!

Contributor: That makes sense @adriangb, I've tried adding that: 0e83891
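The suggested accessor might look something like the sketch below. Note this uses a simplified stand-in enum, not DataFusion's real `LogicalPlan` (which has many more variants); it only illustrates the exhaustive-match idea, where adding a new variant forces a compile error until the author decides whether it carries a fetch:

```rust
// Simplified stand-in for the real LogicalPlan enum (illustration only).
enum LogicalPlan {
    Sort { fetch: Option<usize> },
    Limit { fetch: Option<usize> },
    TableScan { fetch: Option<usize> },
    Filter,
    Projection,
}

impl LogicalPlan {
    /// Returns the fetch (limit) carried by this node, if any.
    /// The match is deliberately exhaustive with no `_` fall-through:
    /// adding a new variant will not compile until it is handled here.
    fn fetch(&self) -> Option<usize> {
        match self {
            LogicalPlan::Sort { fetch }
            | LogicalPlan::Limit { fetch }
            | LogicalPlan::TableScan { fetch } => *fetch,
            LogicalPlan::Filter | LogicalPlan::Projection => None,
        }
    }
}

fn main() {
    let sort = LogicalPlan::Sort { fetch: Some(5) };
    assert_eq!(sort.fetch(), Some(5));
    assert_eq!(LogicalPlan::Filter.fetch(), None);
    println!("sort fetch: {:?}", sort.fetch());
}
```

A rule like `PushDownFilter` could then guard on `plan.fetch().is_some()` in one place instead of special-casing each variant.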