Skip to content

Commit 475de9f

Browse files
author
Shiv Bhatia
committed
Fix filter pushdown optimisation
1 parent 897b5c1 commit 475de9f

File tree

4 files changed

+263
-15
lines changed

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,13 @@ impl OptimizerRule for PushDownFilter {
832832
insert_below(LogicalPlan::Distinct(distinct), new_filter)
833833
}
834834
LogicalPlan::Sort(sort) => {
835+
// If the sort has a fetch (limit), pushing a filter below
836+
// it would change semantics: the limit should apply before
837+
// the filter, not after.
838+
if sort.fetch.is_some() {
839+
filter.input = Arc::new(LogicalPlan::Sort(sort));
840+
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
841+
}
835842
let new_filter =
836843
Filter::try_new(filter.predicate, Arc::clone(&sort.input))
837844
.map(LogicalPlan::Filter)?;
@@ -1130,6 +1137,13 @@ impl OptimizerRule for PushDownFilter {
11301137
}
11311138
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
11321139
LogicalPlan::TableScan(scan) => {
1140+
// If the scan has a fetch (limit), pushing filters into it
1141+
// would change semantics: the limit should apply before the
1142+
// filter, not after.
1143+
if scan.fetch.is_some() {
1144+
filter.input = Arc::new(LogicalPlan::TableScan(scan));
1145+
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
1146+
}
11331147
let filter_predicates = split_conjunction(&filter.predicate);
11341148

11351149
let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =
@@ -4315,4 +4329,63 @@ mod tests {
43154329
"
43164330
)
43174331
}
4332+
4333+
#[test]
4334+
fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
4335+
let scan = test_table_scan()?;
4336+
let scan_with_fetch = match scan {
4337+
LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
4338+
fetch: Some(10),
4339+
..scan
4340+
}),
4341+
_ => unreachable!(),
4342+
};
4343+
let plan = LogicalPlanBuilder::from(scan_with_fetch)
4344+
.filter(col("a").gt(lit(10i64)))?
4345+
.build()?;
4346+
// Filter must NOT be pushed into the table scan when it has a fetch (limit)
4347+
assert_optimized_plan_equal!(
4348+
plan,
4349+
@r"
4350+
Filter: test.a > Int64(10)
4351+
TableScan: test, fetch=10
4352+
"
4353+
)
4354+
}
4355+
4356+
#[test]
4357+
fn filter_push_down_through_sort_without_fetch() -> Result<()> {
4358+
let table_scan = test_table_scan()?;
4359+
let plan = LogicalPlanBuilder::from(table_scan)
4360+
.sort(vec![col("a").sort(true, true)])?
4361+
.filter(col("a").gt(lit(10i64)))?
4362+
.build()?;
4363+
// Filter should be pushed below the sort
4364+
assert_optimized_plan_equal!(
4365+
plan,
4366+
@r"
4367+
Sort: test.a ASC NULLS FIRST
4368+
TableScan: test, full_filters=[test.a > Int64(10)]
4369+
"
4370+
)
4371+
}
4372+
4373+
#[test]
4374+
fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
4375+
let table_scan = test_table_scan()?;
4376+
let plan = LogicalPlanBuilder::from(table_scan)
4377+
.sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
4378+
.filter(col("a").gt(lit(10i64)))?
4379+
.build()?;
4380+
// Filter must NOT be pushed below the sort when it has a fetch (limit),
4381+
// because the limit should apply before the filter.
4382+
assert_optimized_plan_equal!(
4383+
plan,
4384+
@r"
4385+
Filter: test.a > Int64(10)
4386+
Sort: test.a ASC NULLS FIRST, fetch=5
4387+
TableScan: test
4388+
"
4389+
)
4390+
}
43184391
}

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 127 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,11 +1405,22 @@ impl ExecutionPlan for SortExec {
14051405
config: &datafusion_common::config::ConfigOptions,
14061406
) -> Result<FilterDescription> {
14071407
if phase != FilterPushdownPhase::Post {
1408+
if self.fetch.is_some() {
1409+
return Ok(FilterDescription::all_unsupported(
1410+
&parent_filters,
1411+
&self.children(),
1412+
));
1413+
}
14081414
return FilterDescription::from_children(parent_filters, &self.children());
14091415
}
14101416

1411-
let mut child =
1412-
ChildFilterDescription::from_child(&parent_filters, self.input())?;
1417+
// In Post phase: block parent filters when fetch is set,
1418+
// but still push the TopK dynamic filter (self-filter).
1419+
let mut child = if self.fetch.is_some() {
1420+
ChildFilterDescription::all_unsupported(&parent_filters)
1421+
} else {
1422+
ChildFilterDescription::from_child(&parent_filters, self.input())?
1423+
};
14131424

14141425
if let Some(filter) = &self.filter
14151426
&& config.optimizer.enable_topk_dynamic_filter_pushdown
@@ -1430,8 +1441,10 @@ mod tests {
14301441
use super::*;
14311442
use crate::coalesce_partitions::CoalescePartitionsExec;
14321443
use crate::collect;
1444+
use crate::empty::EmptyExec;
14331445
use crate::execution_plan::Boundedness;
14341446
use crate::expressions::col;
1447+
use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
14351448
use crate::test;
14361449
use crate::test::TestMemoryExec;
14371450
use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
@@ -1441,15 +1454,19 @@ mod tests {
14411454
use arrow::compute::SortOptions;
14421455
use arrow::datatypes::*;
14431456
use datafusion_common::cast::as_primitive_array;
1457+
use datafusion_common::config::ConfigOptions;
14441458
use datafusion_common::test_util::batches_to_string;
14451459
use datafusion_common::{DataFusionError, Result, ScalarValue};
14461460
use datafusion_execution::RecordBatchStream;
14471461
use datafusion_execution::config::SessionConfig;
1462+
use datafusion_execution::memory_pool::{
1463+
GreedyMemoryPool, MemoryConsumer, MemoryPool,
1464+
};
14481465
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
14491466
use datafusion_physical_expr::EquivalenceProperties;
14501467
use datafusion_physical_expr::expressions::{Column, Literal};
14511468

1452-
use futures::{FutureExt, Stream};
1469+
use futures::{FutureExt, Stream, TryStreamExt};
14531470
use insta::assert_snapshot;
14541471

14551472
#[derive(Debug, Clone)]
@@ -2748,11 +2765,6 @@ mod tests {
27482765
/// those bytes become unaccounted-for reserved memory that nobody uses.
27492766
#[tokio::test]
27502767
async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> {
2751-
use datafusion_execution::memory_pool::{
2752-
GreedyMemoryPool, MemoryConsumer, MemoryPool,
2753-
};
2754-
use futures::TryStreamExt;
2755-
27562768
let sort_spill_reservation_bytes: usize = 10 * 1024; // 10 KB
27572769

27582770
// Pool: merge reservation (10KB) + enough room for sort to work.
@@ -2863,4 +2875,111 @@ mod tests {
28632875
drop(contender);
28642876
Ok(())
28652877
}
2878+
2879+
#[test]
2880+
fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
2881+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2882+
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2883+
let sort = SortExec::new(
2884+
[PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
2885+
input,
2886+
)
2887+
.with_fetch(Some(10));
2888+
2889+
let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
2890+
let config = ConfigOptions::new();
2891+
2892+
let desc = sort.gather_filters_for_pushdown(
2893+
FilterPushdownPhase::Pre,
2894+
vec![parent_filter],
2895+
&config,
2896+
)?;
2897+
2898+
// Parent filter must be unsupported — it must not be pushed below
2899+
// a sort with fetch (TopK).
2900+
let parent_filters = desc.parent_filters();
2901+
assert_eq!(parent_filters.len(), 1);
2902+
assert_eq!(parent_filters[0].len(), 1);
2903+
assert!(
2904+
matches!(parent_filters[0][0].discriminant, PushedDown::No),
2905+
"Parent filter should be unsupported when sort has fetch"
2906+
);
2907+
2908+
Ok(())
2909+
}
2910+
2911+
#[test]
2912+
fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
2913+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2914+
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2915+
let sort = SortExec::new(
2916+
[PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
2917+
input,
2918+
);
2919+
2920+
let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
2921+
let config = ConfigOptions::new();
2922+
2923+
let desc = sort.gather_filters_for_pushdown(
2924+
FilterPushdownPhase::Pre,
2925+
vec![parent_filter],
2926+
&config,
2927+
)?;
2928+
2929+
// Parent filter should be supported — plain sort (no fetch) is
2930+
// filter-commutative.
2931+
let parent_filters = desc.parent_filters();
2932+
assert_eq!(parent_filters.len(), 1);
2933+
assert_eq!(parent_filters[0].len(), 1);
2934+
assert!(
2935+
matches!(parent_filters[0][0].discriminant, PushedDown::Yes),
2936+
"Parent filter should be supported when sort has no fetch"
2937+
);
2938+
2939+
Ok(())
2940+
}
2941+
2942+
#[test]
2943+
fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
2944+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
2945+
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
2946+
let sort = SortExec::new(
2947+
[PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
2948+
input,
2949+
)
2950+
.with_fetch(Some(10));
2951+
2952+
// with_fetch(Some(_)) creates the TopK dynamic filter automatically.
2953+
assert!(sort.filter.is_some(), "TopK filter should be created");
2954+
2955+
let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
2956+
let mut config = ConfigOptions::new();
2957+
config.optimizer.enable_topk_dynamic_filter_pushdown = true;
2958+
2959+
let desc = sort.gather_filters_for_pushdown(
2960+
FilterPushdownPhase::Post,
2961+
vec![parent_filter],
2962+
&config,
2963+
)?;
2964+
2965+
// Parent filters should be blocked in Post phase when fetch is set.
2966+
let parent_filters = desc.parent_filters();
2967+
assert_eq!(parent_filters.len(), 1);
2968+
assert_eq!(parent_filters[0].len(), 1);
2969+
assert!(
2970+
matches!(parent_filters[0][0].discriminant, PushedDown::No),
2971+
"Parent filter should be unsupported in Post phase when sort has fetch"
2972+
);
2973+
2974+
// The TopK self-filter should still be allowed through.
2975+
let self_filters = desc.self_filters();
2976+
assert_eq!(self_filters.len(), 1);
2977+
assert_eq!(
2978+
self_filters[0].len(),
2979+
1,
2980+
"TopK dynamic self-filter should be pushed down"
2981+
);
2982+
2983+
Ok(())
2984+
}
28662985
}
(file name missing from this capture — presumably a new sqllogictest `.slt` file under datafusion/sqllogictest/test_files/; verify against the original commit)

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Tests for filter pushdown behavior with Sort + LIMIT (fetch).
19+
20+
statement ok
21+
CREATE TABLE t(id INT, value INT) AS VALUES
22+
(1, 100),
23+
(2, 200),
24+
(3, 300),
25+
(4, 400),
26+
(5, 500);
27+
28+
# Take the 3 smallest values (100, 200, 300), then filter value > 200.
29+
query II
30+
SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
31+
----
32+
3 300
33+
34+
# Take the 3 largest values (500, 400, 300), then filter value < 400.
35+
query II
36+
SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
37+
----
38+
3 300
39+
40+
# The filter stays above the sort+fetch in the plan.
41+
query TT
42+
EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
43+
----
44+
logical_plan
45+
01)SubqueryAlias: sub
46+
02)--Filter: t.value > Int32(200)
47+
03)----Sort: t.value ASC NULLS LAST, fetch=3
48+
04)------TableScan: t projection=[id, value]
49+
physical_plan
50+
01)FilterExec: value@1 > 200
51+
02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false]
52+
03)----DataSourceExec: partitions=1, partition_sizes=[1]
53+
54+
statement ok
55+
DROP TABLE t;

datafusion/sqllogictest/test_files/window.slt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
31983198
----
31993199
logical_plan
32003200
01)Sort: rn1 ASC NULLS LAST
3201-
02)--Sort: rn1 ASC NULLS LAST, fetch=5
3202-
03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
3203-
04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
3201+
02)--Filter: rn1 < UInt64(50)
3202+
03)----Sort: rn1 ASC NULLS LAST, fetch=5
3203+
04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
32043204
05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
32053205
06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
32063206
physical_plan
3207-
01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
3208-
02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
3209-
03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3210-
04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
3207+
01)FilterExec: rn1@5 < 50
3208+
02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
3209+
03)----GlobalLimitExec: skip=0, fetch=5
3210+
04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
3211+
05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
32113212

32123213
# Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
32133214
# global order. The existing sort is for the second-term lexicographical ordering requirement, which is being

0 commit comments

Comments
 (0)