Skip to content

Commit 1d04d22

Browse files
author
Shiv Bhatia
committed
Fix filter pushdown optimisation for sort and table scan
1 parent 897b5c1 commit 1d04d22

File tree

3 files changed

+167
-9
lines changed

3 files changed

+167
-9
lines changed

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,13 @@ impl OptimizerRule for PushDownFilter {
                 insert_below(LogicalPlan::Distinct(distinct), new_filter)
             }
             LogicalPlan::Sort(sort) => {
+                // If the sort has a fetch (limit), pushing a filter below
+                // it would change semantics: the limit should apply before
+                // the filter, not after.
+                if sort.fetch.is_some() {
+                    filter.input = Arc::new(LogicalPlan::Sort(sort));
+                    return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+                }
                 let new_filter =
                     Filter::try_new(filter.predicate, Arc::clone(&sort.input))
                         .map(LogicalPlan::Filter)?;
@@ -1130,6 +1137,13 @@ impl OptimizerRule for PushDownFilter {
             }
             LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
             LogicalPlan::TableScan(scan) => {
+                // If the scan has a fetch (limit), pushing filters into it
+                // would change semantics: the limit should apply before the
+                // filter, not after.
+                if scan.fetch.is_some() {
+                    filter.input = Arc::new(LogicalPlan::TableScan(scan));
+                    return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+                }
                 let filter_predicates = split_conjunction(&filter.predicate);

                 let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =
@@ -4315,4 +4329,63 @@ mod tests {
         "
         )
     }
+
+    #[test]
+    fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
+        let scan = test_table_scan()?;
+        let scan_with_fetch = match scan {
+            LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
+                fetch: Some(10),
+                ..scan
+            }),
+            _ => unreachable!(),
+        };
+        let plan = LogicalPlanBuilder::from(scan_with_fetch)
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter must NOT be pushed into the table scan when it has a fetch (limit)
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Filter: test.a > Int64(10)
+          TableScan: test, fetch=10
+        "
+        )
+    }
+
+    #[test]
+    fn filter_push_down_through_sort_without_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort(vec![col("a").sort(true, true)])?
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter should be pushed below the sort
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Sort: test.a ASC NULLS FIRST
+          TableScan: test, full_filters=[test.a > Int64(10)]
+        "
+        )
+    }
+
+    #[test]
+    fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter must NOT be pushed below the sort when it has a fetch (limit),
+        // because the limit should apply before the filter.
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Filter: test.a > Int64(10)
+          Sort: test.a ASC NULLS FIRST, fetch=5
+            TableScan: test
+        "
+        )
+    }
 }

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,11 +1405,22 @@ impl ExecutionPlan for SortExec {
         config: &datafusion_common::config::ConfigOptions,
     ) -> Result<FilterDescription> {
         if phase != FilterPushdownPhase::Post {
+            if self.fetch.is_some() {
+                return Ok(FilterDescription::all_unsupported(
+                    &parent_filters,
+                    &self.children(),
+                ));
+            }
             return FilterDescription::from_children(parent_filters, &self.children());
         }

-        let mut child =
-            ChildFilterDescription::from_child(&parent_filters, self.input())?;
+        // In Post phase: block parent filters when fetch is set,
+        // but still push the TopK dynamic filter (self-filter).
+        let mut child = if self.fetch.is_some() {
+            ChildFilterDescription::all_unsupported(&parent_filters)
+        } else {
+            ChildFilterDescription::from_child(&parent_filters, self.input())?
+        };

         if let Some(filter) = &self.filter
             && config.optimizer.enable_topk_dynamic_filter_pushdown
@@ -2863,4 +2874,77 @@ mod tests {
         drop(contender);
         Ok(())
     }
+
+    #[test]
+    fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
+        use crate::empty::EmptyExec;
+        use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
+        use datafusion_common::config::ConfigOptions;
+        use datafusion_physical_expr::expressions::Column;
+
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
+        let sort = SortExec::new(
+            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
+            input,
+        )
+        .with_fetch(Some(10));
+
+        let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
+        let config = ConfigOptions::new();
+
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Pre,
+            vec![parent_filter],
+            &config,
+        )?;
+
+        // Parent filter must be unsupported — it must not be pushed below
+        // a sort with fetch (TopK).
+        let parent_filters = desc.parent_filters();
+        assert_eq!(parent_filters.len(), 1);
+        assert_eq!(parent_filters[0].len(), 1);
+        assert!(
+            matches!(parent_filters[0][0].discriminant, PushedDown::No),
+            "Parent filter should be unsupported when sort has fetch"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
+        use crate::empty::EmptyExec;
+        use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
+        use datafusion_common::config::ConfigOptions;
+        use datafusion_physical_expr::expressions::Column;
+
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
+        let sort = SortExec::new(
+            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
+            input,
+        );
+
+        let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
+        let config = ConfigOptions::new();
+
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Pre,
+            vec![parent_filter],
+            &config,
+        )?;
+
+        // Parent filter should be supported — plain sort (no fetch) is
+        // filter-commutative.
+        let parent_filters = desc.parent_filters();
+        assert_eq!(parent_filters.len(), 1);
+        assert_eq!(parent_filters[0].len(), 1);
+        assert!(
+            matches!(parent_filters[0][0].discriminant, PushedDown::Yes),
+            "Parent filter should be supported when sort has no fetch"
+        );
+
+        Ok(())
+    }
 }

datafusion/sqllogictest/test_files/window.slt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
31983198
----
31993199
logical_plan
32003200
01)Sort: rn1 ASC NULLS LAST
3201-
02)--Sort: rn1 ASC NULLS LAST, fetch=5
3202-
03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
3203-
04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
3201+
02)--Filter: rn1 < UInt64(50)
3202+
03)----Sort: rn1 ASC NULLS LAST, fetch=5
3203+
04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
32043204
05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
32053205
06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
32063206
physical_plan
3207-
01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
3208-
02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
3209-
03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3210-
04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
3207+
01)FilterExec: rn1@5 < 50
3208+
02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
3209+
03)----GlobalLimitExec: skip=0, fetch=5
3210+
04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3211+
05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
32113212

32123213
# Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
32133214
# global order. The existing sort is for the second-term lexicographical ordering requirement, which is being

0 commit comments

Comments
 (0)