Skip to content

Commit 7ea5383

Browse files
committed
Add exact reverse scan: reverse rows within each batch for Exact sort pushdown
When try_reverse_output is called, enable both reverse_row_groups (reverse RG read order) and reverse_rows (reverse rows within each batch). This gives globally sorted output, allowing the Sort operator to be removed entirely and fetch to be pushed down to the scan for file-level early termination. Previously, only reverse_row_groups was set (Inexact), which kept TopK in the plan and prevented fetch pushdown — all files had to be opened even when only 1 row was needed.
1 parent 6d2c1cf commit 7ea5383

3 files changed

Lines changed: 64 additions & 57 deletions

File tree

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 32 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ fn test_sort_pushdown_basic_phase1() {
8484
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
8585
output:
8686
Ok:
87-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
88-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
87+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
8988
"
9089
);
9190
}
@@ -113,8 +112,7 @@ fn test_sort_with_limit_phase1() {
113112
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
114113
output:
115114
Ok:
116-
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
117-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
115+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
118116
"
119117
);
120118
}
@@ -144,8 +142,7 @@ fn test_sort_multiple_columns_phase1() {
144142
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
145143
output:
146144
Ok:
147-
- SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
148-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
145+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet, reverse_row_groups=true
149146
"
150147
);
151148
}
@@ -179,8 +176,7 @@ fn test_prefix_match_single_column() {
179176
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
180177
output:
181178
Ok:
182-
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
183-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
179+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet, reverse_row_groups=true
184180
"
185181
);
186182
}
@@ -213,8 +209,7 @@ fn test_prefix_match_with_limit() {
213209
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 DESC NULLS LAST, c@2 ASC], file_type=parquet
214210
output:
215211
Ok:
216-
- SortExec: TopK(fetch=100), expr=[a@0 DESC NULLS LAST, b@1 ASC], preserve_partitioning=[false]
217-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
212+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 DESC NULLS LAST, c@2 ASC], file_type=parquet, reverse_row_groups=true
218213
"
219214
);
220215
}
@@ -249,10 +244,9 @@ fn test_prefix_match_through_transparent_nodes() {
249244
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC, c@2 DESC NULLS LAST], file_type=parquet
250245
output:
251246
Ok:
252-
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
253-
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
254-
- CoalesceBatchesExec: target_batch_size=1024
255-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
247+
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
248+
- CoalesceBatchesExec: target_batch_size=1024
249+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC, c@2 DESC NULLS LAST], file_type=parquet, reverse_row_groups=true
256250
"
257251
);
258252
}
@@ -344,9 +338,8 @@ fn test_sort_through_coalesce_batches() {
344338
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
345339
output:
346340
Ok:
347-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
348-
- CoalesceBatchesExec: target_batch_size=1024
349-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
341+
- CoalesceBatchesExec: target_batch_size=1024
342+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
350343
"
351344
);
352345
}
@@ -373,9 +366,8 @@ fn test_sort_through_repartition() {
373366
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
374367
output:
375368
Ok:
376-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
377-
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
378-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
369+
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
370+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
379371
"
380372
);
381373
}
@@ -406,8 +398,7 @@ fn test_nested_sorts() {
406398
output:
407399
Ok:
408400
- SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
409-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
410-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
401+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
411402
"
412403
);
413404
}
@@ -469,8 +460,8 @@ fn test_sort_through_coalesce_partitions() {
469460
Ok:
470461
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
471462
- CoalescePartitionsExec
472-
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
473-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
463+
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
464+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
474465
"
475466
);
476467
}
@@ -503,9 +494,9 @@ fn test_complex_plan_with_multiple_operators() {
503494
Ok:
504495
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
505496
- CoalescePartitionsExec
506-
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
497+
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
507498
- CoalesceBatchesExec: target_batch_size=1024
508-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
499+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
509500
"
510501
);
511502
}
@@ -538,8 +529,7 @@ fn test_multiple_sorts_different_columns() {
538529
output:
539530
Ok:
540531
- SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
541-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
542-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
532+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
543533
"
544534
);
545535
}
@@ -667,8 +657,7 @@ fn test_pushdown_through_blocking_node() {
667657
Ok:
668658
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
669659
- AggregateExec: mode=Final, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
670-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
671-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
660+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
672661
"
673662
);
674663
}
@@ -704,9 +693,8 @@ fn test_sort_pushdown_through_simple_projection() {
704693
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
705694
output:
706695
Ok:
707-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
708-
- ProjectionExec: expr=[a@0 as a, b@1 as b]
709-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
696+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
697+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
710698
"
711699
);
712700
}
@@ -739,9 +727,8 @@ fn test_sort_pushdown_through_projection_with_alias() {
739727
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
740728
output:
741729
Ok:
742-
- SortExec: expr=[id@0 DESC NULLS LAST], preserve_partitioning=[false]
743-
- ProjectionExec: expr=[a@0 as id, b@1 as value]
744-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
730+
- ProjectionExec: expr=[a@0 as id, b@1 as value]
731+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
745732
"
746733
);
747734
}
@@ -828,9 +815,8 @@ fn test_sort_pushdown_projection_reordered_columns() {
828815
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
829816
output:
830817
Ok:
831-
- SortExec: expr=[a@2 DESC NULLS LAST], preserve_partitioning=[false]
832-
- ProjectionExec: expr=[c@2 as c, b@1 as b, a@0 as a]
833-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
818+
- ProjectionExec: expr=[c@2 as c, b@1 as b, a@0 as a]
819+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
834820
"
835821
);
836822
}
@@ -862,9 +848,8 @@ fn test_sort_pushdown_projection_with_limit() {
862848
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
863849
output:
864850
Ok:
865-
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
866-
- ProjectionExec: expr=[a@0 as a, b@1 as b]
867-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
851+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
852+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
868853
"
869854
);
870855
}
@@ -899,10 +884,9 @@ fn test_sort_pushdown_through_projection_and_coalesce() {
899884
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
900885
output:
901886
Ok:
902-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
903-
- ProjectionExec: expr=[a@0 as a, b@1 as b]
904-
- CoalesceBatchesExec: target_batch_size=1024
905-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
887+
- ProjectionExec: expr=[a@0 as a, b@1 as b]
888+
- CoalesceBatchesExec: target_batch_size=1024
889+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet, reverse_row_groups=true
906890
"
907891
);
908892
}
@@ -935,9 +919,8 @@ fn test_sort_pushdown_projection_subset_of_columns() {
935919
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=parquet
936920
output:
937921
Ok:
938-
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
939-
- ProjectionExec: expr=[a@0 as a]
940-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
922+
- ProjectionExec: expr=[a@0 as a]
923+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=parquet, reverse_row_groups=true
941924
"
942925
);
943926
}

