Skip to content

Commit 6adef34

Browse files
committed
Cap NDV at row count in joins, filters, and with_fetch
1 parent be273d6 commit 6adef34

File tree

3 files changed

+121
-5
lines changed

3 files changed

+121
-5
lines changed

datafusion/common/src/stats.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,10 @@ impl Statistics {
551551
}
552552
Precision::Absent => Precision::Absent,
553553
};
554+
// NDV can never exceed the number of rows
555+
if let Some(&rows) = self.num_rows.get_value() {
556+
cs.distinct_count = cs.distinct_count.min(&Precision::Inexact(rows));
557+
}
554558
cs
555559
})
556560
.collect();
@@ -1974,7 +1978,8 @@ mod tests {
19741978
result_col_stats.sum_value,
19751979
Precision::Inexact(ScalarValue::Int32(Some(123456)))
19761980
);
1977-
assert_eq!(result_col_stats.distinct_count, Precision::Inexact(789));
1981+
// NDV is capped at the new row count (250) since 789 > 250
1982+
assert_eq!(result_col_stats.distinct_count, Precision::Inexact(250));
19781983
}
19791984

19801985
#[test]
@@ -2075,6 +2080,46 @@ mod tests {
20752080
assert_eq!(result.total_byte_size, Precision::Inexact(800));
20762081
}
20772082

