Skip to content

Commit 3ef82ad

Browse files
author
Shiv Bhatia
committed
Fix filter pushdown optimisation for sort and table scan
1 parent 897b5c1 commit 3ef82ad

File tree

2 files changed

+81
-7
lines changed

2 files changed

+81
-7
lines changed

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,13 @@ impl OptimizerRule for PushDownFilter {
832832
insert_below(LogicalPlan::Distinct(distinct), new_filter)
833833
}
834834
LogicalPlan::Sort(sort) => {
835+
// If the sort has a fetch (limit), pushing a filter below
836+
// it would change semantics: the limit should apply before
837+
// the filter, not after.
838+
if sort.fetch.is_some() {
839+
filter.input = Arc::new(LogicalPlan::Sort(sort));
840+
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
841+
}
835842
let new_filter =
836843
Filter::try_new(filter.predicate, Arc::clone(&sort.input))
837844
.map(LogicalPlan::Filter)?;
@@ -1130,6 +1137,13 @@ impl OptimizerRule for PushDownFilter {
11301137
}
11311138
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
11321139
LogicalPlan::TableScan(scan) => {
1140+
// If the scan has a fetch (limit), pushing filters into it
1141+
// would change semantics: the limit should apply before the
1142+
// filter, not after.
1143+
if scan.fetch.is_some() {
1144+
filter.input = Arc::new(LogicalPlan::TableScan(scan));
1145+
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
1146+
}
11331147
let filter_predicates = split_conjunction(&filter.predicate);
11341148

11351149
let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =
@@ -4315,4 +4329,63 @@ mod tests {
43154329
"
43164330
)
43174331
}
4332+
4333+
#[test]
4334+
fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
4335+
let scan = test_table_scan()?;
4336+
let scan_with_fetch = match scan {
4337+
LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
4338+
fetch: Some(10),
4339+
..scan
4340+
}),
4341+
_ => unreachable!(),
4342+
};
4343+
let plan = LogicalPlanBuilder::from(scan_with_fetch)
4344+
.filter(col("a").gt(lit(10i64)))?
4345+
.build()?;
4346+
// Filter must NOT be pushed into the table scan when it has a fetch (limit)
4347+
assert_optimized_plan_equal!(
4348+
plan,
4349+
@r"
4350+
Filter: test.a > Int64(10)
4351+
TableScan: test, fetch=10
4352+
"
4353+
)
4354+
}
4355+
4356+
#[test]
4357+
fn filter_push_down_through_sort_without_fetch() -> Result<()> {
4358+
let table_scan = test_table_scan()?;
4359+
let plan = LogicalPlanBuilder::from(table_scan)
4360+
.sort(vec![col("a").sort(true, true)])?
4361+
.filter(col("a").gt(lit(10i64)))?
4362+
.build()?;
4363+
// Filter should be pushed below the sort
4364+
assert_optimized_plan_equal!(
4365+
plan,
4366+
@r"
4367+
Sort: test.a ASC NULLS FIRST
4368+
TableScan: test, full_filters=[test.a > Int64(10)]
4369+
"
4370+
)
4371+
}
4372+
4373+
#[test]
4374+
fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
4375+
let table_scan = test_table_scan()?;
4376+
let plan = LogicalPlanBuilder::from(table_scan)
4377+
.sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
4378+
.filter(col("a").gt(lit(10i64)))?
4379+
.build()?;
4380+
// Filter must NOT be pushed below the sort when it has a fetch (limit),
4381+
// because the limit should apply before the filter.
4382+
assert_optimized_plan_equal!(
4383+
plan,
4384+
@r"
4385+
Filter: test.a > Int64(10)
4386+
Sort: test.a ASC NULLS FIRST, fetch=5
4387+
TableScan: test
4388+
"
4389+
)
4390+
}
43184391
}

datafusion/sqllogictest/test_files/window.slt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
31983198
----
31993199
logical_plan
32003200
01)Sort: rn1 ASC NULLS LAST
3201-
02)--Sort: rn1 ASC NULLS LAST, fetch=5
3202-
03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
3203-
04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
3201+
02)--Filter: rn1 < UInt64(50)
3202+
03)----Sort: rn1 ASC NULLS LAST, fetch=5
3203+
04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
32043204
05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
32053205
06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
32063206
physical_plan
3207-
01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
3208-
02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
3209-
03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3210-
04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
3207+
01)FilterExec: rn1@5 < 50
3208+
02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
3209+
03)----GlobalLimitExec: skip=0, fetch=5
3210+
04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3211+
05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
32113212

32123213
# Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
32133214
# global order. The existing sort is for the second-term lexicographical ordering requirement, which is being

0 commit comments

Comments
 (0)