
Commit ec00d02

Shiv Bhatia authored and committed
Fix push_down_filter for children with non-empty fetch fields (apache#21057)
<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#21063 <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Currently if we see a filter with a limit underneath, we don't push the filter past the limit. However, sort nodes and table scan nodes can have fetch fields which do essentially the same thing, and we don't stop filters being pushed past them. This is a correctness bug that can lead to undefined behaviour. I added checks for exactly this condition so we don't push the filter down. I think the prior expectation was that there would be a limit node between any of these nodes, but this is also not true. In `push_down_limit.rs`, there's code that does this optimisation when a limit has a sort under it: ``` LogicalPlan::Sort(mut sort) => { let new_fetch = { let sort_fetch = skip + fetch; Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch)) }; if new_fetch == sort.fetch { if skip > 0 { original_limit(skip, fetch, LogicalPlan::Sort(sort)) } else { Ok(Transformed::yes(LogicalPlan::Sort(sort))) } } else { sort.fetch = new_fetch; limit.input = Arc::new(LogicalPlan::Sort(sort)); Ok(Transformed::yes(LogicalPlan::Limit(limit))) } } ``` The first time this runs, it sets the internal fetch of the sort to new_fetch, and on the second optimisation pass it hits the branch where we just get rid of the limit node altogether, leaving the sort node exposed to potential filters which can now push down into it. 
There is also a related fix in `gather_filters_for_pushdown` in `SortExec`, which does the same thing for physical plan nodes: if an execution plan has a non-empty fetch, it must not allow any parent filters to be pushed down past it.

What changes are included in this PR? Added checks in the optimisation rule to avoid pushing filters past children with built-in limits.

Are these changes tested? Yes:

- Unit tests in `push_down_filter.rs`
- Fixed an existing test in `window.slt`
- Unit tests for the physical plan change in `sort.rs`
- New slt test in `push_down_filter_sort_fetch.slt` for this exact behaviour

Are there any user-facing changes? No.

Co-authored-by: Shiv Bhatia <sbhatia@palantir.com>
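The semantics problem is easy to reproduce outside DataFusion. The following toy sketch (plain Rust; none of these function names are DataFusion APIs) applies a fetch of 3 before and after a `value > 200` filter, using the same data as the new `limit.slt` test, and shows the two orders disagree:

```rust
// Why a filter and a fetch (limit) do not commute: applying the filter
// before or after `ORDER BY value LIMIT 3` yields different rows.

/// Correct order: take the `fetch` smallest values first, then filter.
fn fetch_then_filter(values: &[i32], fetch: usize, min: i32) -> Vec<i32> {
    let mut sorted = values.to_vec();
    sorted.sort();
    sorted.into_iter().take(fetch).filter(|v| *v > min).collect()
}

/// Wrong order: the filter pushed below the fetch runs first.
fn filter_then_fetch(values: &[i32], fetch: usize, min: i32) -> Vec<i32> {
    let mut sorted = values.to_vec();
    sorted.sort();
    sorted.into_iter().filter(|v| *v > min).take(fetch).collect()
}

fn main() {
    let values = [100, 200, 300, 400, 500];
    // LIMIT 3 keeps (100, 200, 300), so only 300 survives `> 200`.
    assert_eq!(fetch_then_filter(&values, 3, 200), vec![300]);
    // Pushing the filter below the fetch wrongly returns three rows.
    assert_eq!(filter_then_fetch(&values, 3, 200), vec![300, 400, 500]);
    println!("ok");
}
```

This is exactly the difference the optimizer must preserve: a fetch in the child pins the filter above it.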
1 parent: 01df975 · commit: ec00d02

File tree

5 files changed: +270 −9 lines


datafusion/expr/src/logical_plan/plan.rs

Lines changed: 76 additions & 0 deletions

```diff
@@ -1392,6 +1392,82 @@ impl LogicalPlan {
         }
     }
 
+    /// Returns the skip (offset) of this plan node, if it has one.
+    ///
+    /// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
+    /// return `Ok(None)`. Returns `Ok(None)` for a zero skip.
+    pub fn skip(&self) -> Result<Option<usize>> {
+        match self {
+            LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
+                SkipType::Literal(0) => Ok(None),
+                SkipType::Literal(n) => Ok(Some(n)),
+                SkipType::UnsupportedExpr => Ok(None),
+            },
+            LogicalPlan::Sort(_) => Ok(None),
+            LogicalPlan::TableScan(_) => Ok(None),
+            LogicalPlan::Projection(_) => Ok(None),
+            LogicalPlan::Filter(_) => Ok(None),
+            LogicalPlan::Window(_) => Ok(None),
+            LogicalPlan::Aggregate(_) => Ok(None),
+            LogicalPlan::Join(_) => Ok(None),
+            LogicalPlan::Repartition(_) => Ok(None),
+            LogicalPlan::Union(_) => Ok(None),
+            LogicalPlan::EmptyRelation(_) => Ok(None),
+            LogicalPlan::Subquery(_) => Ok(None),
+            LogicalPlan::SubqueryAlias(_) => Ok(None),
+            LogicalPlan::Statement(_) => Ok(None),
+            LogicalPlan::Values(_) => Ok(None),
+            LogicalPlan::Explain(_) => Ok(None),
+            LogicalPlan::Analyze(_) => Ok(None),
+            LogicalPlan::Extension(_) => Ok(None),
+            LogicalPlan::Distinct(_) => Ok(None),
+            LogicalPlan::Dml(_) => Ok(None),
+            LogicalPlan::Ddl(_) => Ok(None),
+            LogicalPlan::Copy(_) => Ok(None),
+            LogicalPlan::DescribeTable(_) => Ok(None),
+            LogicalPlan::Unnest(_) => Ok(None),
+            LogicalPlan::RecursiveQuery(_) => Ok(None),
+        }
+    }
+
+    /// Returns the fetch (limit) of this plan node, if it has one.
+    ///
+    /// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and
+    /// [`LogicalPlan::Limit`] may carry a fetch value; all other variants
+    /// return `Ok(None)`.
+    pub fn fetch(&self) -> Result<Option<usize>> {
+        match self {
+            LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
+            LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
+            LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
+                FetchType::Literal(s) => Ok(s),
+                FetchType::UnsupportedExpr => Ok(None),
+            },
+            LogicalPlan::Projection(_) => Ok(None),
+            LogicalPlan::Filter(_) => Ok(None),
+            LogicalPlan::Window(_) => Ok(None),
+            LogicalPlan::Aggregate(_) => Ok(None),
+            LogicalPlan::Join(_) => Ok(None),
+            LogicalPlan::Repartition(_) => Ok(None),
+            LogicalPlan::Union(_) => Ok(None),
+            LogicalPlan::EmptyRelation(_) => Ok(None),
+            LogicalPlan::Subquery(_) => Ok(None),
+            LogicalPlan::SubqueryAlias(_) => Ok(None),
+            LogicalPlan::Statement(_) => Ok(None),
+            LogicalPlan::Values(_) => Ok(None),
+            LogicalPlan::Explain(_) => Ok(None),
+            LogicalPlan::Analyze(_) => Ok(None),
+            LogicalPlan::Extension(_) => Ok(None),
+            LogicalPlan::Distinct(_) => Ok(None),
+            LogicalPlan::Dml(_) => Ok(None),
+            LogicalPlan::Ddl(_) => Ok(None),
+            LogicalPlan::Copy(_) => Ok(None),
+            LogicalPlan::DescribeTable(_) => Ok(None),
+            LogicalPlan::Unnest(_) => Ok(None),
+            LogicalPlan::RecursiveQuery(_) => Ok(None),
+        }
+    }
+
     /// If this node's expressions contains any references to an outer subquery
     pub fn contains_outer_reference(&self) -> bool {
         let mut contains = false;
```

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 66 additions & 0 deletions

```diff
@@ -793,6 +793,13 @@ impl OptimizerRule for PushDownFilter {
             filter.predicate = new_predicate;
         }
 
+        // If the child has a fetch (limit) or skip (offset), pushing a filter
+        // below it would change semantics: the limit/offset should apply before
+        // the filter, not after.
+        if filter.input.fetch()?.is_some() || filter.input.skip()?.is_some() {
+            return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+        }
+
         match Arc::unwrap_or_clone(filter.input) {
             LogicalPlan::Filter(child_filter) => {
                 let parents_predicates = split_conjunction_owned(filter.predicate);
@@ -4296,4 +4303,63 @@ mod tests {
             "
         )
     }
+
+    #[test]
+    fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
+        let scan = test_table_scan()?;
+        let scan_with_fetch = match scan {
+            LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
+                fetch: Some(10),
+                ..scan
+            }),
+            _ => unreachable!(),
+        };
+        let plan = LogicalPlanBuilder::from(scan_with_fetch)
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter must NOT be pushed into the table scan when it has a fetch (limit)
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Filter: test.a > Int64(10)
+          TableScan: test, fetch=10
+        "
+        )
+    }
+
+    #[test]
+    fn filter_push_down_through_sort_without_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort(vec![col("a").sort(true, true)])?
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter should be pushed below the sort
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Sort: test.a ASC NULLS FIRST
+          TableScan: test, full_filters=[test.a > Int64(10)]
+        "
+        )
+    }
+
+    #[test]
+    fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter must NOT be pushed below the sort when it has a fetch (limit),
+        // because the limit should apply before the filter.
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Filter: test.a > Int64(10)
+          Sort: test.a ASC NULLS FIRST, fetch=5
+            TableScan: test
+        "
+        )
+    }
 }
```
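In miniature, the guard added above amounts to: before matching on the child node, ask it for a built-in fetch and bail out early. This hedged sketch uses a toy plan enum (hypothetical types, not DataFusion's `LogicalPlan`) to show the shape of that check:

```rust
// Toy model of the PushDownFilter guard (hypothetical types; the real rule
// works on DataFusion's LogicalPlan and also checks skip()).
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    Filter { input: Box<Plan> },
    Sort { fetch: Option<usize>, input: Box<Plan> },
    Scan { fetch: Option<usize> },
}

impl Plan {
    // Mirrors the idea of LogicalPlan::fetch(): only Sort and Scan
    // carry a fetch in this toy model.
    fn fetch(&self) -> Option<usize> {
        match self {
            Plan::Sort { fetch, .. } | Plan::Scan { fetch } => *fetch,
            Plan::Filter { .. } => None,
        }
    }
}

// The rule may push a filter below `child` only when the child has no
// built-in limit that would change semantics.
fn can_push_filter_below(child: &Plan) -> bool {
    child.fetch().is_none()
}

fn main() {
    let scan = Plan::Scan { fetch: None };
    let topk = Plan::Sort { fetch: Some(5), input: Box::new(scan.clone()) };
    let plain = Plan::Sort { fetch: None, input: Box::new(scan) };
    assert!(!can_push_filter_below(&topk)); // Sort with fetch blocks pushdown
    assert!(can_push_filter_below(&plain)); // plain Sort allows it
}
```

The real fix layers this check above the existing `match` on the child, so every child type with a non-empty fetch or skip is covered by one early return.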

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 81 additions & 2 deletions

```diff
@@ -1374,11 +1374,22 @@ impl ExecutionPlan for SortExec {
         config: &datafusion_common::config::ConfigOptions,
     ) -> Result<FilterDescription> {
         if phase != FilterPushdownPhase::Post {
+            if self.fetch.is_some() {
+                return Ok(FilterDescription::all_unsupported(
+                    &parent_filters,
+                    &self.children(),
+                ));
+            }
             return FilterDescription::from_children(parent_filters, &self.children());
         }
 
-        let mut child =
-            ChildFilterDescription::from_child(&parent_filters, self.input())?;
+        // In Post phase: block parent filters when fetch is set,
+        // but still push the TopK dynamic filter (self-filter).
+        let mut child = if self.fetch.is_some() {
+            ChildFilterDescription::all_unsupported(&parent_filters)
+        } else {
+            ChildFilterDescription::from_child(&parent_filters, self.input())?
+        };
 
         if let Some(filter) = &self.filter
             && config.optimizer.enable_topk_dynamic_filter_pushdown
@@ -1399,8 +1410,10 @@ mod tests {
     use super::*;
     use crate::coalesce_partitions::CoalescePartitionsExec;
     use crate::collect;
+    use crate::empty::EmptyExec;
     use crate::execution_plan::Boundedness;
     use crate::expressions::col;
+    use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
     use crate::test;
     use crate::test::TestMemoryExec;
     use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
@@ -1410,10 +1423,12 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::*;
     use datafusion_common::cast::as_primitive_array;
+    use datafusion_common::config::ConfigOptions;
     use datafusion_common::test_util::batches_to_string;
     use datafusion_common::{DataFusionError, Result, ScalarValue};
     use datafusion_execution::RecordBatchStream;
     use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::memory_pool::MemoryPool;
     use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_physical_expr::EquivalenceProperties;
     use datafusion_physical_expr::expressions::{Column, Literal};
@@ -2691,4 +2706,68 @@ mod tests {
 
         Ok(())
     }
+
+    fn make_sort_exec_with_fetch(fetch: Option<usize>) -> SortExec {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let input = Arc::new(EmptyExec::new(schema));
+        SortExec::new(
+            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
+            input,
+        )
+        .with_fetch(fetch)
+    }
+
+    #[test]
+    fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
+        let sort = make_sort_exec_with_fetch(Some(10));
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Pre,
+            vec![Arc::new(Column::new("a", 0))],
+            &ConfigOptions::new(),
+        )?;
+        // Sort with fetch (TopK) must not allow filters to be pushed below it.
+        assert!(matches!(
+            desc.parent_filters()[0][0].discriminant,
+            PushedDown::No
+        ));
+        Ok(())
+    }
+
+    #[test]
+    fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
+        let sort = make_sort_exec_with_fetch(None);
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Pre,
+            vec![Arc::new(Column::new("a", 0))],
+            &ConfigOptions::new(),
+        )?;
+        // Plain sort (no fetch) is filter-commutative.
+        assert!(matches!(
+            desc.parent_filters()[0][0].discriminant,
+            PushedDown::Yes
+        ));
+        Ok(())
+    }
+
+    #[test]
+    fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
+        let sort = make_sort_exec_with_fetch(Some(10));
+        assert!(sort.filter.is_some(), "TopK filter should be created");
+
+        let mut config = ConfigOptions::new();
+        config.optimizer.enable_topk_dynamic_filter_pushdown = true;
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Post,
+            vec![Arc::new(Column::new("a", 0))],
+            &config,
+        )?;
+        // Parent filters are still blocked in the Post phase.
+        assert!(matches!(
+            desc.parent_filters()[0][0].discriminant,
+            PushedDown::No
+        ));
+        // But the TopK self-filter should be pushed down.
+        assert_eq!(desc.self_filters()[0].len(), 1);
+        Ok(())
+    }
 }
```

datafusion/sqllogictest/test_files/limit.slt

Lines changed: 39 additions & 0 deletions

```diff
@@ -869,6 +869,45 @@ limit 1000;
 statement ok
 DROP TABLE test_limit_with_partitions;
 
+# Tests for filter pushdown behavior with Sort + LIMIT (fetch).
+
+statement ok
+CREATE TABLE t(id INT, value INT) AS VALUES
+  (1, 100),
+  (2, 200),
+  (3, 300),
+  (4, 400),
+  (5, 500);
+
+# Take the 3 smallest values (100, 200, 300), then filter value > 200.
+query II
+SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
+----
+3 300
+
+# Take the 3 largest values (500, 400, 300), then filter value < 400.
+query II
+SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
+----
+3 300
+
+# The filter stays above the sort+fetch in the plan.
+query TT
+EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
+----
+logical_plan
+01)SubqueryAlias: sub
+02)--Filter: t.value > Int32(200)
+03)----Sort: t.value ASC NULLS LAST, fetch=3
+04)------TableScan: t projection=[id, value]
+physical_plan
+01)FilterExec: value@1 > 200
+02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+DROP TABLE t;
+
 # Tear down src_table table:
 statement ok
 DROP TABLE src_table;
```

datafusion/sqllogictest/test_files/window.slt

Lines changed: 8 additions & 7 deletions

```diff
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
 ----
 logical_plan
 01)Sort: rn1 ASC NULLS LAST
-02)--Sort: rn1 ASC NULLS LAST, fetch=5
-03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
+02)--Filter: rn1 < UInt64(50)
+03)----Sort: rn1 ASC NULLS LAST, fetch=5
+04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
 05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
 06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
 physical_plan
-01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
-02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
-03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
-04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
+01)FilterExec: rn1@5 < 50
+02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
+03)----GlobalLimitExec: skip=0, fetch=5
+04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
 
 # Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
 # global order. The existing sort is for the second-term lexicographical ordering requirement, which is being
```