2083+
#[test]
2084+
fn test_with_fetch_caps_ndv_at_row_count() {
2085+
// NDV=500 but after LIMIT 10, NDV should be capped at 10
2086+
let stats = Statistics {
2087+
num_rows: Precision::Exact(1000),
2088+
total_byte_size: Precision::Exact(8000),
2089+
column_statistics: vec![ColumnStatistics {
2090+
distinct_count: Precision::Inexact(500),
2091+
..Default::default()
2092+
}],
2093+
};
2094+
2095+
let result = stats.with_fetch(Some(10), 0, 1).unwrap();
2096+
assert_eq!(result.num_rows, Precision::Exact(10));
2097+
assert_eq!(
2098+
result.column_statistics[0].distinct_count,
2099+
Precision::Inexact(10)
2100+
);
2101+
}
2102+
2103+
#[test]
2104+
fn test_with_fetch_ndv_below_row_count_unchanged() {
2105+
// NDV=5 and LIMIT 10: NDV should stay at 5
2106+
let stats = Statistics {
2107+
num_rows: Precision::Exact(1000),
2108+
total_byte_size: Precision::Exact(8000),
2109+
column_statistics: vec![ColumnStatistics {
2110+
distinct_count: Precision::Inexact(5),
2111+
..Default::default()
2112+
}],
2113+
};
2114+
2115+
let result = stats.with_fetch(Some(10), 0, 1).unwrap();
2116+
assert_eq!(result.num_rows, Precision::Exact(10));
2117+
assert_eq!(
2118+
result.column_statistics[0].distinct_count,
2119+
Precision::Inexact(5)
2120+
);
2121+
}
2122+
20782123
#[test]
20792124
fn test_try_merge_iter_basic() {
20802125
let schema = Arc::new(Schema::new(vec![

datafusion/physical-plan/src/filter.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ impl FilterExec {
341341
schema,
342342
&input_stats.column_statistics,
343343
analysis_ctx.boundaries,
344+
num_rows.get_value().copied(),
344345
);
345346
Ok(Statistics {
346347
num_rows,
@@ -809,6 +810,7 @@ fn collect_new_statistics(
809810
schema: &SchemaRef,
810811
input_column_stats: &[ColumnStatistics],
811812
analysis_boundaries: Vec<ExprBoundaries>,
813+
filtered_num_rows: Option<usize>,
812814
) -> Vec<ColumnStatistics> {
813815
analysis_boundaries
814816
.into_iter()
@@ -842,12 +844,19 @@ fn collect_new_statistics(
842844
let is_exact = !lower.is_null() && !upper.is_null() && lower == upper;
843845
let min_value = interval_bound_to_precision(lower, is_exact);
844846
let max_value = interval_bound_to_precision(upper, is_exact);
847+
// NDV can never exceed the number of rows after filtering
848+
let capped_distinct_count = match filtered_num_rows {
849+
Some(rows) => {
850+
distinct_count.to_inexact().min(&Precision::Inexact(rows))
851+
}
852+
None => distinct_count.to_inexact(),
853+
};
845854
ColumnStatistics {
846855
null_count: input_column_stats[idx].null_count.to_inexact(),
847856
max_value,
848857
min_value,
849858
sum_value: Precision::Absent,
850-
distinct_count: distinct_count.to_inexact(),
859+
distinct_count: capped_distinct_count,
851860
byte_size: input_column_stats[idx].byte_size,
852861
}
853862
},
@@ -2274,4 +2283,41 @@ mod tests {
22742283

22752284
Ok(())
22762285
}
2286+
2287+
#[tokio::test]
2288+
async fn test_filter_statistics_ndv_capped_at_row_count() -> Result<()> {
2289+
// Table: a: min=1, max=100, distinct_count=80, 100 rows
2290+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2291+
let input = Arc::new(StatisticsExec::new(
2292+
Statistics {
2293+
num_rows: Precision::Inexact(100),
2294+
total_byte_size: Precision::Inexact(400),
2295+
column_statistics: vec![ColumnStatistics {
2296+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2297+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2298+
distinct_count: Precision::Inexact(80),
2299+
..Default::default()
2300+
}],
2301+
},
2302+
schema.clone(),
2303+
));
2304+
2305+
// a <= 10 => ~10 rows out of 100
2306+
let predicate: Arc<dyn PhysicalExpr> =
2307+
binary(col("a", &schema)?, Operator::LtEq, lit(10i32), &schema)?;
2308+
2309+
let filter: Arc<dyn ExecutionPlan> =
2310+
Arc::new(FilterExec::try_new(predicate, input)?);
2311+
2312+
let statistics = filter.partition_statistics(None)?;
2313+
// Filter estimates ~10 rows (selectivity = 10/100)
2314+
assert_eq!(statistics.num_rows, Precision::Inexact(10));
2315+
// NDV should be capped at the filtered row count (10), not the original 80
2316+
let ndv = &statistics.column_statistics[0].distinct_count;
2317+
assert!(
2318+
ndv.get_value().copied() <= Some(10),
2319+
"Expected NDV <= 10 (filtered row count), got {ndv:?}"
2320+
);
2321+
Ok(())
2322+
}
22772323
}

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,13 @@ fn max_distinct_count(
707707
stats: &ColumnStatistics,
708708
) -> Precision<usize> {
709709
match &stats.distinct_count {
710-
&dc @ (Precision::Exact(_) | Precision::Inexact(_)) => dc,
710+
&dc @ (Precision::Exact(_) | Precision::Inexact(_)) => {
711+
// NDV can never exceed the number of rows
712+
match num_rows {
713+
Precision::Absent => dc,
714+
_ => dc.min(num_rows).to_inexact(),
715+
}
716+
}
711717
_ => {
712718
// The number can never be greater than the number of rows we have
713719
// minus the nulls (since they don't count as distinct values).
@@ -2292,6 +2298,22 @@ mod tests {
22922298
(10, Inexact(1), Inexact(10), Absent, Absent),
22932299
Some(Inexact(0)),
22942300
),
2301+
// NDV > num_rows: distinct count should be capped at row count
2302+
(
2303+
(5, Inexact(1), Inexact(100), Inexact(50), Absent),
2304+
(10, Inexact(1), Inexact(100), Inexact(50), Absent),
2305+
// max_distinct_count caps: left NDV=min(50,5)=5, right NDV=min(50,10)=10
2306+
// cardinality = (5 * 10) / max(5, 10) = 50 / 10 = 5
2307+
Some(Inexact(5)),
2308+
),
2309+
// NDV > num_rows on one side only
2310+
(
2311+
(3, Inexact(1), Inexact(100), Inexact(100), Absent),
2312+
(10, Inexact(1), Inexact(100), Inexact(5), Absent),
2313+
// max_distinct_count caps: left NDV=min(100,3)=3, right NDV=min(5,10)=5
2314+
// cardinality = (3 * 10) / max(3, 5) = 30 / 5 = 6
2315+
Some(Inexact(6)),
2316+
),
22952317
];
22962318

22972319
for (left_info, right_info, expected_cardinality) in cases {
@@ -2431,11 +2453,14 @@ mod tests {
24312453
// y: min=0, max=100, distinct=None
24322454
//
24332455
// Join on a=c, b=d (ignore x/y)
2456+
// Right column d has NDV=2500 but only 2000 rows, so NDV is capped
2457+
// to 2000. join_selectivity = max(500, 2000) = 2000.
2458+
// Inner cardinality = (1000 * 2000) / 2000 = 1000
24342459
let cases = vec![
2435-
(JoinType::Inner, 800),
2460+
(JoinType::Inner, 1000),
24362461
(JoinType::Left, 1000),
24372462
(JoinType::Right, 2000),
2438-
(JoinType::Full, 2200),
2463+
(JoinType::Full, 2000),
24392464
];
24402465

24412466
let left_col_stats = vec![

0 commit comments

Comments
 (0)