
Commit 7014a45

feat: Extract NDV (distinct_count) statistics from Parquet metadata (#19957)
## Which issue does this PR close?

- Part of #15265

Related: #18628, #8227 (I am not sure whether a new issue specifically for the scope of this PR is needed; happy to create one if so.)

## Rationale for this change

This work originates from a discussion in datafusion-distributed about improving the `TaskEstimator` API: datafusion-contrib/datafusion-distributed#296 (comment)

We agreed that improved statistics support in DataFusion would benefit both projects. For datafusion-distributed, better cardinality estimation helps decide how to split computation across network boundaries.

This also benefits DataFusion directly, as CBO is already in place. For example, join cardinality estimation ([`joins/utils.rs:586-646`](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/utils.rs#L586-L646)) uses `distinct_count` via `max_distinct_count` to compute join selectivity. Currently this field is always `Absent` when reading from Parquet, so this PR fills that gap.

## What changes are included in this PR?

Commit 1 - Reading NDV from Parquet files:

- Extract `distinct_count` from Parquet row group column statistics
- Single row group with NDV -> `Precision::Exact(ndv)`
- Multiple row groups with NDV -> `Precision::Inexact(max)` as conservative lower bound
- No NDV available -> `Precision::Absent`

Commit 2 - Statistics propagation (can be split to a separate PR, if preferred):

- `Statistics::try_merge()`: use max as conservative lower bound instead of discarding NDV
- `Projection`: preserve NDV for single-column expressions as upper bound

I'm including the second commit to showcase how I intend to use the statistics, but these changes can be split to a follow-up PR to keep review scope limited.

## Are these changes tested?

Yes, 7 unit tests are added for NDV extraction:

- Single/multiple row groups with NDV
- Partial NDV availability across row groups
- Multiple columns with different NDV values
- Integration test reading a real Parquet file with `distinct_count` statistics (following the pattern in [`row_filter.rs:685-696`](https://github.com/apache/datafusion/blob/main/datafusion/datasource-parquet/src/row_filter.rs#L685-L696), using `parquet_to_arrow_schema` to derive the schema from the file)

## Are there any user-facing changes?

No breaking changes. Statistics consumers will now see populated `distinct_count` values when available in Parquet metadata.

Disclaimer: I used AI (Claude Code) to assist translating my ideas into code as I am still ramping up with the codebase and especially with Rust (guidance on both aspects is highly appreciated). I have a good understanding of the core concepts (statistics, CBO etc.) and have carefully double-checked that the PR matches my intentions and understanding.

cc: @gabotechs @jayshrivastava @NGA-TRAN @gene-bordegaray
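
The row-group rules listed under "Commit 1" can be sketched as a small standalone function. This is only an illustration, not the code in the PR: the `Precision` enum below is a local stand-in for DataFusion's `Precision<usize>`, and `ndv_from_row_groups` is a hypothetical name. The description does not say what happens when only some row groups carry NDV, so the sketch assumes the result is `Absent` in that case. The maximum is a lower bound because the union of all row groups contains at least as many distinct values as the largest single row group.

```rust
/// Local stand-in for DataFusion's `Precision<usize>` (an assumption made so
/// that the sketch compiles without any external crate).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Hypothetical helper: combines the per-row-group `distinct_count` values of
/// one column. `None` means the row group carries no NDV in its metadata.
fn ndv_from_row_groups(row_group_ndvs: &[Option<usize>]) -> Precision {
    let mut max_ndv: Option<usize> = None;
    for ndv in row_group_ndvs {
        match ndv {
            // Track the largest NDV seen so far.
            Some(v) => max_ndv = Some(max_ndv.map_or(*v, |m| m.max(*v))),
            // Assumption of this sketch: partial availability yields Absent.
            None => return Precision::Absent,
        }
    }
    match (max_ndv, row_group_ndvs.len()) {
        // No row groups at all: nothing is known.
        (None, _) => Precision::Absent,
        // A single row group's NDV is the exact NDV of the file.
        (Some(v), 1) => Precision::Exact(v),
        // Several row groups: the union has at least `max` distinct values.
        (Some(v), _) => Precision::Inexact(v),
    }
}
```

For instance, calling `ndv_from_row_groups(&[Some(10), Some(25), Some(7)])` yields `Precision::Inexact(25)`, while a single-element slice such as `&[Some(42)]` yields `Precision::Exact(42)`.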
1 parent d138c36 commit 7014a45

5 files changed: +858 -170 lines changed


datafusion/common/src/stats.rs

Lines changed: 357 additions & 6 deletions
@@ -665,7 +665,7 @@ impl Statistics {
                 max_value: cs.max_value.clone(),
                 min_value: cs.min_value.clone(),
                 sum_value: cs.sum_value.clone(),
-                distinct_count: Precision::Absent,
+                distinct_count: cs.distinct_count,
                 byte_size: cs.byte_size,
             })
             .collect();
@@ -679,11 +679,24 @@ impl Statistics {
                 let item_cs = &stat.column_statistics[col_idx];
 
                 col_stats.null_count = col_stats.null_count.add(&item_cs.null_count);
-                col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size);
-                col_stats.sum_value =
-                    precision_add(&col_stats.sum_value, &item_cs.sum_value);
+
+                // NDV must be computed before min/max update (needs pre-merge ranges)
+                col_stats.distinct_count = match (
+                    col_stats.distinct_count.get_value(),
+                    item_cs.distinct_count.get_value(),
+                ) {
+                    (Some(&l), Some(&r)) => Precision::Inexact(
+                        estimate_ndv_with_overlap(col_stats, item_cs, l, r)
+                            .unwrap_or_else(|| usize::max(l, r)),
+                    ),
+                    _ => Precision::Absent,
+                };
+
                 col_stats.min_value = col_stats.min_value.min(&item_cs.min_value);
                 col_stats.max_value = col_stats.max_value.max(&item_cs.max_value);
+                col_stats.sum_value =
+                    precision_add(&col_stats.sum_value, &item_cs.sum_value);
+                col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size);
             }
         }
 
@@ -695,6 +708,96 @@ impl Statistics {
     }
 }
 
+/// Estimates the combined number of distinct values (NDV) when merging two
+/// column statistics, using range overlap to avoid double-counting shared values.
+///
+/// Assumes values are distributed uniformly within each input's
+/// `[min, max]` range (the standard assumption when only summary
+/// statistics are available). Under uniformity the fraction of an input's
+/// distinct values that land in a sub-range equals the fraction of
+/// the range that sub-range covers.
+///
+/// The combined value space is split into three disjoint regions:
+///
+/// ```text
+/// |-- only A --|-- overlap --|-- only B --|
+/// ```
+///
+/// * **Only in A/B** - values outside the other input's range
+///   contribute `(1 - overlap_a) * NDV_a` and `(1 - overlap_b) * NDV_b`.
+/// * **Overlap** - both inputs may produce values here. We take
+///   `max(overlap_a * NDV_a, overlap_b * NDV_b)` rather than the
+///   sum because values in the same sub-range are likely shared
+///   (the smaller set is assumed to be a subset of the larger).
+///
+/// The formula ranges between `[max(NDV_a, NDV_b), NDV_a + NDV_b]`,
+/// from full overlap to no overlap.
+///
+/// ```text
+/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b)   [intersection]
+///     + (1 - overlap_a) * NDV_a                     [only in A]
+///     + (1 - overlap_b) * NDV_b                     [only in B]
+/// ```
+///
+/// Returns `None` when min/max are absent or distance is unsupported
+/// (e.g. strings), in which case the caller should fall back to a simpler
+/// estimate.
+pub fn estimate_ndv_with_overlap(
+    left: &ColumnStatistics,
+    right: &ColumnStatistics,
+    ndv_left: usize,
+    ndv_right: usize,
+) -> Option<usize> {
+    let left_min = left.min_value.get_value()?;
+    let left_max = left.max_value.get_value()?;
+    let right_min = right.min_value.get_value()?;
+    let right_max = right.max_value.get_value()?;
+
+    let range_left = left_max.distance(left_min)?;
+    let range_right = right_max.distance(right_min)?;
+
+    // Constant columns (range == 0) can't use the proportional overlap
+    // formula below, so check interval overlap directly instead.
+    if range_left == 0 || range_right == 0 {
+        let overlaps = left_min <= right_max && right_min <= left_max;
+        return Some(if overlaps {
+            usize::max(ndv_left, ndv_right)
+        } else {
+            ndv_left + ndv_right
+        });
+    }
+
+    let overlap_min = if left_min >= right_min {
+        left_min
+    } else {
+        right_min
+    };
+    let overlap_max = if left_max <= right_max {
+        left_max
+    } else {
+        right_max
+    };
+
+    // Disjoint ranges: no overlap, NDVs are additive
+    if overlap_min > overlap_max {
+        return Some(ndv_left + ndv_right);
+    }
+
+    let overlap_range = overlap_max.distance(overlap_min)? as f64;
+
+    let overlap_left = overlap_range / range_left as f64;
+    let overlap_right = overlap_range / range_right as f64;
+
+    let intersection = f64::max(
+        overlap_left * ndv_left as f64,
+        overlap_right * ndv_right as f64,
+    );
+    let only_left = (1.0 - overlap_left) * ndv_left as f64;
+    let only_right = (1.0 - overlap_right) * ndv_right as f64;
+
+    Some((intersection + only_left + only_right).round() as usize)
+}
+
 /// Creates an estimate of the number of rows in the output using the given
 /// optional value and exactness flag.
 fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
@@ -1361,6 +1464,253 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_try_merge_distinct_count_absent() {
+        // Create statistics with known distinct counts
+        let stats1 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .with_total_byte_size(Precision::Exact(100))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_null_count(Precision::Exact(0))
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(1))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
+                    .with_distinct_count(Precision::Exact(5)),
+            );
+
+        let stats2 = Statistics::default()
+            .with_num_rows(Precision::Exact(15))
+            .with_total_byte_size(Precision::Exact(150))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_null_count(Precision::Exact(0))
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(20))))
+                    .with_distinct_count(Precision::Exact(7)),
+            );
+
+        // Merge statistics
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+        let merged_stats =
+            Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+
+        // Verify the results
+        assert_eq!(merged_stats.num_rows, Precision::Exact(25));
+        assert_eq!(merged_stats.total_byte_size, Precision::Exact(250));
+
+        let col_stats = &merged_stats.column_statistics[0];
+        assert_eq!(col_stats.null_count, Precision::Exact(0));
+        assert_eq!(
+            col_stats.min_value,
+            Precision::Exact(ScalarValue::Int32(Some(1)))
+        );
+        assert_eq!(
+            col_stats.max_value,
+            Precision::Exact(ScalarValue::Int32(Some(20)))
+        );
+        // Overlap-based NDV: ranges [1,10] and [5,20], overlap [5,10]
+        // range_left=9, range_right=15, overlap=5
+        // overlap_left=5*(5/9)=2.78, overlap_right=7*(5/15)=2.33
+        // result = max(2.78, 2.33) + (5-2.78) + (7-2.33) = 9.67 -> 10
+        assert_eq!(col_stats.distinct_count, Precision::Inexact(10));
+    }
+
+    #[test]
+    fn test_try_merge_ndv_disjoint_ranges() {
+        let stats1 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
+                    .with_distinct_count(Precision::Exact(5)),
+            );
+        let stats2 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(20))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(30))))
+                    .with_distinct_count(Precision::Exact(8)),
+            );
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+        let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+        // No overlap -> sum of NDVs
+        assert_eq!(
+            merged.column_statistics[0].distinct_count,
+            Precision::Inexact(13)
+        );
+    }
+
+    #[test]
+    fn test_try_merge_ndv_identical_ranges() {
+        let stats1 = Statistics::default()
+            .with_num_rows(Precision::Exact(100))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
+                    .with_distinct_count(Precision::Exact(50)),
+            );
+        let stats2 = Statistics::default()
+            .with_num_rows(Precision::Exact(100))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
+                    .with_distinct_count(Precision::Exact(30)),
+            );
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+        let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+        // Full overlap -> max(50, 30) = 50
+        assert_eq!(
+            merged.column_statistics[0].distinct_count,
+            Precision::Inexact(50)
+        );
+    }
+
+    #[test]
+    fn test_try_merge_ndv_partial_overlap() {
+        let stats1 = Statistics::default()
+            .with_num_rows(Precision::Exact(100))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
+                    .with_distinct_count(Precision::Exact(80)),
+            );
+        let stats2 = Statistics::default()
+            .with_num_rows(Precision::Exact(100))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(150))))
+                    .with_distinct_count(Precision::Exact(60)),
+            );
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+        let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+        // overlap=[50,100], range_left=100, range_right=100, overlap_range=50
+        // overlap_left=80*(50/100)=40, overlap_right=60*(50/100)=30
+        // result = max(40,30) + (80-40) + (60-30) = 40 + 40 + 30 = 110
+        assert_eq!(
+            merged.column_statistics[0].distinct_count,
+            Precision::Inexact(110)
+        );
+    }
+
+    #[test]
+    fn test_try_merge_ndv_missing_min_max() {
+        let stats1 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(5)),
+            );
+        let stats2 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(8)),
+            );
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+        let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+        // No min/max -> fallback to max(5, 8)
+        assert_eq!(
+            merged.column_statistics[0].distinct_count,
+            Precision::Inexact(8)
+        );
+    }
+
+    #[test]
+    fn test_try_merge_ndv_non_numeric_types() {
+        let stats1 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
+                        "aaa".to_string(),
+                    ))))
+                    .with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
+                        "zzz".to_string(),
+                    ))))
+                    .with_distinct_count(Precision::Exact(5)),
+            );
+        let stats2 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
+                        "bbb".to_string(),
+                    ))))
+                    .with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
+                        "yyy".to_string(),
+                    ))))
+                    .with_distinct_count(Precision::Exact(8)),
+            );
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
+        let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+        // distance() unsupported for strings -> fallback to max
+        assert_eq!(
+            merged.column_statistics[0].distinct_count,
+            Precision::Inexact(8)
+        );
+    }
+
+    #[test]
+    fn test_try_merge_ndv_constant_columns() {
+        // Same constant: [5,5]+[5,5] -> max
+        let stats1 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+                    .with_distinct_count(Precision::Exact(1)),
+            );
+        let stats2 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+                    .with_distinct_count(Precision::Exact(1)),
+            );
+
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+        let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
+        assert_eq!(
+            merged.column_statistics[0].distinct_count,
+            Precision::Inexact(1)
+        );
+
+        // Different constants: [5,5]+[10,10] -> sum
+        let stats3 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
+                    .with_distinct_count(Precision::Exact(1)),
+            );
+        let stats4 = Statistics::default()
+            .with_num_rows(Precision::Exact(10))
+            .add_column_statistics(
+                ColumnStatistics::new_unknown()
+                    .with_min_value(Precision::Exact(ScalarValue::Int32(Some(10))))
+                    .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
+                    .with_distinct_count(Precision::Exact(1)),
+            );
+
+        let merged = Statistics::try_merge_iter([&stats3, &stats4], &schema).unwrap();
+        assert_eq!(
+            merged.column_statistics[0].distinct_count,
+            Precision::Inexact(2)
+        );
+    }
+
     #[test]
     fn test_with_fetch_basic_preservation() {
         // Test that column statistics and byte size are preserved (as inexact) when applying fetch
@@ -2005,8 +2355,9 @@ mod tests {
             Precision::Exact(ScalarValue::Int64(Some(3500)))
         );
         assert_eq!(col_stats.byte_size, Precision::Exact(480));
-        // distinct_count is always Absent after merge (can't accurately merge NDV)
-        assert_eq!(col_stats.distinct_count, Precision::Absent);
+        // Overlap-based NDV merge (pairwise left-to-right):
+        // stats1+stats2: [10,100]+[5,200] -> NDV=16, then +stats3: [5,200]+[1,150] -> NDV=29
+        assert_eq!(col_stats.distinct_count, Precision::Inexact(29));
     }
 
     #[test]
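
To check the arithmetic in the test comments by hand, the overlap formula documented on `estimate_ndv_with_overlap` can be reproduced in a dependency-free sketch. It is a simplification under stated assumptions: ranges are plain `i64` bounds rather than `ScalarValue`s, min/max are always present, and `ColSummary` and `merge_ndv` are hypothetical names that do not exist in DataFusion.

```rust
/// Hypothetical summary of one column: an inclusive integer range plus a
/// distinct-value count (NDV). Stands in for `ColumnStatistics`.
#[derive(Debug, Clone, Copy)]
struct ColSummary {
    min: i64,
    max: i64,
    ndv: usize,
}

/// Overlap-based NDV estimate for the union of two inputs, assuming values
/// are spread uniformly over each input's `[min, max]` range.
fn merge_ndv(a: ColSummary, b: ColSummary) -> usize {
    let range_a = (a.max - a.min) as f64;
    let range_b = (b.max - b.min) as f64;

    // Constant columns have a zero-width range, so the proportional formula
    // would divide by zero; compare the intervals directly instead.
    if range_a == 0.0 || range_b == 0.0 {
        let overlaps = a.min <= b.max && b.min <= a.max;
        return if overlaps { a.ndv.max(b.ndv) } else { a.ndv + b.ndv };
    }

    // Bounds of the shared sub-range.
    let lo = a.min.max(b.min);
    let hi = a.max.min(b.max);
    if lo > hi {
        // Disjoint ranges share no values, so the NDVs simply add up.
        return a.ndv + b.ndv;
    }

    // Fraction of each input's range that falls inside the overlap.
    let overlap = (hi - lo) as f64;
    let frac_a = overlap / range_a;
    let frac_b = overlap / range_b;

    // Inside the overlap, the smaller set is assumed to be a subset of the
    // larger one, hence max rather than sum.
    let shared = f64::max(frac_a * a.ndv as f64, frac_b * b.ndv as f64);
    let only_a = (1.0 - frac_a) * a.ndv as f64;
    let only_b = (1.0 - frac_b) * b.ndv as f64;

    (shared + only_a + only_b).round() as usize
}
```

With the inputs from `test_try_merge_ndv_partial_overlap` (`[0,100]` with NDV 80 and `[50,150]` with NDV 60), `merge_ndv` returns 110, which matches the value asserted in that test. Because the estimate is folded pairwise from left to right in `try_merge_iter`, merging more than two inputs may give a result that depends on their order.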
