Skip to content
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 280 additions & 2 deletions datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ fn col_stats_union(
mut left: ColumnStatistics,
right: &ColumnStatistics,
) -> ColumnStatistics {
left.distinct_count = Precision::Absent;
left.distinct_count = union_distinct_count(&left, right);
left.min_value = left.min_value.min(&right.min_value);
left.max_value = left.max_value.max(&right.max_value);
left.sum_value = left.sum_value.add(&right.sum_value);
Expand All @@ -863,6 +863,92 @@ fn col_stats_union(
left
}

/// Estimates the number of distinct values (NDV) of a column after a union
/// of `left` and `right`.
///
/// Returns `Precision::Absent` when either input has no distinct count.
/// Otherwise the result is always `Precision::Inexact`: even with exact
/// inputs, the union NDV depends on how many distinct values the two sides
/// share, which can only be estimated (via min/max range overlap) and never
/// known exactly from these statistics alone.
fn union_distinct_count(
    left: &ColumnStatistics,
    right: &ColumnStatistics,
) -> Precision<usize> {
    match (
        left.distinct_count.get_value(),
        right.distinct_count.get_value(),
    ) {
        (Some(&ndv_left), Some(&ndv_right)) => {
            // Prefer the range-overlap estimate. When it is unavailable,
            // fall back to the sum of both sides, which is an upper bound
            // (it assumes no shared values). The sum saturates so that very
            // large statistics cannot overflow `usize` (a panic in debug
            // builds, a silent wrap-around in release builds).
            let ndv = estimate_ndv_with_overlap(left, right, ndv_left, ndv_right)
                .unwrap_or_else(|| ndv_left.saturating_add(ndv_right));
            Precision::Inexact(ndv)
        }
        _ => Precision::Absent,
    }
}

/// Estimates the distinct count for a union using range overlap, following
/// the approach used by Trino:
///
/// overlap_a = fraction of A's range that overlaps with B
/// overlap_b = fraction of B's range that overlaps with A
/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b) \[intersection\]
/// + (1 - overlap_a) * NDV_a [only in A]
/// + (1 - overlap_b) * NDV_b [only in B]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add some comments about why the formula is reasonable?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would my high-level description in #19957 (comment) be what you'd expect as a comment?

If so, @buraksenn, feel free to use it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's good

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @asolimando. Using that comment, plus some LLM input, I've added comments; I hope it is OK now.

fn estimate_ndv_with_overlap(
    left: &ColumnStatistics,
    right: &ColumnStatistics,
    ndv_left: usize,
    ndv_right: usize,
) -> Option<usize> {
    // Returns `None` (so the caller can fall back to its own estimate) when
    // any min/max value is missing or when `distance` cannot produce a range
    // width for the given values.
    let min_left = left.min_value.get_value()?;
    let max_left = left.max_value.get_value()?;
    let min_right = right.min_value.get_value()?;
    let max_right = right.max_value.get_value()?;

    let range_left = max_left.distance(min_left)?;
    let range_right = max_right.distance(min_right)?;

    // Estimate used whenever the two sides cannot share any value: every
    // distinct value counts. Saturating so that very large statistics cannot
    // overflow `usize` (a panic in debug builds, a wrap-around in release).
    let ndv_sum = ndv_left.saturating_add(ndv_right);

    // Constant columns (range == 0) can't use the proportional overlap
    // formula below (it would divide by zero), so check interval overlap
    // directly instead.
    if range_left == 0 || range_right == 0 {
        let overlaps = min_left <= max_right && min_right <= max_left;
        return Some(if overlaps {
            // The constant value may already be one of the other side's
            // distinct values, so the larger NDV is the estimate.
            ndv_left.max(ndv_right)
        } else {
            ndv_sum
        });
    }

    // The overlapping interval is [max(min_left, min_right),
    // min(max_left, max_right)].
    let overlap_min = if min_left >= min_right {
        min_left
    } else {
        min_right
    };
    let overlap_max = if max_left <= max_right {
        max_left
    } else {
        max_right
    };

    // Short-circuit for disjoint ranges: with zero overlap the proportional
    // formula below degrades to `ndv_left + ndv_right` anyway, so return the
    // sum directly and skip the floating-point math.
    if overlap_min > overlap_max {
        return Some(ndv_sum);
    }

    let overlap_range = overlap_max.distance(overlap_min)? as f64;

    // Fraction of each side's range that lies inside the shared interval.
    let overlap_left = overlap_range / range_left as f64;
    let overlap_right = overlap_range / range_right as f64;

    // Assuming values are spread uniformly over each range, each side
    // contributes `fraction * NDV` distinct values to the shared interval.
    // Those values may coincide, so only the larger contribution is counted
    // there; the values outside the shared interval are counted in full.
    let intersection = f64::max(
        overlap_left * ndv_left as f64,
        overlap_right * ndv_right as f64,
    );
    let only_left = (1.0 - overlap_left) * ndv_left as f64;
    let only_right = (1.0 - overlap_right) * ndv_right as f64;

    Some((intersection + only_left + only_right).round() as usize)
}

