Skip to content

Commit f6408c4

Browse files
authored
Merge pull request #2 from adriangb/pr22493-rg-reorder-fix
fix(sort-pushdown): drop runtime row-group reorder hints on Inexact→Exact upgrade (correctness)
2 parents 42149c2 + ae3e8c7 commit f6408c4

3 files changed

Lines changed: 116 additions & 22 deletions

File tree

datafusion/datasource/src/file_scan_config/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -978,7 +978,7 @@ impl DataSource for FileScanConfig {
978978
}
979979
}
980980
SortOrderPushdownResult::Inexact { inner } => {
981-
let config = self.rebuild_with_source(inner, false, order)?;
981+
let mut config = self.rebuild_with_source(inner, false, order)?;
982982
// `rebuild_with_source` reorders files by stats; if the
983983
// post-sort files are non-overlapping AND the request now
984984
// validates against the new file groups, `output_ordering`
@@ -992,6 +992,26 @@ impl DataSource for FileScanConfig {
992992
inner: Arc::new(config),
993993
})
994994
} else {
995+
// Upgrading to Exact: the post-sort file groups are
996+
// non-overlapping and each file's declared ordering
997+
// re-validates, so reading the files in their natural
998+
// (declared-sorted) order already yields the requested
999+
// ordering — exactly like the `Unsupported` → Exact path,
1000+
// which reads files in natural order too.
1001+
//
1002+
// Drop the runtime row-group reorder hints the Inexact
1003+
// source carried (`sort_order_for_reorder` /
1004+
// `reverse_row_groups`) by restoring the original,
1005+
// hint-free source. With the `SortExec` removed those
1006+
// hints are not just redundant but unsafe: for a DESC
1007+
// request the opener sorts row groups ASC-by-min and then
1008+
// reverses them, which mis-orders two row groups within a
1009+
// single file that share the same `min` (e.g. a file
1010+
// `[10,8,8,8]` whose row groups are `[10,8]` and `[8,8]`
1011+
// would stream as `8,8,10,8`). The `SortExec` used to
1012+
// mask this; once it is gone the reordered stream is the
1013+
// final, wrong answer.
1014+
config.file_source = Arc::clone(&self.file_source);
9951015
Ok(SortOrderPushdownResult::Exact {
9961016
inner: Arc::new(config),
9971017
})

datafusion/datasource/src/file_scan_config/sort_pushdown.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -169,20 +169,24 @@ impl FileScanConfig {
169169
//
170170
// This interleaving is actually beneficial because SPM pulls from both
171171
// partitions concurrently, keeping parallel I/O active.
172-
let keep_ordering = if all_non_overlapping {
173-
if is_exact {
174-
true
175-
} else {
176-
// Re-validate: now that files are sorted, can the
177-
// request be satisfied?
178-
//
179-
// Same NULL guard as `try_sort_file_groups_by_statistics`:
180-
// we cannot claim Exact if any non-last file contains
181-
// NULLs in the sort columns. With NULLS LAST those
182-
// NULLs sit after all non-null rows in the file, so
183-
// when the next file's non-nulls are smaller than the
184-
// previous file's max, they'd appear *after* the NULLs
185-
// in the concatenated stream — breaking the ordering.
172+
let keep_ordering = match (all_non_overlapping, is_exact) {
173+
// Files still overlap after the stats sort — the combined
174+
// stream isn't ordered, so `output_ordering` must be dropped.
175+
(false, _) => false,
176+
// Source already had validated ordering and the post-sort
177+
// files still don't overlap — Exact carries through.
178+
(true, true) => true,
179+
// Source returned `Inexact`; re-validate against the
180+
// reordered file groups to decide whether to upgrade.
181+
//
182+
// Same NULL guard as `try_sort_file_groups_by_statistics`:
183+
// we cannot claim Exact if any non-last file contains
184+
// NULLs in the sort columns. With NULLS LAST those
185+
// NULLs sit after all non-null rows in the file, so
186+
// when the next file's non-nulls are smaller than the
187+
// previous file's max, they'd appear *after* the NULLs
188+
// in the concatenated stream — breaking the ordering.
189+
(true, false) => {
186190
let projected_schema = new_config.projected_schema()?;
187191
let projection_indices = new_config
188192
.file_source
@@ -201,8 +205,6 @@ impl FileScanConfig {
201205
new_eq_props.ordering_satisfy(order.iter().cloned())?
202206
}
203207
}
204-
} else {
205-
false
206208
};
207209

208210
if !keep_ordering {

datafusion/sqllogictest/test_files/sort_pushdown.slt

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,7 +1110,7 @@ EXPLAIN SELECT * FROM reversed_parquet ORDER BY id ASC;
11101110
logical_plan
11111111
01)Sort: reversed_parquet.id ASC NULLS LAST
11121112
02)--TableScan: reversed_parquet projection=[id, value]
1113-
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, sort_order_for_reorder=[id@0 ASC NULLS LAST]
1113+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
11141114

11151115
# Test 4.2: Results must be correct
11161116
query II
@@ -1336,7 +1336,7 @@ EXPLAIN SELECT * FROM reversed_with_order_parquet ORDER BY id ASC;
13361336
logical_plan
13371337
01)Sort: reversed_with_order_parquet.id ASC NULLS LAST
13381338
02)--TableScan: reversed_with_order_parquet projection=[id, value]
1339-
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, sort_order_for_reorder=[id@0 ASC NULLS LAST]
1339+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/c_low.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/reversed/a_high.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
13401340

13411341
# Test 6.2: Results must be correct
13421342
query II
@@ -1473,7 +1473,7 @@ EXPLAIN SELECT * FROM desc_reversed_parquet ORDER BY id DESC;
14731473
logical_plan
14741474
01)Sort: desc_reversed_parquet.id DESC NULLS FIRST
14751475
02)--TableScan: desc_reversed_parquet projection=[id, value]
1476-
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet]]}, projection=[id, value], output_ordering=[id@0 DESC], file_type=parquet, sort_order_for_reorder=[id@0 DESC], reverse_row_groups=true
1476+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/b_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/desc_reversed/a_low.parquet]]}, projection=[id, value], output_ordering=[id@0 DESC], file_type=parquet
14771477

