Skip to content

Commit fdfa2c7

Browse files
committed
Add session config, limit-after-reverse fix, SLT tests, and plan display for exact reverse scan
- Add `enable_exact_reverse_scan` to ParquetOptions (default false)
- Wire config through `with_table_parquet_options` to set `exact_reverse`
- Fix limit correctness: skip passing limit to parquet reader when reverse_rows=true; apply limit in ReversedRowGroupStream after reversal
- Display `scan_direction=Reversed` for exact, `reverse_row_groups=true` for inexact
- Add 4 snapshot tests for exact reverse (removes Sort, fetch pushdown, through projection)
- Add 8 SLT tests: EXPLAIN plans + result verification, LIMIT, OFFSET+LIMIT, ASC unchanged, toggle off
1 parent 7c347ed commit fdfa2c7

7 files changed

Lines changed: 315 additions & 12 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,13 @@ config_namespace! {
732732
/// parquet reader setting. 0 means no caching.
733733
pub max_predicate_cache_size: Option<usize>, default = None
734734

735+
/// (reading) If true, reverse scans produce exact descending order
736+
/// by reversing rows within each row group. This allows the Sort
737+
/// operator to be removed entirely and fetch/limit to be pushed
738+
/// down to the scan. If false (default), reverse scans only reverse
739+
/// row group order (inexact), keeping TopK above for final sorting.
740+
pub enable_exact_reverse_scan: bool, default = false
741+
735742
// The following options affect writing to parquet files
736743
// and map to parquet::file::properties::WriterProperties
737744

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ impl ParquetOptions {
209209
coerce_int96: _, // not used for writer props
210210
skip_arrow_metadata: _,
211211
max_predicate_cache_size: _,
212+
enable_exact_reverse_scan: _, // reads not used for writer props
212213
} = self;
213214

214215
let mut builder = WriterProperties::builder()

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ use std::sync::Arc;
3333

3434
use crate::physical_optimizer::test_utils::{
3535
OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
36-
parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
37-
repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch,
38-
sort_expr, sort_expr_named, test_scan_with_ordering,
36+
parquet_exec_with_sort, parquet_exec_with_sort_exact_reverse, projection_exec,
37+
projection_exec_with_alias, repartition_exec, schema, simple_projection_exec,
38+
sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_named, test_scan_with_ordering,
3939
};
4040

4141
#[test]
@@ -1038,3 +1038,119 @@ fn test_sort_pushdown_with_test_scan_arbitrary_ordering() {
10381038
"
10391039
);
10401040
}
1041+
1042+
// ============================================================================
1043+
// EXACT REVERSE SCAN TESTS
1044+
// ============================================================================
1045+
// These tests verify behavior when exact_reverse is enabled on ParquetSource.
1046+
// With exact reverse, the Sort operator is removed entirely and fetch is pushed
1047+
// down to the scan.
1048+
1049+
#[test]
1050+
fn test_exact_reverse_removes_sort() {
1051+
// With exact_reverse=true, Sort should be removed entirely
1052+
let schema = schema();
1053+
let a = sort_expr("a", &schema);
1054+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1055+
let source =
1056+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1057+
1058+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1059+
let plan = sort_exec(desc_ordering, source);
1060+
1061+
insta::assert_snapshot!(
1062+
OptimizationTest::new(plan, PushdownSort::new(), true),
1063+
@r"
1064+
OptimizationTest:
1065+
input:
1066+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1067+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1068+
output:
1069+
Ok:
1070+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1071+
"
1072+
);
1073+
}
1074+
1075+
#[test]
1076+
fn test_exact_reverse_with_fetch_pushes_limit() {
1077+
// With exact_reverse=true, Sort with fetch should be removed and fetch
1078+
// pushed down to the scan
1079+
let schema = schema();
1080+
let a = sort_expr("a", &schema);
1081+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1082+
let source =
1083+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1084+
1085+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1086+
let plan = sort_exec_with_fetch(desc_ordering, Some(10), source);
1087+
1088+
insta::assert_snapshot!(
1089+
OptimizationTest::new(plan, PushdownSort::new(), true),
1090+
@r"
1091+
OptimizationTest:
1092+
input:
1093+
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1094+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1095+
output:
1096+
Ok:
1097+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=10, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1098+
"
1099+
);
1100+
}
1101+
1102+
#[test]
1103+
fn test_exact_reverse_through_projection_with_fetch() {
1104+
// Exact reverse with fetch pushes through projection
1105+
let schema = schema();
1106+
let a = sort_expr("a", &schema);
1107+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1108+
let source =
1109+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1110+
1111+
let projection = simple_projection_exec(source, vec![0, 1]);
1112+
1113+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1114+
let plan = sort_exec_with_fetch(desc_ordering, Some(5), projection);
1115+
1116+
insta::assert_snapshot!(
1117+
OptimizationTest::new(plan, PushdownSort::new(), true),
1118+
@r"
1119+
OptimizationTest:
1120+
input:
1121+
- SortExec: TopK(fetch=5), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1122+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
1123+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1124+
output:
1125+
Ok:
1126+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
1127+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=5, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1128+
"
1129+
);
1130+
}
1131+
1132+
#[test]
1133+
fn test_exact_reverse_without_fetch_no_limit() {
1134+
// Exact reverse without fetch: Sort removed, no limit on scan
1135+
let schema = schema();
1136+
let a = sort_expr("a", &schema);
1137+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1138+
let source =
1139+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1140+
1141+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1142+
let plan = sort_exec(desc_ordering, source); // no fetch
1143+
1144+
insta::assert_snapshot!(
1145+
OptimizationTest::new(plan, PushdownSort::new(), true),
1146+
@r"
1147+
OptimizationTest:
1148+
input:
1149+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1150+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1151+
output:
1152+
Ok:
1153+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1154+
"
1155+
);
1156+
}

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,23 @@ pub(crate) fn parquet_exec_with_sort(
101101
DataSourceExec::from_data_source(config)
102102
}
103103

