Skip to content

Commit 102da39

Browse files
authored
feat: globally reorder files and row groups by statistics for TopK queries (#21956)
## Which issue does this PR close? - Closes #21317 - Partial Closes #21733 - Partial fix for #21399 - Split from #21580 ## Rationale TopK queries (`ORDER BY col LIMIT K`) on parquet with multiple out-of-order row groups converge slowly: the dynamic-filter threshold tightens only after the "best" row groups happen to be read. Reordering row groups so the most-promising ones come first lets the threshold tighten fast and prune the rest. ## What changes - **`PreparedAccessPlan::reorder_by_statistics`** — sorts row groups by `min(col)` ASC using parquet column statistics; composes with `reverse()` for DESC requests. Graceful no-op when stats are unavailable or the column isn't in the file schema. - **`ParquetSource::try_pushdown_sort`** — sets `sort_order_for_reorder` + `reverse_row_groups` whenever pushdown fires. Pushdown fires when the leading sort column is in the file schema, or when the source's reversed equivalence ordering satisfies the request (covers function-wrapped sorts via `EquivalenceProperties`'s monotonicity machinery). - **`FileSource::reorder_files`** — file-level reorder in the shared work queue, keyed off the file's `min` (ASC by `min` for ASC requests, DESC by `min` for DESC requests) so it matches `reorder_by_statistics` at the row-group level. The two orderings nest: file `i`'s `min` is a lower bound on every row group inside it, so the file queue's order is a natural prefix of the within-file row-group order. EXPLAIN surfaces both `sort_order_for_reorder=[...]` and `reverse_row_groups=true` so the optimization is visible in plans and snapshot tests. ## Where the optimization applies ```sql SELECT ... FROM <parquet_table> ORDER BY <col> [ASC|DESC] [LIMIT N] ``` The leading sort key must be a column that exists in the parquet file schema, with no `AggregateExec` between the `SortExec` and the parquet scan. Time-series, log-tailing, ranking lookups, and the `sort_pushdown` benchmark in this repo (`benchmarks/queries/sort_pushdown/q1-q8.sql`) all fit. ClickBench's TopK queries (`ORDER BY COUNT(*) DESC LIMIT N` etc.) are **not** in this category — the leading sort key is an aggregation result, so the column-in-schema check fails and the pushdown does not fire. Pushing sort metadata through `AggregateExec` is a separate problem (the aggregated column doesn't exist before aggregation, so even if the metadata reached the scan there'd be nothing actionable to do with it). ## Tested - 116 parquet lib unit tests pass (incl. `reorder_files` ASC/DESC/missing-stats/no-op coverage) - 32 `physical_optimizer::pushdown_sort` integration tests pass - SLT suites: `sort_pushdown`, `topk`, `dynamic_filter_pushdown_config`, `push_down_filter_parquet`, `window_topk_pushdown` - `cargo clippy --all-targets --all-features -- -D warnings` clean ## User-facing changes None — pure performance optimization. Query results unchanged.
1 parent 0512962 commit 102da39

24 files changed

Lines changed: 1745 additions & 184 deletions

datafusion/core/tests/dataframe/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3268,7 +3268,7 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti
32683268
UnionExec
32693269
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
32703270
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
3271-
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
3271+
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, sort_order_for_reorder=[id@0 ASC NULLS LAST]
32723272
");
32733273
Ok(())
32743274
}
@@ -3286,7 +3286,7 @@ async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_reparti
32863286
UnionExec
32873287
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
32883288
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
3289-
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
3289+
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet, sort_order_for_reorder=[id@0 ASC NULLS LAST]
32903290
");
32913291

32923292
Ok(())

datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,23 @@ async fn test_fuzz_topk_filter_pushdown() {
317317
.map(|col| orders.get(**col).unwrap())
318318
.multi_cartesian_product()
319319
{
320+
// Add remaining columns as tiebreakers (ASC NULLS LAST)
321+
// to ensure deterministic results when RG reorder changes
322+
// the read order of rows with equal sort key values.
323+
let tiebreakers: Vec<String> = ["id", "name", "department"]
324+
.iter()
325+
.filter(|c| {
326+
!order_columns
327+
.iter()
328+
.take(num_order_by_columns)
329+
.any(|oc| **oc == **c)
330+
})
331+
.map(|c| format!("{c} ASC NULLS LAST"))
332+
.collect();
333+
let all_orderings =
334+
orderings.into_iter().chain(tiebreakers.iter()).join(", ");
320335
let query = format!(
321-
"SELECT * FROM test_table ORDER BY {} LIMIT {}",
322-
orderings.into_iter().join(", "),
323-
limit
336+
"SELECT * FROM test_table ORDER BY {all_orderings} LIMIT {limit}",
324337
);
325338
queries.push(query);
326339
}

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ fn test_sort_pushdown_basic_phase1() {
8585
output:
8686
Ok:
8787
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
88-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
88+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
8989
"
9090
);
9191
}
@@ -114,7 +114,7 @@ fn test_sort_with_limit_phase1() {
114114
output:
115115
Ok:
116116
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
117-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
117+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
118118
"
119119
);
120120
}
@@ -145,7 +145,7 @@ fn test_sort_multiple_columns_phase1() {
145145
output:
146146
Ok:
147147
- SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
148-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
148+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 ASC, b@1 DESC NULLS LAST]
149149
"
150150
);
151151
}
@@ -180,7 +180,7 @@ fn test_prefix_match_single_column() {
180180
output:
181181
Ok:
182182
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
183-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
183+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 ASC]
184184
"
185185
);
186186
}
@@ -214,7 +214,7 @@ fn test_prefix_match_with_limit() {
214214
output:
215215
Ok:
216216
- SortExec: TopK(fetch=100), expr=[a@0 DESC NULLS LAST, b@1 ASC], preserve_partitioning=[false]
217-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
217+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST, b@1 ASC], reverse_row_groups=true
218218
"
219219
);
220220
}
@@ -249,7 +249,7 @@ fn test_prefix_match_through_transparent_nodes() {
249249
Ok:
250250
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
251251
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
252-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
252+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 ASC]
253253
"
254254
);
255255
}
@@ -285,8 +285,13 @@ fn test_exact_prefix_match_same_direction() {
285285
}
286286

287287
#[test]
288-
fn test_no_prefix_match_longer_than_source() {
289-
// Test that prefix matching does NOT work if requested is longer than source
288+
fn test_inexact_pushdown_when_prefix_longer_than_source() {
289+
// Source has [a DESC] ordering, request is [a ASC, b DESC] — longer
290+
// than the source ordering so the prefix can't be matched. The
291+
// primary sort column 'a' is in the file schema, so sort pushdown
292+
// returns `Inexact` with `sort_order_for_reorder` set, drops the
293+
// source's `output_ordering` (the runtime reorder invalidates it),
294+
// and leaves the outer `SortExec` to enforce the full ordering.
290295
let schema = schema();
291296

292297
// Source has [a DESC] ordering (single column)
@@ -310,7 +315,7 @@ fn test_no_prefix_match_longer_than_source() {
310315
output:
311316
Ok:
312317
- SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST], preserve_partitioning=[false]
313-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST], file_type=parquet
318+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 ASC, b@1 DESC NULLS LAST]
314319
"
315320
);
316321
}
@@ -343,7 +348,7 @@ fn test_sort_through_repartition() {
343348
Ok:
344349
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
345350
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
346-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
351+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
347352
"
348353
);
349354
}
@@ -375,7 +380,7 @@ fn test_nested_sorts() {
375380
Ok:
376381
- SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
377382
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
378-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
383+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
379384
"
380385
);
381386
}
@@ -435,7 +440,7 @@ fn test_sort_through_coalesce_partitions() {
435440
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
436441
- CoalescePartitionsExec
437442
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
438-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
443+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
439444
"
440445
);
441446
}
@@ -467,7 +472,7 @@ fn test_complex_plan_with_multiple_operators() {
467472
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
468473
- CoalescePartitionsExec
469474
- RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
470-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
475+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
471476
"
472477
);
473478
}
@@ -501,14 +506,19 @@ fn test_multiple_sorts_different_columns() {
501506
Ok:
502507
- SortExec: expr=[c@2 ASC], preserve_partitioning=[false]
503508
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
504-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
509+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
505510
"
506511
);
507512
}
508513