14781478
# Test 8.2: Results must be correct
14791479
query II
@@ -1486,6 +1486,78 @@ SELECT * FROM desc_reversed_parquet ORDER BY id DESC;
14861486
2 200
14871487
1 100
14881488

1489+
# Test 8b: DESC with multiple row groups per file sharing a min value.
1490+
# Regression test for the Inexact→Exact upgrade: when SortExec is eliminated
1491+
# the files must be read in natural order. The opener's runtime row-group
1492+
# reorder (sort ASC-by-min then reverse) mis-orders two row groups in one file
1493+
# that share the same min — so the upgrade must NOT leave those hints active.
1494+
#
1495+
# File b_high is DESC-sorted [10,8,8,8] written with 2 rows per row group:
1496+
# RG0 = [10, 8] (min 8, max 10)
1497+
# RG1 = [ 8, 8] (min 8, max 8)
1498+
# Both row groups have min=8. Naively reordering RGs ASC-by-min then reversing
1499+
# yields [RG1, RG0] → 8,8,10,8 (wrong). Natural order [RG0, RG1] is correct.
1500+
1501+
statement ok
1502+
CREATE TABLE rg_desc_high(id INT, value INT) AS VALUES (10, 100), (8, 801), (8, 802), (8, 803);
1503+
1504+
statement ok
1505+
CREATE TABLE rg_desc_low(id INT, value INT) AS VALUES (3, 300), (2, 200), (1, 100);
1506+
1507+
query I
1508+
COPY (SELECT * FROM rg_desc_high ORDER BY id DESC)
1509+
TO 'test_files/scratch/sort_pushdown/rg_desc/b_high.parquet'
1510+
OPTIONS ('format.max_row_group_size' '2');
1511+
----
1512+
4
1513+
1514+
query I
1515+
COPY (SELECT * FROM rg_desc_low ORDER BY id DESC)
1516+
TO 'test_files/scratch/sort_pushdown/rg_desc/a_low.parquet'
1517+
OPTIONS ('format.max_row_group_size' '2');
1518+
----
1519+
3
1520+
1521+
# Files are named so that filesystem order [a_low, b_high] is wrong for DESC, so
1522+
# the Inexact path fires; the stats reorder yields [b_high, a_low], whose value
1523+
# ranges do not overlap, and the upgrade eliminates SortExec.
1524+
statement ok
1525+
CREATE EXTERNAL TABLE rg_desc_parquet(id INT, value INT)
1526+
STORED AS PARQUET
1527+
LOCATION 'test_files/scratch/sort_pushdown/rg_desc/'
1528+
WITH ORDER (id DESC);
1529+
1530+
# Expected plan: SortExec is eliminated and the files are reordered, with NO
1531+
# sort_order_for_reorder / reverse_row_groups hints (natural read is correct after the upgrade).
1532+
query TT
1533+
EXPLAIN SELECT id FROM rg_desc_parquet ORDER BY id DESC;
1534+
----
1535+
logical_plan
1536+
01)Sort: rg_desc_parquet.id DESC NULLS FIRST
1537+
02)--TableScan: rg_desc_parquet projection=[id]
1538+
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/rg_desc/b_high.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/rg_desc/a_low.parquet]]}, projection=[id], output_ordering=[id@0 DESC], file_type=parquet
1539+
1540+
# Results must be in DESC order — id=10 first.
1541+
query I
1542+
SELECT id FROM rg_desc_parquet ORDER BY id DESC;
1543+
----
1544+
10
1545+
8
1546+
8
1547+
8
1548+
3
1549+
2
1550+
1
1551+
1552+
statement ok
1553+
DROP TABLE rg_desc_parquet;
1554+
1555+
statement ok
1556+
DROP TABLE rg_desc_high;
1557+
1558+
statement ok
1559+
DROP TABLE rg_desc_low;
1560+
14891561
# Test 9: Multi-column sort key validation
14901562
# Files have (category, id) ordering. Files share a boundary value on category='B'
14911563
# so column-level min/max statistics overlap on the primary key column.
@@ -2366,7 +2438,7 @@ logical_plan
23662438
physical_plan
23672439
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
23682440
02)--BufferExec: capacity=1073741824
2369-
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, sort_order_for_reorder=[id@0 ASC NULLS LAST]
2441+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
23702442

23712443
# Verify correctness
23722444
query II
@@ -2393,7 +2465,7 @@ logical_plan
23932465
physical_plan
23942466
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=3
23952467
02)--BufferExec: capacity=1073741824
2396-
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet, sort_order_for_reorder=[id@0 ASC NULLS LAST]
2468+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/b_mid.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/a_high.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tg_buffer/c_low.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
23972469

23982470
query II
23992471
SELECT * FROM tg_buffer ORDER BY id ASC LIMIT 3;

0 commit comments

Comments
 (0)