Skip to content

Commit 4b2ab26

Browse files
committed
Cap NDV at row count in joins, filters, and with_fetch
1 parent 8d91fb0 commit 4b2ab26

4 files changed

Lines changed: 136 additions & 18 deletions

File tree

datafusion/common/src/stats.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,10 @@ impl Statistics {
594594
}
595595
Precision::Absent => Precision::Absent,
596596
};
597+
// NDV can never exceed the number of rows
598+
if let Some(&rows) = self.num_rows.get_value() {
599+
cs.distinct_count = cs.distinct_count.min(&Precision::Inexact(rows));
600+
}
597601
cs
598602
})
599603
.collect();
@@ -2169,7 +2173,8 @@ mod tests {
21692173
result_col_stats.sum_value,
21702174
Precision::Inexact(ScalarValue::Int32(Some(123456)))
21712175
);
2172-
assert_eq!(result_col_stats.distinct_count, Precision::Inexact(789));
2176+
// NDV is capped at the new row count (250) since 789 > 250
2177+
assert_eq!(result_col_stats.distinct_count, Precision::Inexact(250));
21732178
}
21742179

21752180
#[test]
@@ -2280,6 +2285,46 @@ mod tests {
22802285
assert_eq!(result.total_byte_size, Precision::Inexact(800));
22812286
}
22822287

