Skip to content

Commit 3c09d58

Browse files
author
Shiv Bhatia
committed
Fix filter pushdown optimisation for sort and table scan
1 parent 897b5c1 commit 3c09d58

File tree

2 files changed

+87
-7
lines changed

2 files changed

+87
-7
lines changed

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,16 @@ impl OptimizerRule for PushDownFilter {
832832
insert_below(LogicalPlan::Distinct(distinct), new_filter)
833833
}
834834
LogicalPlan::Sort(sort) => {
835+
// If the sort has a fetch (limit), pushing a filter below
836+
// it would change semantics: the limit should apply before
837+
// the filter, not after.
838+
if sort.fetch.is_some() {
839+
let plan = LogicalPlan::Filter(Filter::try_new(
840+
filter.predicate,
841+
Arc::new(LogicalPlan::Sort(sort)),
842+
)?);
843+
return Ok(Transformed::no(plan));
844+
}
835845
let new_filter =
836846
Filter::try_new(filter.predicate, Arc::clone(&sort.input))
837847
.map(LogicalPlan::Filter)?;
@@ -1130,6 +1140,16 @@ impl OptimizerRule for PushDownFilter {
11301140
}
11311141
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
11321142
LogicalPlan::TableScan(scan) => {
1143+
// If the scan has a fetch (limit), pushing filters into it
1144+
// would change semantics: the limit should apply before the
1145+
// filter, not after.
1146+
if scan.fetch.is_some() {
1147+
let plan = LogicalPlan::Filter(Filter::try_new(
1148+
filter.predicate,
1149+
Arc::new(LogicalPlan::TableScan(scan)),
1150+
)?);
1151+
return Ok(Transformed::no(plan));
1152+
}
11331153
let filter_predicates = split_conjunction(&filter.predicate);
11341154

11351155
let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =
@@ -4315,4 +4335,63 @@ mod tests {
43154335
"
43164336
)
43174337
}
4338+
4339+
#[test]
4340+
fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
4341+
let scan = test_table_scan()?;
4342+
let scan_with_fetch = match scan {
4343+
LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
4344+
fetch: Some(10),
4345+
..scan
4346+
}),
4347+
_ => unreachable!(),
4348+
};
4349+
let plan = LogicalPlanBuilder::from(scan_with_fetch)
4350+
.filter(col("a").gt(lit(10i64)))?
4351+
.build()?;
4352+
// Filter must NOT be pushed into the table scan when it has a fetch (limit)
4353+
assert_optimized_plan_equal!(
4354+
plan,
4355+
@r"
4356+
Filter: test.a > Int64(10)
4357+
TableScan: test, fetch=10
4358+
"
4359+
)
4360+
}
4361+
4362+
#[test]
4363+
fn filter_push_down_through_sort_without_fetch() -> Result<()> {
4364+
let table_scan = test_table_scan()?;
4365+
let plan = LogicalPlanBuilder::from(table_scan)
4366+
.sort(vec![col("a").sort(true, true)])?
4367+
.filter(col("a").gt(lit(10i64)))?
4368+
.build()?;
4369+
// Filter should be pushed below the sort
4370+
assert_optimized_plan_equal!(
4371+
plan,
4372+
@r"
4373+
Sort: test.a ASC NULLS FIRST
4374+
TableScan: test, full_filters=[test.a > Int64(10)]
4375+
"
4376+
)
4377+
}
4378+
4379+
#[test]
4380+
fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
4381+
let table_scan = test_table_scan()?;
4382+
let plan = LogicalPlanBuilder::from(table_scan)
4383+
.sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
4384+
.filter(col("a").gt(lit(10i64)))?
4385+
.build()?;
4386+
// Filter must NOT be pushed below the sort when it has a fetch (limit),
4387+
// because the limit should apply before the filter.
4388+
assert_optimized_plan_equal!(
4389+
plan,
4390+
@r"
4391+
Filter: test.a > Int64(10)
4392+
Sort: test.a ASC NULLS FIRST, fetch=5
4393+
TableScan: test
4394+
"
4395+
)
4396+
}
43184397
}

datafusion/sqllogictest/test_files/window.slt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
31983198
----
31993199
logical_plan
32003200
01)Sort: rn1 ASC NULLS LAST
3201-
02)--Sort: rn1 ASC NULLS LAST, fetch=5
3202-
03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
3203-
04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
3201+
02)--Filter: rn1 < UInt64(50)
3202+
03)----Sort: rn1 ASC NULLS LAST, fetch=5
3203+
04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
32043204
05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
32053205
06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
32063206
physical_plan
3207-
01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
3208-
02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
3209-
03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3210-
04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
3207+
01)FilterExec: rn1@5 < 50
3208+
02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
3209+
03)----GlobalLimitExec: skip=0, fetch=5
3210+
04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3211+
05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
32113212

32123213
# Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
32133214
# global order. The existing sort is for the second-term lexicographical ordering requirement, which is being

0 commit comments

Comments
 (0)