Skip to content

Commit 03cd09a

Browse files
committed
Add session config, limit-after-reverse fix, SLT tests, and plan display for exact reverse scan
- Add `enable_exact_reverse_scan` to ParquetOptions (default false) - Wire config through `with_table_parquet_options` to set `exact_reverse` - Fix limit correctness: skip passing limit to parquet reader when reverse_rows=true; apply limit in ReversedRowGroupStream after reversal - Display `scan_direction=Reversed` for exact, `reverse_row_groups=true` for inexact - Add 4 snapshot tests for exact reverse (removes Sort, fetch pushdown, through projection) - Add 8 SLT tests: EXPLAIN plans + result verification, LIMIT, OFFSET+LIMIT, ASC unchanged, toggle off
1 parent 7d2793b commit 03cd09a

7 files changed

Lines changed: 315 additions & 12 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,13 @@ config_namespace! {
732732
/// parquet reader setting. 0 means no caching.
733733
pub max_predicate_cache_size: Option<usize>, default = None
734734

735+
/// (reading) If true, reverse scans produce exact descending order
736+
/// by reversing rows within each row group. This allows the Sort
737+
/// operator to be removed entirely and fetch/limit to be pushed
738+
/// down to the scan. If false (default), reverse scans only reverse
739+
/// row group order (inexact), keeping TopK above for final sorting.
740+
pub enable_exact_reverse_scan: bool, default = false
741+
735742
// The following options affect writing to parquet files
736743
// and map to parquet::file::properties::WriterProperties
737744

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ impl ParquetOptions {
209209
coerce_int96: _, // not used for writer props
210210
skip_arrow_metadata: _,
211211
max_predicate_cache_size: _,
212+
enable_exact_reverse_scan: _, // reads not used for writer props
212213
} = self;
213214

214215
let mut builder = WriterProperties::builder()

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ use std::sync::Arc;
3333

3434
use crate::physical_optimizer::test_utils::{
3535
OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
36-
parquet_exec_with_sort, projection_exec, projection_exec_with_alias,
37-
repartition_exec, schema, simple_projection_exec, sort_exec, sort_exec_with_fetch,
38-
sort_expr, sort_expr_named, test_scan_with_ordering,
36+
parquet_exec_with_sort, parquet_exec_with_sort_exact_reverse, projection_exec,
37+
projection_exec_with_alias, repartition_exec, schema, simple_projection_exec,
38+
sort_exec, sort_exec_with_fetch, sort_expr, sort_expr_named, test_scan_with_ordering,
3939
};
4040

4141
#[test]
@@ -1038,3 +1038,119 @@ fn test_sort_pushdown_with_test_scan_arbitrary_ordering() {
10381038
"
10391039
);
10401040
}
1041+
1042+
// ============================================================================
1043+
// EXACT REVERSE SCAN TESTS
1044+
// ============================================================================
1045+
// These tests verify behavior when exact_reverse is enabled on ParquetSource.
1046+
// With exact reverse, the Sort operator is removed entirely and fetch is pushed
1047+
// down to the scan.
1048+
1049+
#[test]
1050+
fn test_exact_reverse_removes_sort() {
1051+
// With exact_reverse=true, Sort should be removed entirely
1052+
let schema = schema();
1053+
let a = sort_expr("a", &schema);
1054+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1055+
let source =
1056+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1057+
1058+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1059+
let plan = sort_exec(desc_ordering, source);
1060+
1061+
insta::assert_snapshot!(
1062+
OptimizationTest::new(plan, PushdownSort::new(), true),
1063+
@r"
1064+
OptimizationTest:
1065+
input:
1066+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1067+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1068+
output:
1069+
Ok:
1070+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1071+
"
1072+
);
1073+
}
1074+
1075+
#[test]
1076+
fn test_exact_reverse_with_fetch_pushes_limit() {
1077+
// With exact_reverse=true, Sort with fetch should be removed and fetch
1078+
// pushed down to the scan
1079+
let schema = schema();
1080+
let a = sort_expr("a", &schema);
1081+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1082+
let source =
1083+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1084+
1085+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1086+
let plan = sort_exec_with_fetch(desc_ordering, Some(10), source);
1087+
1088+
insta::assert_snapshot!(
1089+
OptimizationTest::new(plan, PushdownSort::new(), true),
1090+
@r"
1091+
OptimizationTest:
1092+
input:
1093+
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1094+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1095+
output:
1096+
Ok:
1097+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=10, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1098+
"
1099+
);
1100+
}
1101+
1102+
#[test]
1103+
fn test_exact_reverse_through_projection_with_fetch() {
1104+
// Exact reverse with fetch pushes through projection
1105+
let schema = schema();
1106+
let a = sort_expr("a", &schema);
1107+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1108+
let source =
1109+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1110+
1111+
let projection = simple_projection_exec(source, vec![0, 1]);
1112+
1113+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1114+
let plan = sort_exec_with_fetch(desc_ordering, Some(5), projection);
1115+
1116+
insta::assert_snapshot!(
1117+
OptimizationTest::new(plan, PushdownSort::new(), true),
1118+
@r"
1119+
OptimizationTest:
1120+
input:
1121+
- SortExec: TopK(fetch=5), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1122+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
1123+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1124+
output:
1125+
Ok:
1126+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
1127+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], limit=5, output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1128+
"
1129+
);
1130+
}
1131+
1132+
#[test]
1133+
fn test_exact_reverse_without_fetch_no_limit() {
1134+
// Exact reverse without fetch: Sort removed, no limit on scan
1135+
let schema = schema();
1136+
let a = sort_expr("a", &schema);
1137+
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
1138+
let source =
1139+
parquet_exec_with_sort_exact_reverse(schema.clone(), vec![source_ordering]);
1140+
1141+
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
1142+
let plan = sort_exec(desc_ordering, source); // no fetch
1143+
1144+
insta::assert_snapshot!(
1145+
OptimizationTest::new(plan, PushdownSort::new(), true),
1146+
@r"
1147+
OptimizationTest:
1148+
input:
1149+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1150+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
1151+
output:
1152+
Ok:
1153+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, scan_direction=Reversed
1154+
"
1155+
);
1156+
}

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,23 @@ pub(crate) fn parquet_exec_with_sort(
101101
DataSourceExec::from_data_source(config)
102102
}
103103

