Skip to content

Commit bd904b3

Browse files
authored
feat: eliminate GlobalLimitExec when input statistics prove limit is already satisfied (#22150)
## Which issue does this PR close? - Closes #. ## Rationale for this change `GlobalLimitExec` (and `LocalLimitExec`) are sometimes redundant: if the input can be proven via exact statistics to produce no more rows than the fetch value, the limit node does nothing and should be removed entirely. Previously, the `LimitPushdown` rule had no mechanism to eliminate such trivially-satisfied limits. A query like `SELECT * FROM (VALUES ...) LIMIT 10` — where the input is a single-row `PlaceholderRowExec` — still carried an unnecessary `GlobalLimitExec` in the physical plan. Similarly, a `LIMIT N` over an `EmptyExec` or any zero-row plan was retained. ## What changes are included in this PR? - Adds `limit_satisfied_by_input()` in `limit_pushdown.rs`: checks whether a plan's child provably produces at most `fetch` rows (requires `skip == 0` and a single output partition). - Adds `limit_eliminable_exact_num_rows()`: iteratively unwraps `ProjectionExec` wrappers and recognises `EmptyExec` (0 rows), `PlaceholderRowExec` (1 row), and any plan reporting `Precision::Exact(0)` rows as eliminable producers. - When a limit is statically satisfied, marks `global_state.satisfied = true` and returns early — **without** resetting `fetch`/`skip` — so nested limit nodes still receive the correct outer constraints to merge against. - Updates the `merges_local_limit_with_local_limit` snapshot: the result is now bare `EmptyExec` (limit eliminated). - Updates `union.slt`: `ProjectionExec` over `PlaceholderRowExec` (1 row) with `fetch=3` no longer carries a redundant `GlobalLimitExec`. - Adds `explain_tree.slt` test: `SELECT count(*) … LIMIT 10` over a two-row VALUES clause is correctly reduced to `ProjectionExec → PlaceholderRowExec` with no limit node. - Updates copy.slt: `fetch=10` is now correctly pushed all the way down to `DataSourceExec`. ## Are these changes tested? Yes. 
- `cargo fmt --all` - `cargo clippy --all-targets --all-features -- -D warnings` - `cargo test -p datafusion-core --test physical_optimizer limit` - `cargo test --features backtrace,parquet_encryption --profile ci --package datafusion-sqllogictest --test sqllogictests -- copy.slt union.slt explain_tree.slt` ## Are there any user-facing changes? No API changes. Physical plans for queries with `LIMIT` over statically small inputs (`EmptyExec`, `PlaceholderRowExec`, or zero-row tables) will now have the redundant `GlobalLimitExec`/`LocalLimitExec` nodes eliminated, resulting in simpler and slightly more efficient plans.
1 parent 9d92944 commit bd904b3

5 files changed

Lines changed: 97 additions & 12 deletions

File tree

datafusion/core/tests/physical_optimizer/limit_pushdown.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -467,10 +467,7 @@ fn merges_local_limit_with_local_limit() -> Result<()> {
467467
let optimized = format_plan(&after_optimize);
468468
insta::assert_snapshot!(
469469
optimized,
470-
@r"
471-
GlobalLimitExec: skip=0, fetch=10
472-
EmptyExec
473-
"
470+
@"EmptyExec"
474471
);
475472

476473
Ok(())

datafusion/physical-optimizer/src/limit_pushdown.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,14 @@ use crate::PhysicalOptimizerRule;
6767

6868
use datafusion_common::config::ConfigOptions;
6969
use datafusion_common::error::Result;
70+
use datafusion_common::stats::Precision;
7071
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
7172
use datafusion_common::utils::combine_limit;
7273
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
74+
use datafusion_physical_plan::empty::EmptyExec;
7375
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
76+
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
77+
use datafusion_physical_plan::projection::ProjectionExec;
7478
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
7579
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
7680
/// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from
@@ -158,6 +162,25 @@ pub fn pushdown_limit_helper(
158162
global_state.preserve_order = limit_info.preserve_order;
159163
global_state.satisfied = false;
160164

165+
if let Some(fetch) = fetch
166+
&& limit_satisfied_by_input(&limit_info.input, skip, fetch)?
167+
{
168+
// The input already produces at most `fetch` rows, so no new limit
169+
// node is needed. Mark satisfied so downstream won't re-add one,
170+
// but preserve skip/fetch so any nested limit nodes (e.g. an inner
171+
// GlobalLimitExec) can still be merged with the outer constraint.
172+
global_state.satisfied = true;
173+
174+
return Ok((
175+
Transformed {
176+
data: limit_info.input,
177+
transformed: true,
178+
tnr: TreeNodeRecursion::Stop,
179+
},
180+
global_state,
181+
));
182+
}
183+
161184
// Now the global state has the most recent information, we can remove
162185
// the limit node. We will decide later if we should add it again or
163186
// not.
@@ -284,6 +307,59 @@ pub fn pushdown_limit_helper(
284307
}
285308
}
286309

310+
/// Returns true if exact input statistics prove that applying the limit would
311+
/// not remove any rows.
312+
fn limit_satisfied_by_input(
313+
plan: &Arc<dyn ExecutionPlan>,
314+
skip: usize,
315+
fetch: usize,
316+
) -> Result<bool> {
317+
if skip > 0 {
318+
return Ok(false);
319+
}
320+
321+
if plan.output_partitioning().partition_count() != 1 {
322+
return Ok(false);
323+
}
324+
325+
let Some(num_rows) = limit_eliminable_exact_num_rows(plan)? else {
326+
return Ok(false);
327+
};
328+
329+
Ok(num_rows <= fetch)
330+
}
331+
332+
/// Returns exact row counts only from a conservative whitelist of operators
333+
/// whose row-count guarantees are strong enough to remove a limit.
334+
fn limit_eliminable_exact_num_rows(
335+
plan: &Arc<dyn ExecutionPlan>,
336+
) -> Result<Option<usize>> {
337+
// Unwrap any wrapping ProjectionExec layers; projections preserve row count
338+
// but may derive statistics in ways that are not trustworthy, so we peek
339+
// through them to the underlying producer.
340+
let mut current = plan;
341+
while let Some(projection) = current.downcast_ref::<ProjectionExec>() {
342+
current = projection.input();
343+
}
344+
345+
if current.is::<EmptyExec>() {
346+
return Ok(Some(0));
347+
}
348+
349+
if current.is::<PlaceholderRowExec>() {
350+
return Ok(Some(1));
351+
}
352+
353+
if matches!(
354+
current.partition_statistics(None)?.num_rows,
355+
Precision::Exact(0)
356+
) {
357+
return Ok(Some(0));
358+
}
359+
360+
Ok(None)
361+
}
362+
287363
/// Pushes down the limit through the plan.
288364
pub(crate) fn pushdown_limits(
289365
pushdown_plan: Arc<dyn ExecutionPlan>,

datafusion/sqllogictest/test_files/explain_tree.slt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1803,6 +1803,19 @@ physical_plan
18031803
07)│ PlaceholderRowExec │
18041804
08)└───────────────────────────┘
18051805

1806+
query TT
1807+
EXPLAIN select count(*) from (values ('a', 'b'), ('c', 'd')) as t (c1, c2) order by 1 limit 10
1808+
----
1809+
physical_plan
1810+
01)┌───────────────────────────┐
1811+
02)│ ProjectionExec │
1812+
03)│ -------------------- │
1813+
04)│ count(*): 2 │
1814+
05)└─────────────┬─────────────┘
1815+
06)┌─────────────┴─────────────┐
1816+
07)│ PlaceholderRowExec │
1817+
08)└───────────────────────────┘
1818+
18061819

