Skip to content

Commit 969a7b2

Browse files
committed
Merge remote-tracking branch 'origin/main' into neilc/perf-builder-replace
2 parents d286ff7 + 65f337d commit 969a7b2

7 files changed

Lines changed: 937 additions & 161 deletions

File tree

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ use datafusion_physical_optimizer::pushdown_sort::PushdownSort;
3232
use std::sync::Arc;
3333

3434
use crate::physical_optimizer::test_utils::{
35-
OptimizationTest, coalesce_partitions_exec, parquet_exec, parquet_exec_with_sort,
36-
projection_exec, projection_exec_with_alias, repartition_exec, schema,
37-
simple_projection_exec, sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_named,
38-
test_scan_with_ordering,
35+
OptimizationTest, TestScan, coalesce_partitions_exec, parquet_exec,
36+
parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
37+
repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch,
38+
sort_expr, sort_expr_named, test_scan_with_ordering,
3939
};
4040

4141
#[test]
@@ -996,3 +996,91 @@ fn test_sort_pushdown_with_test_scan_arbitrary_ordering() {
996996
"
997997
);
998998
}
999+
1000+
// ============================================================================
1001+
// EXACT PUSHDOWN TESTS (source guarantees ordering, SortExec removed)
1002+
// ============================================================================
1003+
1004+
#[test]
1005+
fn test_sort_pushdown_exact_no_fetch_no_limit() {
1006+
// When a source returns Exact (without fetch), the SortExec should be
1007+
// removed entirely with no GlobalLimitExec wrapper.
1008+
let schema = schema();
1009+
let a = sort_expr("a", &schema);
1010+
let b = sort_expr("b", &schema);
1011+
let source =
1012+
Arc::new(TestScan::new(schema.clone(), vec![]).with_exact_pushdown(true));
1013+
1014+
let ordering = LexOrdering::new(vec![a, b.reverse()]).unwrap();
1015+
let plan = sort_exec(ordering, source);
1016+
1017+
insta::assert_snapshot!(
1018+
OptimizationTest::new(plan, PushdownSort::new(), true),
1019+
@r"
1020+
OptimizationTest:
1021+
input:
1022+
- SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
1023+
- TestScan
1024+
output:
1025+
Ok:
1026+
- TestScan: requested_ordering=[a@0 ASC, b@1 DESC NULLS LAST]
1027+
"
1028+
);
1029+
}
1030+
1031+
#[test]
1032+
fn test_sort_pushdown_exact_preserves_fetch_with_global_limit() {
1033+
// When a source returns Exact but does NOT support with_fetch(),
1034+
// the optimizer must wrap the result with GlobalLimitExec to preserve
1035+
// the LIMIT from the eliminated SortExec.
1036+
let schema = schema();
1037+
let a = sort_expr("a", &schema);
1038+
let source =
1039+
Arc::new(TestScan::new(schema.clone(), vec![]).with_exact_pushdown(true));
1040+
1041+
let ordering = LexOrdering::new(vec![a]).unwrap();
1042+
let plan = sort_exec_with_fetch(ordering, Some(10), source);
1043+
1044+
insta::assert_snapshot!(
1045+
OptimizationTest::new(plan, PushdownSort::new(), true),
1046+
@r"
1047+
OptimizationTest:
1048+
input:
1049+
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
1050+
- TestScan
1051+
output:
1052+
Ok:
1053+
- GlobalLimitExec: skip=0, fetch=10
1054+
- TestScan: requested_ordering=[a@0 ASC]
1055+
"
1056+
);
1057+
}
1058+
1059+
#[test]
1060+
fn test_sort_pushdown_exact_preserves_fetch_with_source_support() {
1061+
// When a source returns Exact AND supports with_fetch(),
1062+
// the limit should be pushed into the source directly (no GlobalLimitExec).
1063+
let schema = schema();
1064+
let a = sort_expr("a", &schema);
1065+
let source = Arc::new(
1066+
TestScan::new(schema.clone(), vec![])
1067+
.with_exact_pushdown(true)
1068+
.with_supports_fetch(true),
1069+
);
1070+
1071+
let ordering = LexOrdering::new(vec![a]).unwrap();
1072+
let plan = sort_exec_with_fetch(ordering, Some(10), source);
1073+
1074+
insta::assert_snapshot!(
1075+
OptimizationTest::new(plan, PushdownSort::new(), true),
1076+
@r"
1077+
OptimizationTest:
1078+
input:
1079+
- SortExec: TopK(fetch=10), expr=[a@0 ASC], preserve_partitioning=[false]
1080+
- TestScan
1081+
output:
1082+
Ok:
1083+
- TestScan: requested_ordering=[a@0 ASC], fetch=10
1084+
"
1085+
);
1086+
}

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -830,15 +830,30 @@ pub fn sort_expr_named(name: &str, index: usize) -> PhysicalSortExpr {
830830
}
831831
}
832832

