Skip to content

Commit d09a919

Browse files
authored
Use shared statistics merge for union stats (apache#21430)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Part of apache#8229 ## Rationale for this change DataFusion already has shared logic for merging `Statistics`, but `UnionExec` and `InterleaveExec` still used their own local merge code. That left duplicated path in the codebase and kept the behavior less consistent than the other statistics aggregation paths. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - Reuse `Statistics::try_merge_iter` for `UnionExec` statistics merging - Reuse the same shared path for `InterleaveExec` statistics merging - Remove the local union-specific statistics merge helpers - Add tests for union and interleave statistics merging - Add a test for interleave partition-level statistics merging <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 040c21e commit d09a919

3 files changed

Lines changed: 620 additions & 376 deletions

File tree

datafusion/common/src/stats.rs

Lines changed: 297 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,27 @@ pub struct Statistics {
372372
pub column_statistics: Vec<ColumnStatistics>,
373373
}
374374

375+
/// Fallback to use when NDV overlap can not be estimated from column bounds.
376+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
377+
pub enum NdvFallback {
378+
/// Use the larger input NDV. This is the conservative default for
379+
/// related fragments such as files from the same table.
380+
#[default]
381+
Max,
382+
/// Sum the input NDVs. This is a conservative upper bound for
383+
/// independent inputs such as `UNION ALL`.
384+
Sum,
385+
}
386+
387+
impl NdvFallback {
388+
fn merge(self, left: usize, right: usize) -> usize {
389+
match self {
390+
Self::Max => usize::max(left, right),
391+
Self::Sum => left.saturating_add(right),
392+
}
393+
}
394+
}
395+
375396
impl Default for Statistics {
376397
/// Returns a new [`Statistics`] instance with all fields set to unknown
377398
/// and no columns.
@@ -630,6 +651,9 @@ impl Statistics {
630651
/// The method assumes that all statistics are for the same schema.
631652
/// If not, maybe you can call `SchemaMapper::map_column_statistics` to make them consistent.
632653
///
654+
/// This method uses [`NdvFallback::Max`] when `distinct_count` overlap
655+
/// can not be estimated from column bounds.
656+
///
633657
/// Returns an error if the statistics do not match the specified schemas.
634658
///
635659
/// # Example
@@ -670,6 +694,19 @@ impl Statistics {
670694
/// Precision::Exact(ScalarValue::Int64(Some(1500))));
671695
/// ```
672696
pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
697+
where
698+
I: IntoIterator<Item = &'a Statistics>,
699+
{
700+
Self::try_merge_iter_with_ndv_fallback(items, schema, NdvFallback::Max)
701+
}
702+
703+
/// Same as [`Statistics::try_merge_iter`], but lets callers choose the
704+
/// fallback used when `distinct_count` overlap can not be estimated.
705+
pub fn try_merge_iter_with_ndv_fallback<'a, I>(
706+
items: I,
707+
schema: &Schema,
708+
ndv_fallback: NdvFallback,
709+
) -> Result<Statistics>
673710
where
674711
I: IntoIterator<Item = &'a Statistics>,
675712
{
@@ -717,7 +754,7 @@ impl Statistics {
717754
) {
718755
(Some(&l), Some(&r)) => Precision::Inexact(
719756
estimate_ndv_with_overlap(col_stats, item_cs, l, r)
720-
.unwrap_or_else(|| usize::max(l, r)),
757+
.unwrap_or_else(|| ndv_fallback.merge(l, r)),
721758
),
722759
_ => Precision::Absent,
723760
};
@@ -1465,6 +1502,44 @@ mod tests {
14651502
}
14661503
}
14671504

1505+
fn make_single_i64_ndv_stats(
1506+
distinct_count: Precision<usize>,
1507+
min_value: Option<i64>,
1508+
max_value: Option<i64>,
1509+
) -> Statistics {
1510+
let to_precision = |value| Precision::Exact(ScalarValue::Int64(Some(value)));
1511+
1512+
Statistics::default()
1513+
.with_num_rows(Precision::Exact(10))
1514+
.add_column_statistics(
1515+
ColumnStatistics::new_unknown()
1516+
.with_distinct_count(distinct_count)
1517+
.with_min_value(
1518+
min_value.map(to_precision).unwrap_or(Precision::Absent),
1519+
)
1520+
.with_max_value(
1521+
max_value.map(to_precision).unwrap_or(Precision::Absent),
1522+
),
1523+
)
1524+
}
1525+
1526+
fn merge_single_i64_ndv_distinct_count(
1527+
left: Statistics,
1528+
right: Statistics,
1529+
ndv_fallback: NdvFallback,
1530+
) -> Precision<usize> {
1531+
let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
1532+
1533+
Statistics::try_merge_iter_with_ndv_fallback(
1534+
[&left, &right],
1535+
&schema,
1536+
ndv_fallback,
1537+
)
1538+
.unwrap()
1539+
.column_statistics[0]
1540+
.distinct_count
1541+
}
1542+
14681543
#[test]
14691544
fn test_try_merge() {
14701545
// Create a schema with two columns
@@ -1815,7 +1890,7 @@ mod tests {
18151890

18161891
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
18171892
let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
1818-
// No min/max -> fallback to max(5, 8)
1893+
// No min/max -> default fallback is max
18191894
assert_eq!(
18201895
merged.column_statistics[0].distinct_count,
18211896
Precision::Inexact(8)
@@ -1851,13 +1926,55 @@ mod tests {
18511926

18521927
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
18531928
let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
1854-
// distance() unsupported for strings -> fallback to max
1929+
// distance() unsupported for strings -> default fallback is max
18551930
assert_eq!(
18561931
merged.column_statistics[0].distinct_count,
18571932
Precision::Inexact(8)
18581933
);
18591934
}
18601935

1936+
#[test]
1937+
fn test_try_merge_ndv_non_numeric_types_sum_fallback() {
1938+
let stats1 = Statistics::default()
1939+
.with_num_rows(Precision::Exact(10))
1940+
.add_column_statistics(
1941+
ColumnStatistics::new_unknown()
1942+
.with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
1943+
"aaa".to_string(),
1944+
))))
1945+
.with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
1946+
"zzz".to_string(),
1947+
))))
1948+
.with_distinct_count(Precision::Exact(5)),
1949+
);
1950+
let stats2 = Statistics::default()
1951+
.with_num_rows(Precision::Exact(10))
1952+
.add_column_statistics(
1953+
ColumnStatistics::new_unknown()
1954+
.with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
1955+
"bbb".to_string(),
1956+
))))
1957+
.with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
1958+
"yyy".to_string(),
1959+
))))
1960+
.with_distinct_count(Precision::Exact(8)),
1961+
);
1962+
1963+
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1964+
let merged = Statistics::try_merge_iter_with_ndv_fallback(
1965+
[&stats1, &stats2],
1966+
&schema,
1967+
NdvFallback::Sum,
1968+
)
1969+
.unwrap();
1970+
1971+
// distance() unsupported for strings -> sum fallback is caller-selected
1972+
assert_eq!(
1973+
merged.column_statistics[0].distinct_count,
1974+
Precision::Inexact(13)
1975+
);
1976+
}
1977+
18611978
#[test]
18621979
fn test_try_merge_ndv_constant_columns() {
18631980
// Same constant: [5,5]+[5,5] -> max
@@ -1910,6 +2027,183 @@ mod tests {
19102027
);
19112028
}
19122029