18071820
# Test explain for large plans
18081821

datafusion/sqllogictest/test_files/push_down_filter_regression.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ STORED AS PARQUET;
393393

394394
statement ok
395395
COPY (
396-
SELECT * FROM (VALUES (3, 6, 90), (8, 12, 110)) AS v(a, b, c)
396+
SELECT * FROM (VALUES (1, 6, 90), (8, 12, 110)) AS v(a, b, c)
397397
) TO 'test_files/scratch/push_down_filter_regression/agg_dyn_mixed/file_1.parquet'
398398
STORED AS PARQUET;
399399

datafusion/sqllogictest/test_files/union.slt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -530,13 +530,12 @@ physical_plan
530530
13)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
531531
14)--------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], file_type=csv, has_header=true
532532
15)----ProjectionExec: expr=[1 as cnt]
533-
16)------GlobalLimitExec: skip=0, fetch=3
534-
17)--------PlaceholderRowExec
535-
18)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
536-
19)------GlobalLimitExec: skip=0, fetch=3
537-
20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
538-
21)----------ProjectionExec: expr=[1 as c1]
539-
22)------------PlaceholderRowExec
533+
16)------PlaceholderRowExec
534+
17)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
535+
18)------GlobalLimitExec: skip=0, fetch=3
536+
19)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
537+
20)----------ProjectionExec: expr=[1 as c1]
538+
21)------------PlaceholderRowExec
540539

541540

542541
########

0 commit comments

Comments (0)