104+
/// Create a single parquet file that is sorted with exact_reverse enabled
105+
pub(crate) fn parquet_exec_with_sort_exact_reverse(
106+
schema: SchemaRef,
107+
output_ordering: Vec<LexOrdering>,
108+
) -> Arc<DataSourceExec> {
109+
let source = ParquetSource::new(schema).with_exact_reverse(true);
110+
let config = FileScanConfigBuilder::new(
111+
ObjectStoreUrl::parse("test:///").unwrap(),
112+
Arc::new(source),
113+
)
114+
.with_file(PartitionedFile::new("x".to_string(), 100))
115+
.with_output_ordering(output_ordering)
116+
.build();
117+
118+
DataSourceExec::from_data_source(config)
119+
}
120+
104121
fn int64_stats() -> ColumnStatistics {
105122
ColumnStatistics {
106123
null_count: Precision::Absent,

datafusion/datasource-parquet/src/opener.rs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -573,8 +573,14 @@ impl FileOpener for ParquetOpener {
573573
// Apply the prepared plan to the builder
574574
builder = prepared_plan.apply_to_builder(builder);
575575

576+
// When reverse_rows is enabled, limit must be applied AFTER row
577+
// reversal (in ReversedRowGroupStream), not at the parquet reader
578+
// level. Applying limit here would read the first N rows in forward
579+
// order and then reverse them, giving wrong results.
576580
if let Some(limit) = limit {
577-
builder = builder.with_limit(limit)
581+
if !reverse_rows {
582+
builder = builder.with_limit(limit)
583+
}
578584
}
579585

580586
if let Some(max_predicate_cache_size) = max_predicate_cache_size {
@@ -697,9 +703,11 @@ impl FileOpener for ParquetOpener {
697703

698704
// When exact reverse is enabled, wrap the stream to buffer
699705
// and reverse rows per row group. Memory cost: O(largest_RG).
706+
// The limit is applied here (after reversal) instead of at the
707+
// parquet reader level so that we get the correct reversed rows.
700708
let stream: futures::stream::BoxStream<'static, Result<RecordBatch>> =
701709
if reverse_rows {
702-
ReversedRowGroupStream::new(stream, rg_row_counts).boxed()
710+
ReversedRowGroupStream::new(stream, rg_row_counts, limit).boxed()
703711
} else {
704712
stream.boxed()
705713
};
@@ -738,10 +746,12 @@ struct ReversedRowGroupStream<S> {
738746
output_buffer: VecDeque<RecordBatch>,
739747
/// Whether the inner stream is exhausted
740748
done: bool,
749+
/// Optional row limit (applied after reversal for correct results)
750+
remaining_limit: Option<usize>,
741751
}
742752

743753
impl<S> ReversedRowGroupStream<S> {
744-
fn new(inner: S, rg_row_counts: Vec<usize>) -> Self {
754+
fn new(inner: S, rg_row_counts: Vec<usize>, limit: Option<usize>) -> Self {
745755
let rows_remaining = rg_row_counts.first().copied().unwrap_or(0);
746756
Self {
747757
inner,
@@ -751,6 +761,25 @@ impl<S> ReversedRowGroupStream<S> {
751761
buffer: Vec::new(),
752762
output_buffer: VecDeque::new(),
753763
done: false,
764+
remaining_limit: limit,
765+
}
766+
}
767+
768+
/// Truncate batch to remaining limit and update the counter.
769+
/// Returns the (possibly truncated) batch.
770+
fn apply_limit(&mut self, batch: RecordBatch) -> RecordBatch {
771+
if let Some(remaining) = self.remaining_limit.as_mut() {
772+
let rows = batch.num_rows();
773+
if rows <= *remaining {
774+
*remaining -= rows;
775+
batch
776+
} else {
777+
let truncated = batch.slice(0, *remaining);
778+
*remaining = 0;
779+
truncated
780+
}
781+
} else {
782+
batch
754783
}
755784
}
756785

@@ -790,11 +819,14 @@ where
790819
mut self: Pin<&mut Self>,
791820
cx: &mut Context<'_>,
792821
) -> Poll<Option<Self::Item>> {
793-
use Poll;
822+
// Check if limit has been reached
823+
if self.remaining_limit == Some(0) {
824+
return Poll::Ready(None);
825+
}
794826

795827
// First, emit any already-reversed batches
796828
if let Some(batch) = self.output_buffer.pop_front() {
797-
return Poll::Ready(Some(Ok(batch)));
829+
return Poll::Ready(Some(Ok(self.apply_limit(batch))));
798830
}
799831

800832
if self.done {
@@ -816,7 +848,7 @@ where
816848
return Poll::Ready(Some(Err(e)));
817849
}
818850
if let Some(batch) = self.output_buffer.pop_front() {
819-
return Poll::Ready(Some(Ok(batch)));
851+
return Poll::Ready(Some(Ok(self.apply_limit(batch))));
820852
}
821853
}
822854
}
@@ -830,7 +862,7 @@ where
830862
return Poll::Ready(Some(Err(e)));
831863
}
832864
if let Some(batch) = self.output_buffer.pop_front() {
833-
return Poll::Ready(Some(Ok(batch)));
865+
return Poll::Ready(Some(Ok(self.apply_limit(batch))));
834866
}
835867
return Poll::Ready(None);
836868
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ impl ParquetSource {
336336
mut self,
337337
table_parquet_options: TableParquetOptions,
338338
) -> Self {
339+
self.exact_reverse = table_parquet_options.global.enable_exact_reverse_scan;
339340
self.table_parquet_options = table_parquet_options;
340341
self
341342
}
@@ -650,8 +651,10 @@ impl FileSource for ParquetSource {
650651

651652
write!(f, "{predicate_string}")?;
652653

653-
// Add reverse_scan info if enabled
654-
if self.reverse_row_groups {
654+
// Add reverse scan info if enabled
655+
if self.reverse_row_groups && self.reverse_rows {
656+
write!(f, ", scan_direction=Reversed")?;
657+
} else if self.reverse_row_groups {
655658
write!(f, ", reverse_row_groups=true")?;
656659
}
657660

0 commit comments

Comments
 (0)