2288+
#[test]
2289+
fn test_with_fetch_caps_ndv_at_row_count() {
2290+
// NDV=500 but after LIMIT 10, NDV should be capped at 10
2291+
let stats = Statistics {
2292+
num_rows: Precision::Exact(1000),
2293+
total_byte_size: Precision::Exact(8000),
2294+
column_statistics: vec![ColumnStatistics {
2295+
distinct_count: Precision::Inexact(500),
2296+
..Default::default()
2297+
}],
2298+
};
2299+
2300+
let result = stats.with_fetch(Some(10), 0, 1).unwrap();
2301+
assert_eq!(result.num_rows, Precision::Exact(10));
2302+
assert_eq!(
2303+
result.column_statistics[0].distinct_count,
2304+
Precision::Inexact(10)
2305+
);
2306+
}
2307+
2308+
#[test]
2309+
fn test_with_fetch_ndv_below_row_count_unchanged() {
2310+
// NDV=5 and LIMIT 10: NDV should stay at 5
2311+
let stats = Statistics {
2312+
num_rows: Precision::Exact(1000),
2313+
total_byte_size: Precision::Exact(8000),
2314+
column_statistics: vec![ColumnStatistics {
2315+
distinct_count: Precision::Inexact(5),
2316+
..Default::default()
2317+
}],
2318+
};
2319+
2320+
let result = stats.with_fetch(Some(10), 0, 1).unwrap();
2321+
assert_eq!(result.num_rows, Precision::Exact(10));
2322+
assert_eq!(
2323+
result.column_statistics[0].distinct_count,
2324+
Precision::Inexact(5)
2325+
);
2326+
}
2327+
22832328
#[test]
22842329
fn test_try_merge_iter_basic() {
22852330
let schema = Arc::new(Schema::new(vec![

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -281,17 +281,18 @@ async fn sql_limit() -> Result<()> {
281281
let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").await.unwrap();
282282
let physical_plan = df.create_physical_plan().await.unwrap();
283283
// when the limit is smaller than the original number of lines we mark the statistics as inexact
284+
// and cap NDV at the new row count
285+
let limit_stats = physical_plan.partition_statistics(None)?;
286+
assert_eq!(limit_stats.num_rows, Precision::Exact(5));
287+
// c1: NDV=2 stays at 2 (already below limit of 5)
284288
assert_eq!(
285-
Statistics {
286-
num_rows: Precision::Exact(5),
287-
column_statistics: stats
288-
.column_statistics
289-
.iter()
290-
.map(|c| c.clone().to_inexact())
291-
.collect(),
292-
total_byte_size: Precision::Absent
293-
},
294-
*physical_plan.partition_statistics(None)?
289+
limit_stats.column_statistics[0].distinct_count,
290+
Precision::Inexact(2)
291+
);
292+
// c2: NDV=13 capped to 5 (the limit row count)
293+
assert_eq!(
294+
limit_stats.column_statistics[1].distinct_count,
295+
Precision::Inexact(5)
295296
);
296297

297298
let df = ctx

datafusion/physical-plan/src/filter.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ impl FilterExec {
341341
schema,
342342
&input_stats.column_statistics,
343343
analysis_ctx.boundaries,
344+
num_rows.get_value().copied(),
344345
);
345346
Ok(Statistics {
346347
num_rows,
@@ -781,6 +782,7 @@ fn collect_new_statistics(
781782
schema: &SchemaRef,
782783
input_column_stats: &[ColumnStatistics],
783784
analysis_boundaries: Vec<ExprBoundaries>,
785+
filtered_num_rows: Option<usize>,
784786
) -> Vec<ColumnStatistics> {
785787
analysis_boundaries
786788
.into_iter()
@@ -816,11 +818,17 @@ fn collect_new_statistics(
816818
let min_value = interval_bound_to_precision(lower, is_single_value);
817819
let max_value = interval_bound_to_precision(upper, is_single_value);
818820
// When the interval collapses to a single value (equality
819-
// predicate), the column has exactly 1 distinct value
821+
// predicate), the column has exactly 1 distinct value.
822+
// Otherwise, cap NDV at the filtered row count.
820823
let capped_distinct_count = if is_single_value {
821824
Precision::Exact(1)
822825
} else {
823-
distinct_count.to_inexact()
826+
match filtered_num_rows {
827+
Some(rows) => {
828+
distinct_count.to_inexact().min(&Precision::Inexact(rows))
829+
}
830+
None => distinct_count.to_inexact(),
831+
}
824832
};
825833
ColumnStatistics {
826834
null_count: input_column_stats[idx].null_count.to_inexact(),
@@ -2398,10 +2406,12 @@ mod tests {
23982406
statistics.column_statistics[0].distinct_count,
23992407
Precision::Exact(1)
24002408
);
2401-
// b > 10 narrows to [11, 50] but doesn't collapse
2409+
// b > 10 narrows to [11, 50] but doesn't collapse to a single value.
2410+
// The combined selectivity of a=42 (1/80) and c=7 (1/150) on 100 rows
2411+
// computes num_rows = 1, so NDV is capped at the row count: min(40, 1) = 1.
24022412
assert_eq!(
24032413
statistics.column_statistics[1].distinct_count,
2404-
Precision::Inexact(40)
2414+
Precision::Inexact(1)
24052415
);
24062416
// c = 7 collapses to single value
24072417
assert_eq!(
@@ -2639,4 +2649,41 @@ mod tests {
26392649
assert_eq!(out_schema.field(1).name(), "tokens");
26402650
Ok(())
26412651
}
2652+
2653+
#[tokio::test]
2654+
async fn test_filter_statistics_ndv_capped_at_row_count() -> Result<()> {
2655+
// Table: a: min=1, max=100, distinct_count=80, 100 rows
2656+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2657+
let input = Arc::new(StatisticsExec::new(
2658+
Statistics {
2659+
num_rows: Precision::Inexact(100),
2660+
total_byte_size: Precision::Inexact(400),
2661+
column_statistics: vec![ColumnStatistics {
2662+
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
2663+
max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
2664+
distinct_count: Precision::Inexact(80),
2665+
..Default::default()
2666+
}],
2667+
},
2668+
schema.clone(),
2669+
));
2670+
2671+
// a <= 10 => ~10 rows out of 100
2672+
let predicate: Arc<dyn PhysicalExpr> =
2673+
binary(col("a", &schema)?, Operator::LtEq, lit(10i32), &schema)?;
2674+
2675+
let filter: Arc<dyn ExecutionPlan> =
2676+
Arc::new(FilterExec::try_new(predicate, input)?);
2677+
2678+
let statistics = filter.partition_statistics(None)?;
2679+
// Filter estimates ~10 rows (selectivity = 10/100)
2680+
assert_eq!(statistics.num_rows, Precision::Inexact(10));
2681+
// NDV should be capped at the filtered row count (10), not the original 80
2682+
let ndv = &statistics.column_statistics[0].distinct_count;
2683+
assert!(
2684+
ndv.get_value().copied() <= Some(10),
2685+
"Expected NDV <= 10 (filtered row count), got {ndv:?}"
2686+
);
2687+
Ok(())
2688+
}
26422689
}

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,13 @@ fn max_distinct_count(
708708
stats: &ColumnStatistics,
709709
) -> Precision<usize> {
710710
match &stats.distinct_count {
711-
&dc @ (Precision::Exact(_) | Precision::Inexact(_)) => dc,
711+
&dc @ (Precision::Exact(_) | Precision::Inexact(_)) => {
712+
// NDV can never exceed the number of rows
713+
match num_rows {
714+
Precision::Absent => dc,
715+
_ => dc.min(num_rows).to_inexact(),
716+
}
717+
}
712718
_ => {
713719
// The number can never be greater than the number of rows we have
714720
// minus the nulls (since they don't count as distinct values).
@@ -2287,6 +2293,22 @@ mod tests {
22872293
(10, Inexact(1), Inexact(10), Absent, Absent),
22882294
Some(Inexact(0)),
22892295
),
2296+
// NDV > num_rows: distinct count should be capped at row count
2297+
(
2298+
(5, Inexact(1), Inexact(100), Inexact(50), Absent),
2299+
(10, Inexact(1), Inexact(100), Inexact(50), Absent),
2300+
// max_distinct_count caps: left NDV=min(50,5)=5, right NDV=min(50,10)=10
2301+
// cardinality = (5 * 10) / max(5, 10) = 50 / 10 = 5
2302+
Some(Inexact(5)),
2303+
),
2304+
// NDV > num_rows on one side only
2305+
(
2306+
(3, Inexact(1), Inexact(100), Inexact(100), Absent),
2307+
(10, Inexact(1), Inexact(100), Inexact(5), Absent),
2308+
// max_distinct_count caps: left NDV=min(100,3)=3, right NDV=min(5,10)=5
2309+
// cardinality = (3 * 10) / max(3, 5) = 30 / 5 = 6
2310+
Some(Inexact(6)),
2311+
),
22902312
];
22912313

22922314
for (left_info, right_info, expected_cardinality) in cases {
@@ -2426,11 +2448,14 @@ mod tests {
24262448
// y: min=0, max=100, distinct=None
24272449
//
24282450
// Join on a=c, b=d (ignore x/y)
2451+
// Right column d has NDV=2500 but only 2000 rows, so NDV is capped
2452+
// to 2000. join_selectivity = max(500, 2000) = 2000.
2453+
// Inner cardinality = (1000 * 2000) / 2000 = 1000
24292454
let cases = vec![
2430-
(JoinType::Inner, 800),
2455+
(JoinType::Inner, 1000),
24312456
(JoinType::Left, 1000),
24322457
(JoinType::Right, 2000),
2433-
(JoinType::Full, 2200),
2458+
(JoinType::Full, 2000),
24342459
];
24352460

24362461
let left_col_stats = vec![

0 commit comments

Comments
 (0)