104+
/// Create a single parquet file that is sorted with exact_reverse enabled
105+
pub(crate) fn parquet_exec_with_sort_exact_reverse(
106+
schema: SchemaRef,
107+
output_ordering: Vec<LexOrdering>,
108+
) -> Arc<DataSourceExec> {
109+
let source = ParquetSource::new(schema).with_exact_reverse(true);
110+
let config = FileScanConfigBuilder::new(
111+
ObjectStoreUrl::parse("test:///").unwrap(),
112+
Arc::new(source),
113+
)
114+
.with_file(PartitionedFile::new("x".to_string(), 100))
115+
.with_output_ordering(output_ordering)
116+
.build();
117+
118+
DataSourceExec::from_data_source(config)
119+
}
120+
104121
fn int64_stats() -> ColumnStatistics {
105122
ColumnStatistics {
106123
null_count: Precision::Absent,

datafusion/datasource-parquet/src/opener.rs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -581,8 +581,14 @@ impl FileOpener for ParquetOpener {
581581
// Apply the prepared plan to the builder
582582
builder = prepared_plan.apply_to_builder(builder);
583583

584+
// When reverse_rows is enabled, limit must be applied AFTER row
585+
// reversal (in ReversedRowGroupStream), not at the parquet reader
586+
// level. Applying limit here would read the first N rows in forward
587+
// order and then reverse them, giving wrong results.
584588
if let Some(limit) = limit {
585-
builder = builder.with_limit(limit)
589+
if !reverse_rows {
590+
builder = builder.with_limit(limit)
591+
}
586592
}
587593

588594
if let Some(max_predicate_cache_size) = max_predicate_cache_size {
@@ -705,9 +711,11 @@ impl FileOpener for ParquetOpener {
705711

706712
// When exact reverse is enabled, wrap the stream to buffer
707713
// and reverse rows per row group. Memory cost: O(largest_RG).
714+
// The limit is applied here (after reversal) instead of at the
715+
// parquet reader level so that we get the correct reversed rows.
708716
let stream: futures::stream::BoxStream<'static, Result<RecordBatch>> =
709717
if reverse_rows {
710-
ReversedRowGroupStream::new(stream, rg_row_counts).boxed()
718+
ReversedRowGroupStream::new(stream, rg_row_counts, limit).boxed()
711719
} else {
712720
stream.boxed()
713721
};
@@ -746,10 +754,12 @@ struct ReversedRowGroupStream<S> {
746754
output_buffer: VecDeque<RecordBatch>,
747755
/// Whether the inner stream is exhausted
748756
done: bool,
757+
/// Optional row limit (applied after reversal for correct results)
758+
remaining_limit: Option<usize>,
749759
}
750760

751761
impl<S> ReversedRowGroupStream<S> {
752-
fn new(inner: S, rg_row_counts: Vec<usize>) -> Self {
762+
fn new(inner: S, rg_row_counts: Vec<usize>, limit: Option<usize>) -> Self {
753763
let rows_remaining = rg_row_counts.first().copied().unwrap_or(0);
754764
Self {
755765
inner,
@@ -759,6 +769,25 @@ impl<S> ReversedRowGroupStream<S> {
759769
buffer: Vec::new(),
760770
output_buffer: VecDeque::new(),
761771
done: false,
772+
remaining_limit: limit,
773+
}
774+
}
775+
776+
/// Truncate batch to remaining limit and update the counter.
777+
/// Returns the (possibly truncated) batch.
778+
fn apply_limit(&mut self, batch: RecordBatch) -> RecordBatch {
779+
if let Some(remaining) = self.remaining_limit.as_mut() {
780+
let rows = batch.num_rows();
781+
if rows <= *remaining {
782+
*remaining -= rows;
783+
batch
784+
} else {
785+
let truncated = batch.slice(0, *remaining);
786+
*remaining = 0;
787+
truncated
788+
}
789+
} else {
790+
batch
762791
}
763792
}
764793

@@ -798,11 +827,14 @@ where
798827
mut self: Pin<&mut Self>,
799828
cx: &mut Context<'_>,
800829
) -> Poll<Option<Self::Item>> {
801-
use Poll;
830+
// Check if limit has been reached
831+
if self.remaining_limit == Some(0) {
832+
return Poll::Ready(None);
833+
}
802834

803835
// First, emit any already-reversed batches
804836
if let Some(batch) = self.output_buffer.pop_front() {
805-
return Poll::Ready(Some(Ok(batch)));
837+
return Poll::Ready(Some(Ok(self.apply_limit(batch))));
806838
}
807839

808840
if self.done {
@@ -824,7 +856,7 @@ where
824856
return Poll::Ready(Some(Err(e)));
825857
}
826858
if let Some(batch) = self.output_buffer.pop_front() {
827-
return Poll::Ready(Some(Ok(batch)));
859+
return Poll::Ready(Some(Ok(self.apply_limit(batch))));
828860
}
829861
}
830862
}
@@ -838,7 +870,7 @@ where
838870
return Poll::Ready(Some(Err(e)));
839871
}
840872
if let Some(batch) = self.output_buffer.pop_front() {
841-
return Poll::Ready(Some(Ok(batch)));
873+
return Poll::Ready(Some(Ok(self.apply_limit(batch))));
842874
}
843875
return Poll::Ready(None);
844876
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ impl ParquetSource {
336336
mut self,
337337
table_parquet_options: TableParquetOptions,
338338
) -> Self {
339+
self.exact_reverse = table_parquet_options.global.enable_exact_reverse_scan;
339340
self.table_parquet_options = table_parquet_options;
340341
self
341342
}
@@ -651,8 +652,10 @@ impl FileSource for ParquetSource {
651652

652653
write!(f, "{predicate_string}")?;
653654

654-
// Add reverse_scan info if enabled
655-
if self.reverse_row_groups {
655+
// Add reverse scan info if enabled
656+
if self.reverse_row_groups && self.reverse_rows {
657+
write!(f, ", scan_direction=Reversed")?;
658+
} else if self.reverse_row_groups {
656659
write!(f, ", reverse_row_groups=true")?;
657660
}
658661

0 commit comments

Comments (0)