datafusion/datasource-parquet/src/opener.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ pub(super) struct ParquetOpener {
118118
pub max_predicate_cache_size: Option<usize>,
119119
/// Whether to read row groups in reverse order
120120
pub reverse_row_groups: bool,
121+
/// Whether to reverse rows within each batch (for Exact reverse scan)
122+
pub reverse_rows: bool,
121123
}
122124

123125
/// Represents a prepared access plan with optional row selection
@@ -274,6 +276,7 @@ impl FileOpener for ParquetOpener {
274276
let max_predicate_cache_size = self.max_predicate_cache_size;
275277

276278
let reverse_row_groups = self.reverse_row_groups;
279+
let reverse_rows = self.reverse_rows;
277280
Ok(Box::pin(async move {
278281
#[cfg(feature = "parquet_encryption")]
279282
let file_decryption_properties = encryption_context
@@ -601,6 +604,16 @@ impl FileOpener for ParquetOpener {
601604
&predicate_cache_inner_records,
602605
&predicate_cache_records,
603606
);
607+
// When exact reverse scanning, reverse the rows within each
608+
// batch so that the output is in exact descending order.
609+
// Combined with reversed row group order, this gives globally
610+
// sorted output (Exact), enabling fetch pushdown to the scan.
611+
if reverse_rows && b.num_rows() > 1 {
612+
let indices = arrow::array::UInt32Array::from_iter_values(
613+
(0..b.num_rows() as u32).rev(),
614+
);
615+
b = arrow::compute::take_record_batch(&b, &indices)?;
616+
}
604617
b = projector.project_batch(&b)?;
605618
if replace_schema {
606619
// Ensure the output batch has the expected schema.
@@ -1076,6 +1089,7 @@ mod test {
10761089
coerce_int96: Option<arrow::datatypes::TimeUnit>,
10771090
max_predicate_cache_size: Option<usize>,
10781091
reverse_row_groups: bool,
1092+
reverse_rows: bool,
10791093
}
10801094

10811095
impl ParquetOpenerBuilder {
@@ -1101,6 +1115,7 @@ mod test {
11011115
coerce_int96: None,
11021116
max_predicate_cache_size: None,
11031117
reverse_row_groups: false,
1118+
reverse_rows: false,
11041119
}
11051120
}
11061121

@@ -1208,6 +1223,7 @@ mod test {
12081223
encryption_factory: None,
12091224
max_predicate_cache_size: self.max_predicate_cache_size,
12101225
reverse_row_groups: self.reverse_row_groups,
1226+
reverse_rows: self.reverse_rows,
12111227
}
12121228
}
12131229
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,14 @@ pub struct ParquetSource {
288288
pub(crate) projection: ProjectionExprs,
289289
#[cfg(feature = "parquet_encryption")]
290290
pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
291-
/// If true, read files in reverse order and reverse row groups within files.
292-
/// But it's not guaranteed that rows within row groups are in reverse order,
293-
/// so we still need to sort them after reading, so the reverse scan is inexact.
294-
/// Used to optimize ORDER BY ... DESC on sorted data.
291+
/// If true, read row groups in reverse order within each file.
292+
/// Combined with `reverse_rows`, controls the sort pushdown behavior:
293+
/// - `reverse_row_groups=true, reverse_rows=false`: Inexact (RGs reversed, rows within RG not)
294+
/// - `reverse_row_groups=true, reverse_rows=true`: Exact (both RGs and rows reversed)
295295
reverse_row_groups: bool,
296+
/// If true, reverse the row order within each batch after reading.
297+
/// This gives exact descending order when combined with `reverse_row_groups`.
298+
reverse_rows: bool,
296299
}
297300

298301
impl ParquetSource {
@@ -318,6 +321,7 @@ impl ParquetSource {
318321
#[cfg(feature = "parquet_encryption")]
319322
encryption_factory: None,
320323
reverse_row_groups: false,
324+
reverse_rows: false,
321325
}
322326
}
323327

@@ -567,6 +571,7 @@ impl FileSource for ParquetSource {
567571
encryption_factory: self.get_encryption_factory_with_config(),
568572
max_predicate_cache_size: self.max_predicate_cache_size(),
569573
reverse_row_groups: self.reverse_row_groups,
574+
reverse_rows: self.reverse_rows,
570575
});
571576
Ok(opener)
572577
}
@@ -803,10 +808,13 @@ impl FileSource for ParquetSource {
803808
return Ok(SortOrderPushdownResult::Unsupported);
804809
}
805810

806-
// Return Inexact because we're only reversing row group order,
807-
// not guaranteeing perfect row-level ordering
808-
let new_source = self.clone().with_reverse_row_groups(true);
809-
Ok(SortOrderPushdownResult::Inexact {
811+
// Return Exact because the opener reverses both row group order
812+
// AND rows within each batch, giving globally sorted output.
813+
// This allows the Sort operator to be removed entirely and
814+
// fetch to be pushed down to the scan for early termination.
815+
let mut new_source = self.clone().with_reverse_row_groups(true);
816+
new_source.reverse_rows = true;
817+
Ok(SortOrderPushdownResult::Exact {
810818
inner: Arc::new(new_source) as Arc<dyn FileSource>,
811819
})
812820

0 commit comments

Comments
 (0)