fn stats_union(mut left: Statistics, right: Statistics) -> Statistics {
let Statistics {
num_rows: right_num_rows,
Expand Down Expand Up @@ -890,6 +976,7 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use datafusion_common::stats::Precision;
use datafusion_physical_expr::equivalence::convert_to_orderings;
use datafusion_physical_expr::expressions::col;

Expand Down Expand Up @@ -1014,7 +1101,7 @@ mod tests {
total_byte_size: Precision::Exact(52),
column_statistics: vec![
ColumnStatistics {
distinct_count: Precision::Absent,
distinct_count: Precision::Inexact(6),
max_value: Precision::Exact(ScalarValue::Int64(Some(34))),
min_value: Precision::Exact(ScalarValue::Int64(Some(-4))),
sum_value: Precision::Exact(ScalarValue::Int64(Some(84))),
Expand Down Expand Up @@ -1043,6 +1130,197 @@ mod tests {
assert_eq!(result, expected);
}

#[test]
fn test_union_distinct_count() {
    // One input side of a case: (ndv, min, max).
    type Side = (Precision<usize>, Option<i64>, Option<i64>);

    /// Builds column statistics carrying only an NDV and optional exact
    /// min/max bounds; every other field keeps its default.
    fn column(
        ndv: Precision<usize>,
        min: Option<i64>,
        max: Option<i64>,
    ) -> ColumnStatistics {
        let bound = |value: Option<i64>| match value {
            Some(v) => Precision::Exact(ScalarValue::Int64(Some(v))),
            None => Precision::Absent,
        };
        ColumnStatistics {
            distinct_count: ndv,
            min_value: bound(min),
            max_value: bound(max),
            ..Default::default()
        }
    }

    // Side with a known `[lo, hi]` range.
    let ranged =
        |ndv: Precision<usize>, lo: i64, hi: i64| -> Side { (ndv, Some(lo), Some(hi)) };
    // Side without any min/max information.
    let unranged = |ndv: Precision<usize>| -> Side { (ndv, None, None) };

    // Each entry is (left side, right side, expected union NDV).
    let cases: Vec<(Side, Side, Precision<usize>)> = vec![
        // disjoint ranges: NDV = 5 + 3
        (
            ranged(Precision::Exact(5), 0, 10),
            ranged(Precision::Exact(3), 20, 30),
            Precision::Inexact(8),
        ),
        // identical ranges: intersection = max(10, 8) = 10
        (
            ranged(Precision::Exact(10), 0, 100),
            ranged(Precision::Exact(8), 0, 100),
            Precision::Inexact(10),
        ),
        // partial overlap: 50 + 50 + 25 = 125
        (
            ranged(Precision::Exact(100), 0, 100),
            ranged(Precision::Exact(50), 50, 150),
            Precision::Inexact(125),
        ),
        // right contained in left: 50 + 50 + 0 = 100
        (
            ranged(Precision::Exact(100), 0, 100),
            ranged(Precision::Exact(50), 25, 75),
            Precision::Inexact(100),
        ),
        // both constant, same value
        (
            ranged(Precision::Exact(1), 5, 5),
            ranged(Precision::Exact(1), 5, 5),
            Precision::Inexact(1),
        ),
        // both constant, different values
        (
            ranged(Precision::Exact(1), 5, 5),
            ranged(Precision::Exact(1), 10, 10),
            Precision::Inexact(2),
        ),
        // left constant within right range
        (
            ranged(Precision::Exact(1), 5, 5),
            ranged(Precision::Exact(10), 0, 10),
            Precision::Inexact(10),
        ),
        // left constant outside right range
        (
            ranged(Precision::Exact(1), 20, 20),
            ranged(Precision::Exact(10), 0, 10),
            Precision::Inexact(11),
        ),
        // right constant within left range
        (
            ranged(Precision::Exact(10), 0, 10),
            ranged(Precision::Exact(1), 5, 5),
            Precision::Inexact(10),
        ),
        // right constant outside left range
        (
            ranged(Precision::Exact(10), 0, 10),
            ranged(Precision::Exact(1), 20, 20),
            Precision::Inexact(11),
        ),
        // missing min/max falls back to sum (exact + exact)
        (
            unranged(Precision::Exact(10)),
            unranged(Precision::Exact(5)),
            Precision::Inexact(15),
        ),
        // missing min/max falls back to sum (exact + inexact)
        (
            unranged(Precision::Exact(10)),
            unranged(Precision::Inexact(5)),
            Precision::Inexact(15),
        ),
        // missing min/max falls back to sum (inexact + inexact)
        (
            unranged(Precision::Inexact(7)),
            unranged(Precision::Inexact(3)),
            Precision::Inexact(10),
        ),
        // one side absent
        (
            unranged(Precision::Exact(10)),
            unranged(Precision::Absent),
            Precision::Absent,
        ),
        // one side absent (inexact + absent)
        (
            unranged(Precision::Inexact(4)),
            unranged(Precision::Absent),
            Precision::Absent,
        ),
    ];

    for (i, (lhs, rhs, expected)) in cases.into_iter().enumerate() {
        let (left_ndv, left_min, left_max) = lhs;
        let (right_ndv, right_min, right_max) = rhs;
        let left = column(left_ndv, left_min, left_max);
        let right = column(right_ndv, right_min, right_max);

        let actual = union_distinct_count(&left, &right);
        assert_eq!(actual, expected, "case {i} failed");
    }
}

#[tokio::test]
async fn test_union_equivalence_properties() -> Result<()> {
let schema = create_test_schema()?;
Expand Down
Loading