509514
#[test]
510-
fn test_no_pushdown_for_unordered_source() {
511-
// Verify pushdown does NOT happen for sources without ordering
515+
fn test_inexact_pushdown_for_unordered_source() {
516+
// Source has no declared `output_ordering`, request is `[a ASC]`.
517+
// The reversed-equivalence check can't fire (nothing to reverse),
518+
// but 'a' is in the file schema — sort pushdown returns `Inexact`
519+
// with `sort_order_for_reorder` set so the opener can sort row
520+
// groups by `min(a)` at scan time. The surrounding `SortExec`
521+
// stays in place to enforce the full ordering.
512522
let schema = schema();
513523
let source = parquet_exec(schema.clone()); // No output_ordering
514524
let sort_exprs = LexOrdering::new(vec![sort_expr("a", &schema)]).unwrap();
@@ -524,14 +534,20 @@ fn test_no_pushdown_for_unordered_source() {
524534
output:
525535
Ok:
526536
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
527-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet
537+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 ASC]
528538
"
529539
);
530540
}
531541

532542
#[test]
533-
fn test_no_pushdown_for_non_reverse_sort() {
534-
// Verify pushdown does NOT happen when sort doesn't reverse source ordering
543+
fn test_inexact_pushdown_when_request_doesnt_match_source_ordering() {
544+
// The requested sort column ('b') doesn't match the source's natural
545+
// ordering ('a' ASC). Neither natural nor reversed satisfies the
546+
// request, but 'b' is in the file schema — so sort pushdown returns
547+
// `Inexact` with `sort_order_for_reorder` set, drops the source's
548+
// claimed `output_ordering` (the runtime row-group reorder
549+
// invalidates it), and keeps the surrounding `SortExec` for
550+
// correctness.
535551
let schema = schema();
536552

537553
// Source sorted by 'a' ASC
@@ -554,7 +570,7 @@ fn test_no_pushdown_for_non_reverse_sort() {
554570
output:
555571
Ok:
556572
- SortExec: expr=[b@1 ASC], preserve_partitioning=[false]
557-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
573+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[b@1 ASC]
558574
"
559575
);
560576
}
@@ -630,7 +646,7 @@ fn test_pushdown_through_blocking_node() {
630646
- SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
631647
- AggregateExec: mode=Final, gby=[a@0 as a], aggr=[COUNT(b)], ordering_mode=Sorted
632648
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
633-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
649+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
634650
"
635651
);
636652
}
@@ -668,7 +684,7 @@ fn test_sort_pushdown_through_simple_projection() {
668684
Ok:
669685
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
670686
- ProjectionExec: expr=[a@0 as a, b@1 as b]
671-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
687+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
672688
"
673689
);
674690
}
@@ -703,7 +719,7 @@ fn test_sort_pushdown_through_projection_with_alias() {
703719
Ok:
704720
- SortExec: expr=[id@0 DESC NULLS LAST], preserve_partitioning=[false]
705721
- ProjectionExec: expr=[a@0 as id, b@1 as value]
706-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
722+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
707723
"
708724
);
709725
}
@@ -792,7 +808,7 @@ fn test_sort_pushdown_projection_reordered_columns() {
792808
Ok:
793809
- SortExec: expr=[a@2 DESC NULLS LAST], preserve_partitioning=[false]
794810
- ProjectionExec: expr=[c@2 as c, b@1 as b, a@0 as a]
795-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
811+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
796812
"
797813
);
798814
}
@@ -826,7 +842,7 @@ fn test_sort_pushdown_projection_with_limit() {
826842
Ok:
827843
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
828844
- ProjectionExec: expr=[a@0 as a, b@1 as b]
829-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
845+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
830846
"
831847
);
832848
}
@@ -860,7 +876,7 @@ fn test_sort_pushdown_through_projection() {
860876
Ok:
861877
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
862878
- ProjectionExec: expr=[a@0 as a, b@1 as b]
863-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
879+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
864880
"
865881
);
866882
}
@@ -895,7 +911,7 @@ fn test_sort_pushdown_projection_subset_of_columns() {
895911
Ok:
896912
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
897913
- ProjectionExec: expr=[a@0 as a]
898-
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
914+
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, sort_order_for_reorder=[a@0 DESC NULLS LAST], reverse_row_groups=true
899915
"
900916
);
901917
}

0 commit comments

Comments
 (0)