Skip to content

Commit 35e4927

Browse files
committed
Merge branch 'exact-reverse-scan' into branch-52-reverse-scan
2 parents b4dbb6a + f0004c4 commit 35e4927

16 files changed

Lines changed: 1379 additions & 146 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,13 @@ config_namespace! {
732732
/// parquet reader setting. 0 means no caching.
733733
pub max_predicate_cache_size: Option<usize>, default = None
734734

735+
/// (reading) If true, reverse scans produce exact descending order
736+
/// by reversing rows within each row group. This allows the Sort
737+
/// operator to be removed entirely and fetch/limit to be pushed
738+
/// down to the scan. If false (default), reverse scans only reverse
739+
/// row group order (inexact), keeping the Sort (or TopK) operator above for final sorting.
740+
pub enable_exact_reverse_scan: bool, default = false
741+
735742
// The following options affect writing to parquet files
736743
// and map to parquet::file::properties::WriterProperties
737744

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ impl ParquetOptions {
209209
coerce_int96: _, // not used for writer props
210210
skip_arrow_metadata: _,
211211
max_predicate_cache_size: _,
212+
enable_exact_reverse_scan: _, // read-side option, not used for writer props
212213
} = self;
213214

214215
let mut builder = WriterProperties::builder()
@@ -464,6 +465,7 @@ mod tests {
464465
skip_arrow_metadata: defaults.skip_arrow_metadata,
465466
coerce_int96: None,
466467
max_predicate_cache_size: defaults.max_predicate_cache_size,
468+
enable_exact_reverse_scan: defaults.enable_exact_reverse_scan,
467469
}
468470
}
469471

@@ -578,6 +580,8 @@ mod tests {
578580
binary_as_string: global_options_defaults.binary_as_string,
579581
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
580582
coerce_int96: None,
583+
enable_exact_reverse_scan: global_options_defaults
584+
.enable_exact_reverse_scan,
581585
},
582586
column_specific_options,
583587
key_value_metadata,

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ use std::sync::Arc;
3333

3434
use crate::physical_optimizer::test_utils::{
3535
OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
36-
parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
37-
repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch,
38-
sort_expr, sort_expr_named, test_scan_with_ordering,
36+
parquet_exec_with_sort, parquet_exec_with_sort_exact_reverse, projection_exec,
37+
projection_exec_with_alias, repartition_exec, schema, simple_projection_exec,
38+
sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_named, test_scan_with_ordering,
3939
};
4040

4141
#[test]
@@ -1038,3 +1038,119 @@ fn test_sort_pushdown_with_test_scan_arbitrary_ordering() {
10381038
"
10391039
);
10401040
}
1041+
1042+
// ============================================================================
1043+
// EXACT REVERSE SCAN TESTS
1044+
// ============================================================================
1045+
// These tests verify behavior when exact_reverse is enabled on ParquetSource.
1046+
// With exact reverse, the Sort operator is removed entirely and fetch is pushed
1047+
// down to the scan.
1048+
1049+
#[test]
1050+
fn test_exact_reverse_removes_sort() {
1051+
// With exact_reverse=true, Sort should be removed entirely
1052+
let schema = schema();
1053+
let a = sort_expr("a", &schema);
1054+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1055+
let source =
1056+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1057+
1058+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1059+
let plan = sort_exec(desc_ordering, source);
1060+
1061+
insta::assert_snapshot!(
1062+
OptimizationTest::new(plan, PushdownSort::new(), true),
1063+
@r"
1064+
OptimizationTest:
1065+
input:
1066+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1067+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1068+
output:
1069+
Ok:
1070+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1071+
"
1072+
);
1073+
}
1074+
1075+
#[test]
1076+
fn test_exact_reverse_with_fetch_pushes_limit() {
1077+
// With exact_reverse=true, Sort with fetch should be removed and fetch
1078+
// pushed down to the scan
1079+
let schema = schema();
1080+
let a = sort_expr("a", &schema);
1081+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1082+
let source =
1083+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1084+
1085+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1086+
let plan = sort_exec_with_fetch(desc_ordering, Some(10), source);
1087+
1088+
insta::assert_snapshot!(
1089+
OptimizationTest::new(plan, PushdownSort::new(), true),
1090+
@r"
1091+
OptimizationTest:
1092+
input:
1093+
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1094+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1095+
output:
1096+
Ok:
1097+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=10, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1098+
"
1099+
);
1100+
}
1101+
1102+
#[test]
1103+
fn test_exact_reverse_through_projection_with_fetch() {
1104+
// Exact reverse with fetch pushes through projection
1105+
let schema = schema();
1106+
let a = sort_expr("a", &schema);
1107+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1108+
let source =
1109+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1110+
1111+
let projection = simple_projection_exec(source, vec![0, 1]);
1112+
1113+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1114+
let plan = sort_exec_with_fetch(desc_ordering, Some(5), projection);
1115+
1116+
insta::assert_snapshot!(
1117+
OptimizationTest::new(plan, PushdownSort::new(), true),
1118+
@r"
1119+
OptimizationTest:
1120+
input:
1121+
- SortExec: TopK(fetch=5), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1122+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
1123+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1124+
output:
1125+
Ok:
1126+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
1127+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=5, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1128+
"
1129+
);
1130+
}
1131+
1132+
#[test]
1133+
fn test_exact_reverse_without_fetch_no_limit() {
1134+
// Exact reverse without fetch: Sort removed, no limit on scan
1135+
let schema = schema();
1136+
let a = sort_expr("a", &schema);
1137+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1138+
let source =
1139+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1140+
1141+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1142+
let plan = sort_exec(desc_ordering, source); // no fetch
1143+
1144+
insta::assert_snapshot!(
1145+
OptimizationTest::new(plan, PushdownSort::new(), true),
1146+
@r"
1147+
OptimizationTest:
1148+
input:
1149+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1150+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1151+
output:
1152+
Ok:
1153+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1154+
"
1155+
);
1156+
}

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,23 @@ pub(crate) fn parquet_exec_with_sort(
101101
DataSourceExec::from_data_source(config)
102102
}
103103

104+
/// Create a scan over a single parquet file with the given output ordering and exact_reverse enabled on the source
105+
pub(crate) fn parquet_exec_with_sort_exact_reverse(
106+
schema: SchemaRef,
107+
output_ordering: Vec<LexOrdering>,
108+
) -> Arc<DataSourceExec> {
109+
let source = ParquetSource::new(schema).with_exact_reverse(true);
110+
let config = FileScanConfigBuilder::new(
111+
ObjectStoreUrl::parse("test:///").unwrap(),
112+
Arc::new(source),
113+
)
114+
.with_file(PartitionedFile::new("x".to_string(), 100))
115+
.with_output_ordering(output_ordering)
116+
.build();
117+
118+
DataSourceExec::from_data_source(config)
119+
}
120+
104121
fn int64_stats() -> ColumnStatistics {
105122
ColumnStatistics {
106123
null_count: Precision::Absent,

0 commit comments

Comments
 (0)