2030+
#[test]
2031+
fn test_try_merge_ndv_original_union_edge_cases() {
2032+
struct NdvTestCase {
2033+
name: &'static str,
2034+
left_ndv: Precision<usize>,
2035+
left_min: Option<i64>,
2036+
left_max: Option<i64>,
2037+
right_ndv: Precision<usize>,
2038+
right_min: Option<i64>,
2039+
right_max: Option<i64>,
2040+
expected: Precision<usize>,
2041+
}
2042+
2043+
let cases = vec![
2044+
NdvTestCase {
2045+
name: "disjoint ranges",
2046+
left_ndv: Precision::Exact(5),
2047+
left_min: Some(0),
2048+
left_max: Some(10),
2049+
right_ndv: Precision::Exact(3),
2050+
right_min: Some(20),
2051+
right_max: Some(30),
2052+
expected: Precision::Inexact(8),
2053+
},
2054+
NdvTestCase {
2055+
name: "identical ranges",
2056+
left_ndv: Precision::Exact(10),
2057+
left_min: Some(0),
2058+
left_max: Some(100),
2059+
right_ndv: Precision::Exact(8),
2060+
right_min: Some(0),
2061+
right_max: Some(100),
2062+
expected: Precision::Inexact(10),
2063+
},
2064+
NdvTestCase {
2065+
name: "partial overlap",
2066+
left_ndv: Precision::Exact(100),
2067+
left_min: Some(0),
2068+
left_max: Some(100),
2069+
right_ndv: Precision::Exact(50),
2070+
right_min: Some(50),
2071+
right_max: Some(150),
2072+
expected: Precision::Inexact(125),
2073+
},
2074+
NdvTestCase {
2075+
name: "right contained in left",
2076+
left_ndv: Precision::Exact(100),
2077+
left_min: Some(0),
2078+
left_max: Some(100),
2079+
right_ndv: Precision::Exact(50),
2080+
right_min: Some(25),
2081+
right_max: Some(75),
2082+
expected: Precision::Inexact(100),
2083+
},
2084+
NdvTestCase {
2085+
name: "same constant value",
2086+
left_ndv: Precision::Exact(1),
2087+
left_min: Some(5),
2088+
left_max: Some(5),
2089+
right_ndv: Precision::Exact(1),
2090+
right_min: Some(5),
2091+
right_max: Some(5),
2092+
expected: Precision::Inexact(1),
2093+
},
2094+
NdvTestCase {
2095+
name: "different constant values",
2096+
left_ndv: Precision::Exact(1),
2097+
left_min: Some(5),
2098+
left_max: Some(5),
2099+
right_ndv: Precision::Exact(1),
2100+
right_min: Some(10),
2101+
right_max: Some(10),
2102+
expected: Precision::Inexact(2),
2103+
},
2104+
NdvTestCase {
2105+
name: "left constant within right range",
2106+
left_ndv: Precision::Exact(1),
2107+
left_min: Some(5),
2108+
left_max: Some(5),
2109+
right_ndv: Precision::Exact(10),
2110+
right_min: Some(0),
2111+
right_max: Some(10),
2112+
expected: Precision::Inexact(10),
2113+
},
2114+
NdvTestCase {
2115+
name: "left constant outside right range",
2116+
left_ndv: Precision::Exact(1),
2117+
left_min: Some(20),
2118+
left_max: Some(20),
2119+
right_ndv: Precision::Exact(10),
2120+
right_min: Some(0),
2121+
right_max: Some(10),
2122+
expected: Precision::Inexact(11),
2123+
},
2124+
NdvTestCase {
2125+
name: "right constant within left range",
2126+
left_ndv: Precision::Exact(10),
2127+
left_min: Some(0),
2128+
left_max: Some(10),
2129+
right_ndv: Precision::Exact(1),
2130+
right_min: Some(5),
2131+
right_max: Some(5),
2132+
expected: Precision::Inexact(10),
2133+
},
2134+
NdvTestCase {
2135+
name: "right constant outside left range",
2136+
left_ndv: Precision::Exact(10),
2137+
left_min: Some(0),
2138+
left_max: Some(10),
2139+
right_ndv: Precision::Exact(1),
2140+
right_min: Some(20),
2141+
right_max: Some(20),
2142+
expected: Precision::Inexact(11),
2143+
},
2144+
NdvTestCase {
2145+
name: "missing bounds exact plus exact",
2146+
left_ndv: Precision::Exact(10),
2147+
left_min: None,
2148+
left_max: None,
2149+
right_ndv: Precision::Exact(5),
2150+
right_min: None,
2151+
right_max: None,
2152+
expected: Precision::Inexact(15),
2153+
},
2154+
NdvTestCase {
2155+
name: "missing bounds exact plus inexact",
2156+
left_ndv: Precision::Exact(10),
2157+
left_min: None,
2158+
left_max: None,
2159+
right_ndv: Precision::Inexact(5),
2160+
right_min: None,
2161+
right_max: None,
2162+
expected: Precision::Inexact(15),
2163+
},
2164+
NdvTestCase {
2165+
name: "missing bounds inexact plus inexact",
2166+
left_ndv: Precision::Inexact(7),
2167+
left_min: None,
2168+
left_max: None,
2169+
right_ndv: Precision::Inexact(3),
2170+
right_min: None,
2171+
right_max: None,
2172+
expected: Precision::Inexact(10),
2173+
},
2174+
NdvTestCase {
2175+
name: "exact plus absent",
2176+
left_ndv: Precision::Exact(10),
2177+
left_min: None,
2178+
left_max: None,
2179+
right_ndv: Precision::Absent,
2180+
right_min: None,
2181+
right_max: None,
2182+
expected: Precision::Absent,
2183+
},
2184+
NdvTestCase {
2185+
name: "inexact plus absent",
2186+
left_ndv: Precision::Inexact(4),
2187+
left_min: None,
2188+
left_max: None,
2189+
right_ndv: Precision::Absent,
2190+
right_min: None,
2191+
right_max: None,
2192+
expected: Precision::Absent,
2193+
},
2194+
];
2195+
2196+
for case in cases {
2197+
let actual = merge_single_i64_ndv_distinct_count(
2198+
make_single_i64_ndv_stats(case.left_ndv, case.left_min, case.left_max),
2199+
make_single_i64_ndv_stats(case.right_ndv, case.right_min, case.right_max),
2200+
NdvFallback::Sum,
2201+
);
2202+
2203+
assert_eq!(actual, case.expected, "case {} failed", case.name);
2204+
}
2205+
}
2206+
19132207
#[test]
19142208
fn test_with_fetch_basic_preservation() {
19152209
// Test that column statistics and byte size are preserved (as inexact) when applying fetch

0 commit comments

Comments
 (0)