833-
/// A test data source that can display any requested ordering
834-
/// This is useful for testing sort pushdown behavior
833+
/// A test data source that can display any requested ordering.
834+
/// This is useful for testing sort pushdown behavior.
835+
///
836+
/// ## Configuration
837+
///
838+
/// - `exact_pushdown`: if `true`, `try_pushdown_sort` returns `Exact`
839+
/// (source guarantees ordering, SortExec can be removed); if `false`
840+
/// (default), returns `Inexact` (SortExec kept).
841+
/// - `supports_fetch`: if `true`, `with_fetch()` returns `Some` so the
842+
/// optimizer can push a LIMIT into the source; if `false` (default),
843+
/// `with_fetch()` returns `None`, forcing a `GlobalLimitExec` wrapper.
835844
#[derive(Debug, Clone)]
836845
pub struct TestScan {
837846
schema: SchemaRef,
838847
output_ordering: Vec<LexOrdering>,
839848
plan_properties: Arc<PlanProperties>,
840849
// Store the requested ordering for display
841850
requested_ordering: Option<LexOrdering>,
851+
/// If true, `try_pushdown_sort` returns `Exact` instead of `Inexact`.
852+
exact_pushdown: bool,
853+
/// If true, `with_fetch()` returns `Some(...)` (source absorbs the limit).
854+
supports_fetch: bool,
855+
/// The fetch (LIMIT) value pushed into this scan via `with_fetch()`.
856+
fetch: Option<usize>,
842857
}
843858

844859
impl TestScan {
@@ -872,41 +887,60 @@ impl TestScan {
872887
output_ordering,
873888
plan_properties: Arc::new(plan_properties),
874889
requested_ordering: None,
890+
exact_pushdown: false,
891+
supports_fetch: false,
892+
fetch: None,
875893
}
876894
}
877895

878896
/// Create a TestScan with a single output ordering
879897
pub fn with_ordering(schema: SchemaRef, ordering: LexOrdering) -> Self {
880898
Self::new(schema, vec![ordering])
881899
}
900+
901+
/// Set whether `try_pushdown_sort` returns `Exact` (true) or `Inexact` (false).
902+
pub fn with_exact_pushdown(mut self, exact: bool) -> Self {
903+
self.exact_pushdown = exact;
904+
self
905+
}
906+
907+
/// Set whether `with_fetch()` returns `Some` (true) or `None` (false).
908+
pub fn with_supports_fetch(mut self, supports: bool) -> Self {
909+
self.supports_fetch = supports;
910+
self
911+
}
882912
}
883913

884914
impl DisplayAs for TestScan {
885915
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
886916
match t {
887917
DisplayFormatType::Default | DisplayFormatType::Verbose => {
888918
write!(f, "TestScan")?;
919+
let mut sep = ": ";
889920
if !self.output_ordering.is_empty() {
890-
write!(f, ": output_ordering=[")?;
891-
// Format the ordering in a readable way
921+
write!(f, "{sep}output_ordering=[")?;
892922
for (i, sort_expr) in self.output_ordering[0].iter().enumerate() {
893923
if i > 0 {
894924
write!(f, ", ")?;
895925
}
896926
write!(f, "{sort_expr}")?;
897927
}
898928
write!(f, "]")?;
929+
sep = ", ";
899930
}
900-
// This is the key part - show what ordering was requested
901931
if let Some(ref req) = self.requested_ordering {
902-
write!(f, ", requested_ordering=[")?;
932+
write!(f, "{sep}requested_ordering=[")?;
903933
for (i, sort_expr) in req.iter().enumerate() {
904934
if i > 0 {
905935
write!(f, ", ")?;
906936
}
907937
write!(f, "{sort_expr}")?;
908938
}
909939
write!(f, "]")?;
940+
sep = ", ";
941+
}
942+
if let Some(fetch) = self.fetch {
943+
write!(f, "{sep}fetch={fetch}")?;
910944
}
911945
Ok(())
912946
}
@@ -953,6 +987,20 @@ impl ExecutionPlan for TestScan {
953987
Ok(Arc::new(Statistics::new_unknown(&self.schema)))
954988
}
955989

990+
fn with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
991+
if self.supports_fetch {
992+
let mut new_scan = self.clone();
993+
new_scan.fetch = fetch;
994+
Some(Arc::new(new_scan))
995+
} else {
996+
None
997+
}
998+
}
999+
1000+
fn fetch(&self) -> Option<usize> {
1001+
self.fetch
1002+
}
1003+
9561004
// This is the key method - implement sort pushdown
9571005
fn try_pushdown_sort(
9581006
&self,
@@ -965,10 +1013,27 @@ impl ExecutionPlan for TestScan {
9651013
let mut new_scan = self.clone();
9661014
new_scan.requested_ordering = requested_ordering;
9671015

968-
// Always return Inexact to keep the Sort node (like Phase 1 behavior)
969-
Ok(SortOrderPushdownResult::Inexact {
970-
inner: Arc::new(new_scan),
971-
})
1016+
if self.exact_pushdown {
1017+
// Update plan properties to reflect the guaranteed ordering
1018+
let orderings: Vec<Vec<PhysicalSortExpr>> = vec![order.to_vec()];
1019+
let eq_properties = EquivalenceProperties::new_with_orderings(
1020+
Arc::clone(&self.schema),
1021+
orderings,
1022+
);
1023+
new_scan.plan_properties = Arc::new(PlanProperties::new(
1024+
eq_properties,
1025+
Partitioning::UnknownPartitioning(1),
1026+
EmissionType::Incremental,
1027+
Boundedness::Bounded,
1028+
));
1029+
Ok(SortOrderPushdownResult::Exact {
1030+
inner: Arc::new(new_scan),
1031+
})
1032+
} else {
1033+
Ok(SortOrderPushdownResult::Inexact {
1034+
inner: Arc::new(new_scan),
1035+
})
1036+
}
9721037
}
9731038

9741039
fn apply_expressions(

0 commit comments

Comments
 (0)