Skip to content

Commit 59e99a4

Browse files
committed
fix: prevent sort elimination when files have NULLs in sort columns
Add null-count check in try_sort_file_groups_by_statistics to avoid claiming Exact when any file contains NULLs. With NULLS LAST/FIRST, eliminating SortExec could produce wrong row ordering across files. Includes unit tests and SLT Test F for NULL handling coverage.
1 parent 35a581c commit 59e99a4

2 files changed

Lines changed: 226 additions & 2 deletions

File tree

datafusion/datasource/src/file_scan_config.rs

Lines changed: 154 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::{
2727
use arrow::datatypes::FieldRef;
2828
use arrow::datatypes::{DataType, Schema, SchemaRef};
2929
use datafusion_common::config::ConfigOptions;
30+
use datafusion_common::stats::Precision;
3031
use datafusion_common::tree_node::TreeNodeRecursion;
3132
use datafusion_common::{
3233
Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
@@ -1556,8 +1557,25 @@ impl FileScanConfig {
15561557
// Re-check: now that files are sorted, does output_ordering become valid?
15571558
// This handles the case where validated_output_ordering() previously
15581559
// stripped the ordering because files were in the wrong order.
1559-
if result.all_non_overlapping && !self.output_ordering.is_empty() {
1560-
// Files are now non-overlapping and we have declared output_ordering.
1560+
//
1561+
// IMPORTANT: We cannot claim Exact if any file in a non-last position
1562+
// contains NULLs in the sort columns. With NULLS LAST, NULLs within
1563+
// a file are placed after all non-null values. If the next file has
1564+
// non-null values smaller than the previous file's max, those values
1565+
// would incorrectly appear after the NULLs. Similarly for NULLS FIRST.
1566+
//
1567+
// Conservative approach: if any file has nulls in the sort columns,
1568+
// do not claim Exact. The SortExec will handle NULL ordering correctly.
1569+
if result.all_non_overlapping
1570+
&& !self.output_ordering.is_empty()
1571+
&& !Self::any_file_has_nulls_in_sort_columns(
1572+
&new_config.file_groups,
1573+
order,
1574+
&projected_schema,
1575+
projection_indices.as_deref(),
1576+
)
1577+
{
1578+
// Files are now non-overlapping, no NULLs in sort columns.
15611579
// Re-ask the FileSource if this ordering satisfies the request,
15621580
// using eq_properties computed from the NEW (sorted) file groups.
15631581
let new_eq_props = new_config.eq_properties();
@@ -1574,6 +1592,43 @@ impl FileScanConfig {
15741592
inner: Arc::new(new_config),
15751593
})
15761594
}
1595+
1596+
/// Check if any file in any group has nulls in the sort columns.
1597+
fn any_file_has_nulls_in_sort_columns(
1598+
file_groups: &[FileGroup],
1599+
order: &[PhysicalSortExpr],
1600+
projected_schema: &SchemaRef,
1601+
projection_indices: Option<&[usize]>,
1602+
) -> bool {
1603+
let Some(sort_columns) =
1604+
sort_columns_from_physical_sort_exprs_nullable(order, projected_schema)
1605+
else {
1606+
return true; // Can't determine, assume nulls exist
1607+
};
1608+
1609+
for group in file_groups {
1610+
for file in group.iter() {
1611+
let Some(stats) = file.statistics.as_ref() else {
1612+
return true; // No stats, assume nulls exist
1613+
};
1614+
for col in &sort_columns {
1615+
let stat_idx = projection_indices
1616+
.map(|p| p[col.index()])
1617+
.unwrap_or_else(|| col.index());
1618+
if stat_idx >= stats.column_statistics.len() {
1619+
return true;
1620+
}
1621+
let col_stats = &stats.column_statistics[stat_idx];
1622+
match &col_stats.null_count {
1623+
Precision::Exact(0) => {} // No nulls, safe
1624+
Precision::Exact(_) => return true, // Has nulls
1625+
_ => return true, // Unknown null count, assume nulls
1626+
}
1627+
}
1628+
}
1629+
}
1630+
false
1631+
}
15771632
}
15781633

15791634
impl Debug for FileScanConfig {
@@ -1629,6 +1684,17 @@ fn ordered_column_indices_from_projection(
16291684
.collect::<Option<Vec<usize>>>()
16301685
}
16311686

1687+
/// Extract Column references from sort expressions for null checking.
1688+
fn sort_columns_from_physical_sort_exprs_nullable(
1689+
order: &[PhysicalSortExpr],
1690+
_schema: &SchemaRef,
1691+
) -> Option<Vec<Column>> {
1692+
order
1693+
.iter()
1694+
.map(|expr| expr.expr.as_any().downcast_ref::<Column>().cloned())
1695+
.collect()
1696+
}
1697+
16321698
/// Check whether a given ordering is valid for all file groups by verifying
16331699
/// that files within each group are sorted according to their min/max statistics.
16341700
///
@@ -3436,4 +3502,90 @@ mod tests {
34363502
assert!(pushed_config.output_ordering.is_empty());
34373503
Ok(())
34383504
}
3505+
3506+
/// Helper: create a PartitionedFile with stats including null count
3507+
fn make_file_with_null_stats(
3508+
name: &str,
3509+
min: f64,
3510+
max: f64,
3511+
null_count: usize,
3512+
) -> PartitionedFile {
3513+
PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new(
3514+
Statistics {
3515+
num_rows: Precision::Exact(100),
3516+
total_byte_size: Precision::Exact(1024),
3517+
column_statistics: vec![ColumnStatistics {
3518+
null_count: Precision::Exact(null_count),
3519+
min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
3520+
max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
3521+
..Default::default()
3522+
}],
3523+
},
3524+
))
3525+
}
3526+
3527+
#[test]
3528+
fn sort_pushdown_unsupported_with_nulls_does_not_upgrade_to_exact() -> Result<()> {
3529+
// Files are non-overlapping but one has NULLs.
3530+
// Should NOT upgrade to Exact — NULLs would appear in wrong position.
3531+
let file_schema =
3532+
Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
3533+
let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
3534+
let file_source = Arc::new(MockSource::new(table_schema));
3535+
3536+
let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
3537+
3538+
// Files in wrong order (high min first) to trigger reordering
3539+
let file_groups = vec![FileGroup::new(vec![
3540+
make_file_with_null_stats("b_no_nulls", 10.0, 19.0, 0),
3541+
make_file_with_null_stats("a_with_nulls", 0.0, 9.0, 5), // has NULLs
3542+
])];
3543+
3544+
let config =
3545+
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
3546+
.with_file_groups(file_groups)
3547+
.with_output_ordering(vec![
3548+
LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
3549+
])
3550+
.build();
3551+
3552+
let result = config.try_pushdown_sort(&[sort_expr])?;
3553+
// Should be Inexact (not Exact) because of NULLs
3554+
assert!(
3555+
matches!(result, SortOrderPushdownResult::Inexact { .. }),
3556+
"Expected Inexact due to NULLs, got {result:?}"
3557+
);
3558+
Ok(())
3559+
}
3560+
3561+
#[test]
3562+
fn sort_pushdown_unsupported_no_nulls_upgrades_to_exact() -> Result<()> {
3563+
// Files are non-overlapping, no NULLs → should upgrade to Exact
3564+
let file_schema =
3565+
Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
3566+
let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
3567+
let file_source = Arc::new(MockSource::new(table_schema));
3568+
3569+
let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
3570+
3571+
let file_groups = vec![FileGroup::new(vec![
3572+
make_file_with_null_stats("b_high", 10.0, 19.0, 0),
3573+
make_file_with_null_stats("a_low", 0.0, 9.0, 0),
3574+
])];
3575+
3576+
let config =
3577+
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
3578+
.with_file_groups(file_groups)
3579+
.with_output_ordering(vec![
3580+
LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
3581+
])
3582+
.build();
3583+
3584+
let result = config.try_pushdown_sort(&[sort_expr])?;
3585+
assert!(
3586+
matches!(result, SortOrderPushdownResult::Exact { .. }),
3587+
"Expected Exact (no NULLs), got {result:?}"
3588+
);
3589+
Ok(())
3590+
}
34393591
}

datafusion/sqllogictest/test_files/sort_pushdown.slt

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,6 +2088,78 @@ DROP TABLE te_src_c;
20882088
statement ok
20892089
DROP TABLE te_inferred_multi;
20902090

2091+
# ===========================================================
2092+
# Test F: NULL handling — sort pushdown must not eliminate Sort
2093+
# when files contain NULLs in sort columns, because NULL ordering
2094+
# (NULLS FIRST/LAST) across files requires a full sort.
2095+
# ===========================================================
2096+
2097+
# Test F.1: NULLS LAST — file with NULL must not cause wrong ordering
2098+
statement ok
2099+
CREATE TABLE null_src_a(id INT) AS VALUES (1), (NULL);
2100+
2101+
statement ok
2102+
CREATE TABLE null_src_b(id INT) AS VALUES (2), (3);
2103+
2104+
query I
2105+
COPY (SELECT * FROM null_src_a ORDER BY id ASC NULLS LAST)
2106+
TO 'test_files/scratch/sort_pushdown/tf_nulls/b_null_tail.parquet';
2107+
----
2108+
2
2109+
2110+
query I
2111+
COPY (SELECT * FROM null_src_b ORDER BY id ASC NULLS LAST)
2112+
TO 'test_files/scratch/sort_pushdown/tf_nulls/a_nonnull.parquet';
2113+
----
2114+
2
2115+
2116+
statement ok
2117+
CREATE EXTERNAL TABLE tf_nulls_last(id INT)
2118+
STORED AS PARQUET
2119+
LOCATION 'test_files/scratch/sort_pushdown/tf_nulls/'
2120+
WITH ORDER (id ASC NULLS LAST);
2121+
2122+
# With target_partitions=1, files end up in separate groups via
2123+
# split_groups_by_statistics. EnforceSorting eliminates SortExec,
2124+
# SPM merges the two sorted streams.
2125+
query TT
2126+
EXPLAIN SELECT * FROM tf_nulls_last ORDER BY id ASC NULLS LAST;
2127+
----
2128+
logical_plan
2129+
01)Sort: tf_nulls_last.id ASC NULLS LAST
2130+
02)--TableScan: tf_nulls_last projection=[id]
2131+
physical_plan
2132+
01)SortPreservingMergeExec: [id@0 ASC NULLS LAST]
2133+
02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tf_nulls/a_nonnull.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tf_nulls/b_null_tail.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
2134+
2135+
# Results must be correct: NULLs at the end
2136+
query I
2137+
SELECT * FROM tf_nulls_last ORDER BY id ASC NULLS LAST;
2138+
----
2139+
1
2140+
2
2141+
3
2142+
NULL
2143+
2144+
# Test F.2: NULLS FIRST — NULLs should come first
2145+
query I
2146+
SELECT * FROM tf_nulls_last ORDER BY id ASC NULLS FIRST;
2147+
----
2148+
NULL
2149+
1
2150+
2
2151+
3
2152+
2153+
# Cleanup Test F
2154+
statement ok
2155+
DROP TABLE null_src_a;
2156+
2157+
statement ok
2158+
DROP TABLE null_src_b;
2159+
2160+
statement ok
2161+
DROP TABLE tf_nulls_last;
2162+
20912163
# Reset settings (SLT runner uses target_partitions=4, not system default)
20922164
statement ok
20932165
SET datafusion.execution.target_partitions = 4;

0 commit comments

Comments
 (0)