Skip to content

Commit b508f1f

Browse files
alambkumarUjjawal
andauthored
[branch-54] fix: clear handled OFFSET before child recursion in LimitPushdown (#22525) (#22631)
- Part of #21080 This PR: - Backports #22525 from @kumarUjjawal to the `branch-54` line Co-authored-by: Kumar Ujjawal <ujjawalpathak6@gmail.com>
1 parent dbd97e6 commit b508f1f

3 files changed

Lines changed: 183 additions & 0 deletions

File tree

datafusion/core/tests/physical_optimizer/limit_pushdown.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,3 +714,123 @@ fn no_limit_preserves_plan_identity() -> Result<()> {
714714

715715
Ok(())
716716
}
717+
718+
#[test]
719+
fn outer_offset_does_not_leak_through_sort_into_inner_limit() -> Result<()> {
720+
// Regression test for https://github.com/apache/datafusion/issues/22489
721+
//
722+
// When an outer OFFSET is separated from an inner LIMIT by a SortExec
723+
// with different sort keys, the outer skip must not reduce the inner
724+
// fetch. Before the fix, combine_limit merged them, producing
725+
// GlobalLimitExec(skip=1, fetch=7) instead of preserving the inner
726+
// LIMIT 8.
727+
//
728+
// Plan structure:
729+
// GlobalLimitExec: skip=1, fetch=None (outer OFFSET 1)
730+
// SortExec: [c1 DESC] (outer sort — different key)
731+
// GlobalLimitExec: skip=0, fetch=8 (inner LIMIT 8)
732+
// SortExec: [c2 ASC] (inner sort — different key)
733+
// EmptyExec
734+
let schema = create_schema();
735+
let empty = empty_exec(Arc::clone(&schema));
736+
737+
let inner_ordering: LexOrdering = [PhysicalSortExpr {
738+
expr: col("c2", &schema)?,
739+
options: SortOptions::default(),
740+
}]
741+
.into();
742+
let inner_sort = sort_exec(inner_ordering, empty);
743+
let inner_limit = global_limit_exec(inner_sort, 0, Some(8));
744+
745+
let outer_ordering: LexOrdering = [PhysicalSortExpr {
746+
expr: col("c1", &schema)?,
747+
options: SortOptions {
748+
descending: true,
749+
nulls_first: false,
750+
},
751+
}]
752+
.into();
753+
let outer_sort = sort_exec(outer_ordering, inner_limit);
754+
let outer_limit = global_limit_exec(outer_sort, 1, None);
755+
756+
let initial = format_plan(&outer_limit);
757+
insta::assert_snapshot!(
758+
initial,
759+
@r"
760+
GlobalLimitExec: skip=1, fetch=None
761+
SortExec: expr=[c1@0 DESC NULLS LAST], preserve_partitioning=[false]
762+
GlobalLimitExec: skip=0, fetch=8
763+
SortExec: expr=[c2@1 ASC], preserve_partitioning=[false]
764+
EmptyExec
765+
"
766+
);
767+
768+
let after_optimize =
769+
LimitPushdown::new().optimize(outer_limit, &ConfigOptions::new())?;
770+
let optimized = format_plan(&after_optimize);
771+
insta::assert_snapshot!(
772+
optimized,
773+
@r"
774+
GlobalLimitExec: skip=1, fetch=None
775+
SortExec: expr=[c1@0 DESC NULLS LAST], preserve_partitioning=[false]
776+
SortExec: TopK(fetch=8), expr=[c2@1 ASC], preserve_partitioning=[false]
777+
EmptyExec
778+
"
779+
);
780+
781+
Ok(())
782+
}
783+
784+
#[test]
785+
fn outer_offset_with_same_sort_key_still_pushes_limit() -> Result<()> {
786+
// Companion to outer_offset_does_not_leak_through_sort_into_inner_limit:
787+
// when both sorts use the *same* key, the inner LIMIT should still be
788+
// pushed into the SortExec as TopK.
789+
//
790+
// Plan structure:
791+
// GlobalLimitExec: skip=1, fetch=None (outer OFFSET 1)
792+
// SortExec: [c1 ASC] (outer sort — same key)
793+
// GlobalLimitExec: skip=0, fetch=8 (inner LIMIT 8)
794+
// SortExec: [c1 ASC] (inner sort — same key)
795+
// EmptyExec
796+
let schema = create_schema();
797+
let empty = empty_exec(Arc::clone(&schema));
798+
799+
let ordering: LexOrdering = [PhysicalSortExpr {
800+
expr: col("c1", &schema)?,
801+
options: SortOptions::default(),
802+
}]
803+
.into();
804+
805+
let inner_sort = sort_exec(ordering.clone(), empty);
806+
let inner_limit = global_limit_exec(inner_sort, 0, Some(8));
807+
let outer_sort = sort_exec(ordering, inner_limit);
808+
let outer_limit = global_limit_exec(outer_sort, 1, None);
809+
810+
let initial = format_plan(&outer_limit);
811+
insta::assert_snapshot!(
812+
initial,
813+
@r"
814+
GlobalLimitExec: skip=1, fetch=None
815+
SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
816+
GlobalLimitExec: skip=0, fetch=8
817+
SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
818+
EmptyExec
819+
"
820+
);
821+
822+
let after_optimize =
823+
LimitPushdown::new().optimize(outer_limit, &ConfigOptions::new())?;
824+
let optimized = format_plan(&after_optimize);
825+
insta::assert_snapshot!(
826+
optimized,
827+
@r"
828+
GlobalLimitExec: skip=1, fetch=None
829+
SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]
830+
SortExec: TopK(fetch=8), expr=[c1@0 ASC], preserve_partitioning=[false]
831+
EmptyExec
832+
"
833+
);
834+
835+
Ok(())
836+
}

