
Commit 10fae81

Authored by Shiv Bhatia (shivbhatia10)
Fix push_down_filter for children with non-empty fetch fields (#21057)
## Which issue does this PR close?

- Closes #21063

## Rationale for this change

Currently, if a filter has a limit underneath it, we do not push the filter past the limit. However, sort nodes and table scan nodes can carry `fetch` fields that do essentially the same thing as a limit, and nothing stopped filters from being pushed past them. This is a correctness bug: pushing a filter below a fetch changes which rows the fetch sees, so the query can return incorrect results.

I added checks for exactly this condition so the filter is not pushed down. The prior expectation seems to have been that there would always be a limit node above any of these nodes, but that is not true either. In `push_down_limit.rs`, this optimisation runs when a limit has a sort under it:

```rust
LogicalPlan::Sort(mut sort) => {
    let new_fetch = {
        let sort_fetch = skip + fetch;
        Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
    };
    if new_fetch == sort.fetch {
        if skip > 0 {
            original_limit(skip, fetch, LogicalPlan::Sort(sort))
        } else {
            Ok(Transformed::yes(LogicalPlan::Sort(sort)))
        }
    } else {
        sort.fetch = new_fetch;
        limit.input = Arc::new(LogicalPlan::Sort(sort));
        Ok(Transformed::yes(LogicalPlan::Limit(limit)))
    }
}
```

The first time this runs, it sets the sort's internal fetch to `new_fetch`; on the second optimisation pass it hits the branch that removes the limit node altogether, leaving the sort node exposed to parent filters, which could then be pushed down into it.

There is also a related fix in `gather_filters_for_pushdown` in `SortExec`, which does the same thing for physical plan nodes: if an execution plan has a non-empty fetch, it must not allow any parent filters to be pushed down.

## What changes are included in this PR?

Added checks in the optimisation rule to avoid pushing filters past children with built-in limits.

## Are these changes tested?

Yes:

- Unit tests in `push_down_filter.rs`
- Fixed an existing test in `window.slt`
- Unit tests for the physical plan change in `sort.rs`
- New slt test in `push_down_filter_sort_fetch.slt` for this exact behaviour

## Are there any user-facing changes?

No

Co-authored-by: Shiv Bhatia <sbhatia@palantir.com>
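The correctness issue can be demonstrated without any DataFusion types. A minimal standalone Rust sketch (hypothetical row values, not taken from the PR's tests) of why a filter and a fetch do not commute:

```rust
/// Sort + fetch (`ORDER BY v LIMIT n`) first, then the filter: the correct order.
fn fetch_then_filter(mut rows: Vec<i32>, fetch: usize, min: i32) -> Vec<i32> {
    rows.sort();
    rows.into_iter().take(fetch).filter(|v| *v > min).collect()
}

/// Filter first, then the fetch: what an over-eager pushdown produces.
fn filter_then_fetch(mut rows: Vec<i32>, fetch: usize, min: i32) -> Vec<i32> {
    rows.sort();
    rows.into_iter().filter(|v| *v > min).take(fetch).collect()
}

fn main() {
    let rows = vec![500, 100, 400, 200, 300];
    // LIMIT 3 keeps [100, 200, 300]; the filter v > 200 then keeps only [300].
    assert_eq!(fetch_then_filter(rows.clone(), 3, 200), vec![300]);
    // Pushing the filter below the fetch yields [300, 400, 500]: different rows.
    assert_eq!(filter_then_fetch(rows, 3, 200), vec![300, 400, 500]);
    println!("filter and fetch do not commute");
}
```

The same reasoning applies whether the fetch lives on a `Limit` node, a `Sort` node, or a `TableScan`.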
Parent commit: dc9098e

File tree

5 files changed, +274 -14 lines changed


datafusion/expr/src/logical_plan/plan.rs

Lines changed: 76 additions & 0 deletions
```diff
@@ -1392,6 +1392,82 @@ impl LogicalPlan {
         }
     }
 
+    /// Returns the skip (offset) of this plan node, if it has one.
+    ///
+    /// Only [`LogicalPlan::Limit`] carries a skip value; all other variants
+    /// return `Ok(None)`. Returns `Ok(None)` for a zero skip.
+    pub fn skip(&self) -> Result<Option<usize>> {
+        match self {
+            LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
+                SkipType::Literal(0) => Ok(None),
+                SkipType::Literal(n) => Ok(Some(n)),
+                SkipType::UnsupportedExpr => Ok(None),
+            },
+            LogicalPlan::Sort(_) => Ok(None),
+            LogicalPlan::TableScan(_) => Ok(None),
+            LogicalPlan::Projection(_) => Ok(None),
+            LogicalPlan::Filter(_) => Ok(None),
+            LogicalPlan::Window(_) => Ok(None),
+            LogicalPlan::Aggregate(_) => Ok(None),
+            LogicalPlan::Join(_) => Ok(None),
+            LogicalPlan::Repartition(_) => Ok(None),
+            LogicalPlan::Union(_) => Ok(None),
+            LogicalPlan::EmptyRelation(_) => Ok(None),
+            LogicalPlan::Subquery(_) => Ok(None),
+            LogicalPlan::SubqueryAlias(_) => Ok(None),
+            LogicalPlan::Statement(_) => Ok(None),
+            LogicalPlan::Values(_) => Ok(None),
+            LogicalPlan::Explain(_) => Ok(None),
+            LogicalPlan::Analyze(_) => Ok(None),
+            LogicalPlan::Extension(_) => Ok(None),
+            LogicalPlan::Distinct(_) => Ok(None),
+            LogicalPlan::Dml(_) => Ok(None),
+            LogicalPlan::Ddl(_) => Ok(None),
+            LogicalPlan::Copy(_) => Ok(None),
+            LogicalPlan::DescribeTable(_) => Ok(None),
+            LogicalPlan::Unnest(_) => Ok(None),
+            LogicalPlan::RecursiveQuery(_) => Ok(None),
+        }
+    }
+
+    /// Returns the fetch (limit) of this plan node, if it has one.
+    ///
+    /// [`LogicalPlan::Sort`], [`LogicalPlan::TableScan`], and
+    /// [`LogicalPlan::Limit`] may carry a fetch value; all other variants
+    /// return `Ok(None)`.
+    pub fn fetch(&self) -> Result<Option<usize>> {
+        match self {
+            LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
+            LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
+            LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
+                FetchType::Literal(s) => Ok(s),
+                FetchType::UnsupportedExpr => Ok(None),
+            },
+            LogicalPlan::Projection(_) => Ok(None),
+            LogicalPlan::Filter(_) => Ok(None),
+            LogicalPlan::Window(_) => Ok(None),
+            LogicalPlan::Aggregate(_) => Ok(None),
+            LogicalPlan::Join(_) => Ok(None),
+            LogicalPlan::Repartition(_) => Ok(None),
+            LogicalPlan::Union(_) => Ok(None),
+            LogicalPlan::EmptyRelation(_) => Ok(None),
+            LogicalPlan::Subquery(_) => Ok(None),
+            LogicalPlan::SubqueryAlias(_) => Ok(None),
+            LogicalPlan::Statement(_) => Ok(None),
+            LogicalPlan::Values(_) => Ok(None),
+            LogicalPlan::Explain(_) => Ok(None),
+            LogicalPlan::Analyze(_) => Ok(None),
+            LogicalPlan::Extension(_) => Ok(None),
+            LogicalPlan::Distinct(_) => Ok(None),
+            LogicalPlan::Dml(_) => Ok(None),
+            LogicalPlan::Ddl(_) => Ok(None),
+            LogicalPlan::Copy(_) => Ok(None),
+            LogicalPlan::DescribeTable(_) => Ok(None),
+            LogicalPlan::Unnest(_) => Ok(None),
+            LogicalPlan::RecursiveQuery(_) => Ok(None),
+        }
+    }
+
     /// If this node's expressions contains any references to an outer subquery
     pub fn contains_outer_reference(&self) -> bool {
         let mut contains = false;
```
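The accessors above match every `LogicalPlan` variant explicitly rather than using a `_` catch-all. A toy sketch of that design choice (a hypothetical `Plan` enum, not DataFusion's types): with an exhaustive match, adding a new variant fails to compile until its skip/fetch behaviour is decided.

```rust
// Toy plan enum; the real LogicalPlan has many more variants.
enum Plan {
    Limit { skip: usize, fetch: Option<usize> },
    Sort { fetch: Option<usize> },
    TableScan { fetch: Option<usize> },
    Projection,
}

/// Fetch accessor with an exhaustive match: adding a `Plan::Join` variant
/// later makes this function fail to compile until an arm is written for it.
fn fetch(plan: &Plan) -> Option<usize> {
    match plan {
        Plan::Limit { fetch, .. } => *fetch,
        Plan::Sort { fetch, .. } => *fetch,
        Plan::TableScan { fetch, .. } => *fetch,
        Plan::Projection => None,
    }
}

/// Skip accessor: only `Limit` carries a skip, and a zero skip reads as
/// `None`, mirroring `LogicalPlan::skip` above.
fn skip(plan: &Plan) -> Option<usize> {
    match plan {
        Plan::Limit { skip: 0, .. } => None,
        Plan::Limit { skip, .. } => Some(*skip),
        Plan::Sort { .. } | Plan::TableScan { .. } | Plan::Projection => None,
    }
}

fn main() {
    assert_eq!(fetch(&Plan::Sort { fetch: Some(5) }), Some(5));
    assert_eq!(skip(&Plan::Limit { skip: 0, fetch: Some(3) }), None);
    assert_eq!(skip(&Plan::Limit { skip: 2, fetch: None }), Some(2));
    println!("accessors ok");
}
```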

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 66 additions & 0 deletions
```diff
@@ -796,6 +796,13 @@ impl OptimizerRule for PushDownFilter {
             filter.predicate = new_predicate;
         }
 
+        // If the child has a fetch (limit) or skip (offset), pushing a filter
+        // below it would change semantics: the limit/offset should apply before
+        // the filter, not after.
+        if filter.input.fetch()?.is_some() || filter.input.skip()?.is_some() {
+            return Ok(Transformed::no(LogicalPlan::Filter(filter)));
+        }
+
         match Arc::unwrap_or_clone(filter.input) {
             LogicalPlan::Filter(child_filter) => {
                 let parents_predicates = split_conjunction_owned(filter.predicate);
@@ -4315,4 +4322,63 @@ mod tests {
         "
         )
     }
+
+    #[test]
+    fn filter_not_pushed_down_through_table_scan_with_fetch() -> Result<()> {
+        let scan = test_table_scan()?;
+        let scan_with_fetch = match scan {
+            LogicalPlan::TableScan(scan) => LogicalPlan::TableScan(TableScan {
+                fetch: Some(10),
+                ..scan
+            }),
+            _ => unreachable!(),
+        };
+        let plan = LogicalPlanBuilder::from(scan_with_fetch)
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter must NOT be pushed into the table scan when it has a fetch (limit)
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Filter: test.a > Int64(10)
+          TableScan: test, fetch=10
+        "
+        )
+    }
+
+    #[test]
+    fn filter_push_down_through_sort_without_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort(vec![col("a").sort(true, true)])?
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter should be pushed below the sort
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Sort: test.a ASC NULLS FIRST
+          TableScan: test, full_filters=[test.a > Int64(10)]
+        "
+        )
+    }
+
+    #[test]
+    fn filter_not_pushed_down_through_sort_with_fetch() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort_with_limit(vec![col("a").sort(true, true)], Some(5))?
+            .filter(col("a").gt(lit(10i64)))?
+            .build()?;
+        // Filter must NOT be pushed below the sort when it has a fetch (limit),
+        // because the limit should apply before the filter.
+        assert_optimized_plan_equal!(
+            plan,
+            @r"
+        Filter: test.a > Int64(10)
+          Sort: test.a ASC NULLS FIRST, fetch=5
+            TableScan: test
+        "
+        )
+    }
 }
```
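The guard checks `skip` as well as `fetch` because a filter does not commute with an offset either. A standalone sketch (illustrative numbers, not from the PR's tests):

```rust
fn main() {
    let sorted = vec![100, 200, 300, 400, 500];

    // Correct: OFFSET 2 drops the two smallest rows, then the filter v > 100 runs.
    let skip_then_filter: Vec<i32> =
        sorted.iter().copied().skip(2).filter(|v| *v > 100).collect();
    assert_eq!(skip_then_filter, vec![300, 400, 500]);

    // If the filter were pushed below the offset, it would change which rows
    // the offset drops, producing a different result.
    let filter_then_skip: Vec<i32> =
        sorted.iter().copied().filter(|v| *v > 100).skip(2).collect();
    assert_eq!(filter_then_skip, vec![400, 500]);

    println!("filter and offset do not commute");
}
```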

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 85 additions & 7 deletions
```diff
@@ -1405,11 +1405,22 @@ impl ExecutionPlan for SortExec {
         config: &datafusion_common::config::ConfigOptions,
     ) -> Result<FilterDescription> {
         if phase != FilterPushdownPhase::Post {
+            if self.fetch.is_some() {
+                return Ok(FilterDescription::all_unsupported(
+                    &parent_filters,
+                    &self.children(),
+                ));
+            }
             return FilterDescription::from_children(parent_filters, &self.children());
         }
 
-        let mut child =
-            ChildFilterDescription::from_child(&parent_filters, self.input())?;
+        // In Post phase: block parent filters when fetch is set,
+        // but still push the TopK dynamic filter (self-filter).
+        let mut child = if self.fetch.is_some() {
+            ChildFilterDescription::all_unsupported(&parent_filters)
+        } else {
+            ChildFilterDescription::from_child(&parent_filters, self.input())?
+        };
 
         if let Some(filter) = &self.filter
             && config.optimizer.enable_topk_dynamic_filter_pushdown
@@ -1430,7 +1441,10 @@ mod tests {
     use super::*;
     use crate::coalesce_partitions::CoalescePartitionsExec;
     use crate::collect;
+    use crate::empty::EmptyExec;
+    use crate::execution_plan::Boundedness;
     use crate::expressions::col;
+    use crate::filter_pushdown::{FilterPushdownPhase, PushedDown};
     use crate::test;
     use crate::test::TestMemoryExec;
     use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};
@@ -1441,14 +1455,18 @@ mod tests {
     use arrow::datatypes::*;
     use datafusion_common::ScalarValue;
     use datafusion_common::cast::as_primitive_array;
+    use datafusion_common::config::ConfigOptions;
     use datafusion_common::test_util::batches_to_string;
     use datafusion_execution::RecordBatchStream;
     use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::memory_pool::{
+        GreedyMemoryPool, MemoryConsumer, MemoryPool,
+    };
     use datafusion_execution::runtime_env::RuntimeEnvBuilder;
     use datafusion_physical_expr::EquivalenceProperties;
     use datafusion_physical_expr::expressions::{Column, Literal};
 
-    use futures::{FutureExt, Stream};
+    use futures::{FutureExt, Stream, TryStreamExt};
     use insta::assert_snapshot;
 
     #[derive(Debug, Clone)]
@@ -2747,10 +2765,6 @@ mod tests {
     /// those bytes become unaccounted-for reserved memory that nobody uses.
     #[tokio::test]
     async fn test_sort_merge_reservation_transferred_not_freed() -> Result<()> {
-        use datafusion_execution::memory_pool::{
-            GreedyMemoryPool, MemoryConsumer, MemoryPool,
-        };
-
        let sort_spill_reservation_bytes: usize = 10 * 1024; // 10 KB
 
        // Pool: merge reservation (10KB) + enough room for sort to work.
@@ -2861,4 +2875,68 @@ mod tests {
         drop(contender);
         Ok(())
     }
+
+    fn make_sort_exec_with_fetch(fetch: Option<usize>) -> SortExec {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+        let input = Arc::new(EmptyExec::new(schema));
+        SortExec::new(
+            [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
+            input,
+        )
+        .with_fetch(fetch)
+    }
+
+    #[test]
+    fn test_sort_with_fetch_blocks_filter_pushdown() -> Result<()> {
+        let sort = make_sort_exec_with_fetch(Some(10));
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Pre,
+            vec![Arc::new(Column::new("a", 0))],
+            &ConfigOptions::new(),
+        )?;
+        // Sort with fetch (TopK) must not allow filters to be pushed below it.
+        assert!(matches!(
+            desc.parent_filters()[0][0].discriminant,
+            PushedDown::No
+        ));
+        Ok(())
+    }
+
+    #[test]
+    fn test_sort_without_fetch_allows_filter_pushdown() -> Result<()> {
+        let sort = make_sort_exec_with_fetch(None);
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Pre,
+            vec![Arc::new(Column::new("a", 0))],
+            &ConfigOptions::new(),
+        )?;
+        // Plain sort (no fetch) is filter-commutative.
+        assert!(matches!(
+            desc.parent_filters()[0][0].discriminant,
+            PushedDown::Yes
+        ));
+        Ok(())
+    }
+
+    #[test]
+    fn test_sort_with_fetch_allows_topk_self_filter_in_post_phase() -> Result<()> {
+        let sort = make_sort_exec_with_fetch(Some(10));
+        assert!(sort.filter.is_some(), "TopK filter should be created");
+
+        let mut config = ConfigOptions::new();
+        config.optimizer.enable_topk_dynamic_filter_pushdown = true;
+        let desc = sort.gather_filters_for_pushdown(
+            FilterPushdownPhase::Post,
+            vec![Arc::new(Column::new("a", 0))],
+            &config,
+        )?;
+        // Parent filters are still blocked in the Post phase.
+        assert!(matches!(
+            desc.parent_filters()[0][0].discriminant,
+            PushedDown::No
+        ));
+        // But the TopK self-filter should be pushed down.
+        assert_eq!(desc.self_filters()[0].len(), 1);
+        Ok(())
+    }
 }
```
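`SortExec` with a fetch executes as a TopK, and the "self-filter" exercised above is a dynamic predicate derived from the current heap contents. A standalone sketch of the idea using the stdlib `BinaryHeap` (an illustration of the technique, not DataFusion's TopK implementation):

```rust
use std::collections::BinaryHeap;

/// Smallest-k via a max-heap: once the heap holds k rows, any row that is
/// not smaller than the current worst (the heap top) can be skipped. That
/// bound tightens as rows arrive; pushing the predicate `row < worst` down
/// to the scan is the essence of the TopK dynamic filter.
fn top_k_smallest(rows: &[i32], k: usize) -> Vec<i32> {
    let mut heap: BinaryHeap<i32> = BinaryHeap::new();
    for &row in rows {
        if heap.len() < k {
            heap.push(row);
        } else if let Some(&worst) = heap.peek() {
            if row < worst {
                heap.pop();
                heap.push(row);
            }
            // else: row fails the dynamic filter `row < worst` and is skipped.
        }
    }
    heap.into_sorted_vec() // ascending order
}

fn main() {
    let rows = [500, 100, 400, 200, 300];
    assert_eq!(top_k_smallest(&rows, 3), vec![100, 200, 300]);
    println!("top-3: {:?}", top_k_smallest(&rows, 3));
}
```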

datafusion/sqllogictest/test_files/limit.slt

Lines changed: 39 additions & 0 deletions
```diff
@@ -885,6 +885,45 @@ reset datafusion.optimizer.repartition_file_min_size;
 statement ok
 DROP TABLE test_limit_with_partitions;
 
+# Tests for filter pushdown behavior with Sort + LIMIT (fetch).
+
+statement ok
+CREATE TABLE t(id INT, value INT) AS VALUES
+(1, 100),
+(2, 200),
+(3, 300),
+(4, 400),
+(5, 500);
+
+# Take the 3 smallest values (100, 200, 300), then filter value > 200.
+query II
+SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
+----
+3 300
+
+# Take the 3 largest values (500, 400, 300), then filter value < 400.
+query II
+SELECT * FROM (SELECT * FROM t ORDER BY value DESC LIMIT 3) sub WHERE sub.value < 400;
+----
+3 300
+
+# The filter stays above the sort+fetch in the plan.
+query TT
+EXPLAIN SELECT * FROM (SELECT * FROM t ORDER BY value LIMIT 3) sub WHERE sub.value > 200;
+----
+logical_plan
+01)SubqueryAlias: sub
+02)--Filter: t.value > Int32(200)
+03)----Sort: t.value ASC NULLS LAST, fetch=3
+04)------TableScan: t projection=[id, value]
+physical_plan
+01)FilterExec: value@1 > 200
+02)--SortExec: TopK(fetch=3), expr=[value@1 ASC NULLS LAST], preserve_partitioning=[false]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+statement ok
+DROP TABLE t;
+
 # Tear down src_table table:
 statement ok
 DROP TABLE src_table;
```

datafusion/sqllogictest/test_files/window.slt

Lines changed: 8 additions & 7 deletions
```diff
@@ -3198,16 +3198,17 @@ EXPLAIN SELECT * FROM (SELECT *, ROW_NUMBER() OVER(ORDER BY a ASC) as rn1
 ----
 logical_plan
 01)Sort: rn1 ASC NULLS LAST
-02)--Sort: rn1 ASC NULLS LAST, fetch=5
-03)----Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
-04)------Filter: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW < UInt64(50)
+02)--Filter: rn1 < UInt64(50)
+03)----Sort: rn1 ASC NULLS LAST, fetch=5
+04)------Projection: annotated_data_infinite2.a0, annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.c, annotated_data_infinite2.d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn1
 05)--------WindowAggr: windowExpr=[[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
 06)----------TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
 physical_plan
-01)ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
-02)--FilterExec: row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 < 50, fetch=5
-03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
-04)------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
+01)FilterExec: rn1@5 < 50
+02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
+03)----GlobalLimitExec: skip=0, fetch=5
+04)------BoundedWindowAggExec: wdw=[row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Field { "row_number() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW": UInt64 }, frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+05)--------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
 
 # Top level sort is pushed down through BoundedWindowAggExec as its SUM result does already satisfy the required
 # global order. The existing sort is for the second-term lexicographical ordering requirement, which is being
```
