Merged
73 changes: 73 additions & 0 deletions datafusion/optimizer/src/push_down_filter.rs
@@ -832,6 +832,13 @@ impl OptimizerRule for PushDownFilter {
insert_below(LogicalPlan::Distinct(distinct), new_filter)
}
LogicalPlan::Sort(sort) => {
// If the sort has a fetch (limit), pushing a filter below
// it would change semantics: the limit should apply before
// the filter, not after.
if sort.fetch.is_some() {
Contributor:

ExecutionPlan has fetch on it so I wonder if we need to do this more generically?

Contributor Author:

That's physical nodes, this is the logical plan optimizer

Contributor Author:

For logical nodes, as far as I can see only Sort, Limit, and TableScan have fetch

Contributor:

Ah sorry yes, that sounds right

Contributor:

Was just trying to think about how we can prevent this kind of bug happening in the future.

Contributor:

Do we need to modify gather_filters_for_pushdown in SortExec?

Contributor:

Yes, will need to check the physical plan too:

if self.fetch.is_some() {
    return Ok(FilterDescription::all_unsupported(
        &parent_filters,
        &self.children(),
    ));
}

Contributor:

I think it would be reasonable to add a fn fetch(&self) -> Option<usize> to LogicalPlan. We could make the match exhaustive / have no fall-through so that anyone adding a new variant has to update it. At least then it will be in one place and also closer to the change -> better for human and machine to pick it up.

Contributor:

I think that would be nice!

Contributor Author:

That makes sense @adriangb, I've tried adding that: 0e83891
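A standalone sketch of the accessor discussed above, using a toy enum in place of DataFusion's actual LogicalPlan (the variant set and names here are illustrative only):

```rust
// Toy stand-in for LogicalPlan to illustrate a `fetch()` accessor with an
// exhaustive match (no `_ =>` fall-through): adding a new variant to the
// enum forces the compiler to flag this match for an update.
#[derive(Debug)]
enum Plan {
    Sort { fetch: Option<usize> },
    Limit { fetch: Option<usize> },
    TableScan { fetch: Option<usize> },
    Filter,
    Projection,
}

impl Plan {
    /// Returns the fetch (row limit) carried by this node, if any.
    fn fetch(&self) -> Option<usize> {
        match self {
            Plan::Sort { fetch }
            | Plan::Limit { fetch }
            | Plan::TableScan { fetch } => *fetch,
            // Every remaining variant is listed explicitly, so a new
            // variant cannot silently fall through to `None`.
            Plan::Filter | Plan::Projection => None,
        }
    }
}

fn main() {
    assert_eq!(Plan::Sort { fetch: Some(5) }.fetch(), Some(5));
    assert_eq!(Plan::TableScan { fetch: None }.fetch(), None);
    assert_eq!(Plan::Filter.fetch(), None);
}
```

An optimizer rule could then gate pushdown on `plan.fetch().is_some()` in one place instead of per-variant checks.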

shivbhatia10 marked this conversation as resolved. (Outdated)
filter.input = Arc::new(LogicalPlan::Sort(sort));
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}
let new_filter =
Filter::try_new(filter.predicate, Arc::clone(&sort.input))
.map(LogicalPlan::Filter)?;
@@ -1130,6 +1137,13 @@ impl OptimizerRule for PushDownFilter {
}
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
LogicalPlan::TableScan(scan) => {
// If the scan has a fetch (limit), pushing filters into it
// would change semantics: the limit should apply before the
// filter, not after.
if scan.fetch.is_some() {
filter.input = Arc::new(LogicalPlan::TableScan(scan));
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}
Contributor (@hareshkh, Mar 20, 2026):

@alamb: Curious about your thoughts for this part of the change?
For a plan like:

FILTER col = val
|---- LIMIT 50
      |---- TABLE_SCAN

After PushDownLimit folds the limit, we get FILTER -> TABLE_SCAN(fetch=50). This PR prevents pushing the filter into scan.filters when fetch is set.

Since there is no ordering specified, the LIMIT is essentially non-deterministic in the rows it returns. So the filter can be moved past it or run after it — both are semantically correct - in which case we can remove this part of the change. But if the table scan has an underlying implicit ordering (due to the layout of data or such), then pushing the filter around may be incorrect.

Contributor:

> After PushDownLimit folds the limit, we get FILTER -> TABLE_SCAN(fetch=50). This PR prevents pushing the filter into scan.filters when fetch is set.

This is an excellent point

> So the filter can be moved past it or run after it — both are semantically correct

I am not sure they are both semantically correct. I think limit is (should be) applied after Filters in table providers and there are some places where that already happens. For example

    // We should not limit the number of partitioned files to scan if there are filters and limit
    // at the same time. This is because the limit should be applied after the filters are applied.
    let statistic_file_limit = if filters.is_empty() { limit } else { None };

Also in the parquet data source I know the limit is applied after filters as well

However, that does not appear to be explicitly documented anywhere I could find -- I will make a PR to clarify the documentation
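The divergence under discussion can be reproduced in isolation. This sketch uses plain slices as a stand-in for the plan; the helper names are illustrative, not DataFusion APIs:

```rust
// Models FILTER above LIMIT vs. the filter pushed below the limit,
// over the sorted values from the example plan.
fn limit_then_filter(values: &[i32], k: usize, min: i32) -> Vec<i32> {
    // LIMIT k first (taking the k smallest of a sorted input), then filter.
    values.iter().take(k).copied().filter(|v| *v > min).collect()
}

fn filter_then_limit(values: &[i32], k: usize, min: i32) -> Vec<i32> {
    // Filter pushed below the limit: filter first, then LIMIT k.
    values.iter().copied().filter(|v| *v > min).take(k).collect()
}

fn main() {
    let values = [100, 200, 300, 400, 500];
    // ORDER BY value LIMIT 3 keeps {100, 200, 300}; WHERE value > 200 leaves {300}.
    assert_eq!(limit_then_filter(&values, 3, 200), vec![300]);
    // With the filter pushed below the limit we get {300, 400, 500} instead.
    assert_eq!(filter_then_limit(&values, 3, 200), vec![300, 400, 500]);
}
```

Since the two orderings produce different result sets, the rewrite is not semantics-preserving once a fetch is attached.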

Contributor:

Thank you @alamb - I think keeping this check for TableScan makes sense.

let filter_predicates = split_conjunction(&filter.predicate);

let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =
@@ -4315,4 +4329,63 @@ mod tests {
"
)
}

#[test]
fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
let scan = test_table_scan()?;
let scan_with_fetch = match scan {
LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
fetch: Some(10),
..scan
}),
_ => unreachable!(),
};
let plan = LogicalPlanBuilder::from(scan_with_fetch)
.filter(col("a").gt(lit(10i64)))?
.build()?;
// Filter must NOT be pushed into the table scan when it has a fetch (limit)
assert_optimized_plan_equal!(
plan,
@r"
Filter: test.a > Int64(10)
TableScan: test, fetch=10
"
)
}

#[test]
fn filter_push_down_through_sort_without_fetch() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.sort(vec![col("a").sort(true, true)])?
.filter(col("a").gt(lit(10i64)))?
.build()?;
// Filter should be pushed below the sort
assert_optimized_plan_equal!(
plan,
@r"
Sort: test.a ASC NULLS FIRST
TableScan: test, full_filters=[test.a > Int64(10)]
"
)
}

#[test]
fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
.filter(col("a").gt(lit(10i64)))?
.build()?;
// Filter must NOT be pushed below the sort when it has a fetch (limit),
// because the limit should apply before the filter.
assert_optimized_plan_equal!(
plan,
@r"
Filter: test.a > Int64(10)
Sort: test.a ASC NULLS FIRST, fetch=5
TableScan: test
"
)
}
}
135 changes: 127 additions & 8 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -1405,11 +1405,22 @@ impl ExecutionPlan for SortExec {
config: &datafusion_common::config::ConfigOptions,
) -> Result<FilterDescription> {
if phase != FilterPushdownPhase::Post {
if self.fetch.is_some() {
return Ok(FilterDescription::all_unsupported(
&parent_filters,
&self.children(),
));
}
return FilterDescription::from_children(parent_filters, &self.children());
}

let mut child =
ChildFilterDescription::from_child(&parent_filters, self.input())?;
// In Post phase: block parent filters when fetch is set,
// but still push the TopK dynamic filter (self-filter).
let mut child = if self.fetch.is_some() {
ChildFilterDescription::all_unsupported(&parent_filters)
} else {
ChildFilterDescription::from_child(&parent_filters, self.input())?
};

if let Some(filter) = &self.filter
&& config.optimizer.enable_topk_dynamic_filter_pushdown
@@ -1430,8 +1441,10 @@ mod tests {
use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::collect;
use crate::empty::EmptyExec;
use crate::execution_plan::Boundedness;
use crate::expressions::col;
use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
use crate::test;
use crate::test::TestMemoryExec;
use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
@@ -1441,15 +1454,19 @@
use arrow::compute::SortOptions;
use arrow::datatypes::*;
use datafusion_common::cast::as_primitive_array;
use datafusion_common::config::ConfigOptions;
use datafusion_common::test_util::batches_to_string;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_execution::RecordBatchStream;
use datafusion_execution::config::SessionConfig;
use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryConsumer, MemoryPool,
};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::expressions::{Column, Literal};

use futures::{FutureExt, Stream};
use futures::{FutureExt, Stream, TryStreamExt};
use insta::assert_snapshot;

#[derive(Debug, Clone)]
@@ -2748,11 +2765,6 @@
/// those bytes become unaccounted-for reserved memory that nobody uses.
#[tokio::test]
async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> {
use datafusion_execution::memory_pool::{
GreedyMemoryPool, MemoryConsumer, MemoryPool,
};
use futures::TryStreamExt;

let sort_spill_reservation_bytes: usize = 10 * 1024; // 10 KB

// Pool: merge reservation (10KB) + enough room for sort to work.
@@ -2863,4 +2875,111 @@
drop(contender);
Ok(())
}

#[test]
fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
alamb marked this conversation as resolved.
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
let sort = SortExec::new(
[PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
input,
)
.with_fetch(Some(10));

let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
let config = ConfigOptions::new();

let desc = sort.gather_filters_for_pushdown(
FilterPushdownPhase::Pre,
vec![parent_filter],
&config,
)?;

// Parent filter must be unsupported — it must not be pushed below
// a sort with fetch (TopK).
let parent_filters = desc.parent_filters();
assert_eq!(parent_filters.len(), 1);
assert_eq!(parent_filters[0].len(), 1);
assert!(
matches!(parent_filters[0][0].discriminant, PushedDown::No),
"Parent filter should be unsupported when sort has fetch"
);

Ok(())
}

#[test]
fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
let sort = SortExec::new(
[PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
input,
);

let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
let config = ConfigOptions::new();

let desc = sort.gather_filters_for_pushdown(
FilterPushdownPhase::Pre,
vec![parent_filter],
&config,
)?;

// Parent filter should be supported — plain sort (no fetch) is
// filter-commutative.
let parent_filters = desc.parent_filters();
assert_eq!(parent_filters.len(), 1);
assert_eq!(parent_filters[0].len(), 1);
assert!(
matches!(parent_filters[0][0].discriminant, PushedDown::Yes),
"Parent filter should be supported when sort has no fetch"
);

Ok(())
}

#[test]
fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));
let sort = SortExec::new(
[PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
input,
)
.with_fetch(Some(10));

// with_fetch(Some(_)) creates the TopK dynamic filter automatically.
assert!(sort.filter.is_some(), "TopK filter should be created");

let parent_filter: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
let mut config = ConfigOptions::new();
config.optimizer.enable_topk_dynamic_filter_pushdown = true;

let desc = sort.gather_filters_for_pushdown(
FilterPushdownPhase::Post,
vec![parent_filter],
&config,
)?;

// Parent filters should be blocked in Post phase when fetch is set.
let parent_filters = desc.parent_filters();
assert_eq!(parent_filters.len(), 1);
assert_eq!(parent_filters[0].len(), 1);
assert!(
matches!(parent_filters[0][0].discriminant, PushedDown::No),
"Parent filter should be unsupported in Post phase when sort has fetch"
);

// The TopK self-filter should still be allowed through.
let self_filters = desc.self_filters();
assert_eq!(self_filters.len(), 1);
assert_eq!(
self_filters[0].len(),
1,
"TopK dynamic self-filter should be pushed down"
);

Ok(())
}
}
55 changes: 55 additions & 0 deletions datafusion/sqllogictest/test_files/push_down_filter_sort_fetch.slt
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Tests for filter pushdown behavior with Sort + LIMIT (fetch).
Contributor:

I recommend adding this to an existing test rather than an entirely new .slt test to make it easier to discover in the future

Perhaps datafusion/sqllogictest/test_files/limit.slt


statement ok
CREATE TABLE t(id INT, value INT) AS VALUES
(1, 100),
(2, 200),
(3, 300),
(4, 400),
(5, 500);

# Take the 3 smallest values (100, 200, 300), then filter value > 200.
query II
SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
----
3 300

# Take the 3 largest values (500, 400, 300), then filter value < 400.
query II
SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
----
3 300

# The filter stays above the sort+fetch in the plan.
query TT
EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
----
logical_plan
01)SubqueryAlias: sub
02)--Filter: t.value > Int32(200)
03)----Sort: t.value ASC NULLS LAST, fetch=3
04)------TableScan: t projection=[id, value]
physical_plan
01)FilterExec: value@1 > 200
02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
DROP TABLE t;
Contributor Author:

[image]

Contributor Author:

This fails without our fixes

15 changes: 8 additions & 7 deletions datafusion/sqllogictest/test_files/window.slt
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
----
logical_plan
01)Sort: rn1 ASC NULLS LAST
02)--Sort: rn1 ASC NULLS LAST, fetch=5
03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
Contributor:

it definitely had the filter too low 👍

02)--Filter: rn1 < UInt64(50)
03)----Sort: rn1 ASC NULLS LAST, fetch=5
04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
physical_plan
01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
01)FilterExec: rn1@5 < 50
02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
03)----GlobalLimitExec: skip=0, fetch=5
04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]

# Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
# global order. The existing sort is for the second-term lexicographical ordering requirement, which is being