datafusion/physical-optimizer/src/limit_pushdown.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,14 @@ pub(crate) fn pushdown_limits(
375375
(new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
376376
}
377377

378+
// Once a limit has been materialized above the current node, child
379+
// subtrees should not inherit its `skip`. Keep `fetch`, but clear
380+
// `skip` before recursing so child-local limits are not merged with
381+
// an `OFFSET` that has already been applied.
382+
if global_state.satisfied {
383+
global_state.skip = 0;
384+
}
385+
378386
// Apply pushdown limits in children
379387
let children = new_node.data.children();
380388
let mut changed = false;

datafusion/sqllogictest/test_files/limit.slt

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -989,3 +989,58 @@ c-4
989989

990990
statement ok
991991
DROP TABLE t21176;
992+
993+
# Regression test for https://github.com/apache/datafusion/issues/22489
994+
# An outer ORDER BY / OFFSET must not reduce an inner LIMIT when the two are
995+
# separated by a sort on a *different* key.
996+
997+
statement ok
998+
set datafusion.execution.target_partitions = 4;
999+
1000+
statement ok
1001+
CREATE TABLE t22489 (g INT, x INT, y INT) AS VALUES (1, 10, 4), (2, 20, 3), (3, 30, 2), (4, 40, 1);
1002+
1003+
# Inner ORDER BY sx DESC LIMIT 4 keeps all four groups; the outer ORDER BY
1004+
# sy DESC OFFSET 1 then drops only the sy-max group (g=1), so g=2,3,4 remain.
1005+
query III
1006+
SELECT * FROM (
1007+
SELECT g, SUM(x) AS sx, SUM(y) AS sy FROM t22489 GROUP BY g
1008+
ORDER BY sx DESC LIMIT 4
1009+
) q
1010+
ORDER BY sy DESC
1011+
OFFSET 1;
1012+
----
1013+
2 20 3
1014+
3 30 2
1015+
4 40 1
1016+
1017+
query TT
1018+
EXPLAIN
1019+
SELECT * FROM (
1020+
SELECT g, SUM(x) AS sx, SUM(y) AS sy FROM t22489 GROUP BY g
1021+
ORDER BY sx DESC LIMIT 4
1022+
) q
1023+
ORDER BY sy DESC
1024+
OFFSET 1;
1025+
----
1026+
logical_plan
1027+
01)Limit: skip=1, fetch=None
1028+
02)--Sort: q.sy DESC NULLS FIRST
1029+
03)----SubqueryAlias: q
1030+
04)------Sort: sx DESC NULLS FIRST, fetch=4
1031+
05)--------Projection: t22489.g, sum(t22489.x) AS sx, sum(t22489.y) AS sy
1032+
06)----------Aggregate: groupBy=[[t22489.g]], aggr=[[sum(CAST(t22489.x AS Int64)), sum(CAST(t22489.y AS Int64))]]
1033+
07)------------TableScan: t22489 projection=[g, x, y]
1034+
physical_plan
1035+
01)GlobalLimitExec: skip=1, fetch=None
1036+
02)--SortExec: expr=[sy@2 DESC], preserve_partitioning=[false]
1037+
03)----SortPreservingMergeExec: [sx@1 DESC], fetch=4
1038+
04)------SortExec: TopK(fetch=4), expr=[sx@1 DESC], preserve_partitioning=[true]
1039+
05)--------ProjectionExec: expr=[g@0 as g, sum(t22489.x)@1 as sx, sum(t22489.y)@2 as sy]
1040+
06)----------AggregateExec: mode=FinalPartitioned, gby=[g@0 as g], aggr=[sum(t22489.x), sum(t22489.y)]
1041+
07)------------RepartitionExec: partitioning=Hash([g@0], 4), input_partitions=1
1042+
08)--------------AggregateExec: mode=Partial, gby=[g@0 as g], aggr=[sum(t22489.x), sum(t22489.y)]
1043+
09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
1044+
1045+
statement ok
1046+
DROP TABLE t22489;

0 commit comments

Comments
 (0)