Skip to content

Commit e4f4dba

Browse files
committed
Merge min-max stats in-place
Signed-off-by: Adam Gutglick <adamgsal@gmail.com>
1 parent b09205a commit e4f4dba

File tree

1 file changed

+290
-43
lines changed

1 file changed

+290
-43
lines changed

datafusion/common/src/stats.rs

Lines changed: 290 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -669,58 +669,41 @@ impl Statistics {
669669
where
670670
I: IntoIterator<Item = &'a Statistics>,
671671
{
672-
let items: Vec<&Statistics> = items.into_iter().collect();
673-
674-
if items.is_empty() {
672+
let mut items = items.into_iter();
673+
let Some(first) = items.next() else {
675674
return Ok(Statistics::new_unknown(schema));
676-
}
677-
if items.len() == 1 {
678-
return Ok(items[0].clone());
675+
};
676+
let Some(second) = items.next() else {
677+
return Ok(first.clone());
678+
};
679+
680+
let num_cols = first.column_statistics.len();
681+
let mut num_rows = first.num_rows;
682+
let mut total_byte_size = first.total_byte_size;
683+
let mut column_statistics = first.column_statistics.clone();
684+
for col_stats in &mut column_statistics {
685+
cast_sum_value_to_sum_type_in_place(&mut col_stats.sum_value);
679686
}
680687

681-
let num_cols = items[0].column_statistics.len();
682-
// Validate all items have the same number of columns
683-
for (i, stat) in items.iter().enumerate().skip(1) {
688+
// Merge the remaining items in a single pass.
689+
for (i, stat) in std::iter::once(second).chain(items).enumerate() {
684690
if stat.column_statistics.len() != num_cols {
685691
return _plan_err!(
686692
"Cannot merge statistics with different number of columns: {} vs {} (item {})",
687693
num_cols,
688694
stat.column_statistics.len(),
689-
i
695+
i + 1
690696
);
691697
}
692-
}
693-
694-
// Aggregate usize fields (cheap arithmetic)
695-
let mut num_rows = Precision::Exact(0usize);
696-
let mut total_byte_size = Precision::Exact(0usize);
697-
for stat in &items {
698698
num_rows = num_rows.add(&stat.num_rows);
699699
total_byte_size = total_byte_size.add(&stat.total_byte_size);
700-
}
701-
702-
let first = items[0];
703-
let mut column_statistics: Vec<ColumnStatistics> = first
704-
.column_statistics
705-
.iter()
706-
.map(|cs| ColumnStatistics {
707-
null_count: cs.null_count,
708-
max_value: cs.max_value.clone(),
709-
min_value: cs.min_value.clone(),
710-
sum_value: cs.sum_value.cast_to_sum_type(),
711-
distinct_count: cs.distinct_count,
712-
byte_size: cs.byte_size,
713-
})
714-
.collect();
715-
716-
// Accumulate all statistics in a single pass.
717-
// Uses precision_add for sum (reuses the lhs accumulator for
718-
// direct numeric addition), while preserving the NDV update
719-
// ordering required by estimate_ndv_with_overlap.
720-
for stat in items.iter().skip(1) {
721-
for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() {
722-
let item_cs = &stat.column_statistics[col_idx];
723700

701+
// Uses precision_add for sum (reuses the lhs accumulator for
702+
// direct numeric addition), while preserving the NDV update
703+
// ordering required by estimate_ndv_with_overlap.
704+
for (col_stats, item_cs) in
705+
column_statistics.iter_mut().zip(&stat.column_statistics)
706+
{
724707
col_stats.null_count = col_stats.null_count.add(&item_cs.null_count);
725708

726709
// NDV must be computed before min/max update (needs pre-merge ranges)
@@ -734,10 +717,12 @@ impl Statistics {
734717
),
735718
_ => Precision::Absent,
736719
};
737-
col_stats.min_value = col_stats.min_value.min(&item_cs.min_value);
738-
col_stats.max_value = col_stats.max_value.max(&item_cs.max_value);
739-
let item_sum_value = item_cs.sum_value.cast_to_sum_type();
740-
precision_add(&mut col_stats.sum_value, &item_sum_value);
720+
precision_min(&mut col_stats.min_value, &item_cs.min_value);
721+
precision_max(&mut col_stats.max_value, &item_cs.max_value);
722+
precision_add_for_sum_in_place(
723+
&mut col_stats.sum_value,
724+
&item_cs.sum_value,
725+
);
741726
col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size);
742727
}
743728
}
@@ -840,6 +825,126 @@ pub fn estimate_ndv_with_overlap(
840825
Some((intersection + only_left + only_right).round() as usize)
841826
}
842827

828+
/// Returns the minimum precision while not allocating a new value,
829+
/// mirrors the semantics of `PartialOrd`.
830+
#[inline]
831+
fn precision_min<T>(lhs: &mut Precision<T>, rhs: &Precision<T>)
832+
where
833+
T: Debug + Clone + PartialEq + Eq + PartialOrd,
834+
{
835+
*lhs = match (std::mem::take(lhs), rhs) {
836+
(Precision::Exact(left), Precision::Exact(right)) => {
837+
if left <= *right {
838+
Precision::Exact(left)
839+
} else {
840+
Precision::Exact(right.clone())
841+
}
842+
}
843+
(Precision::Exact(left), Precision::Inexact(right))
844+
| (Precision::Inexact(left), Precision::Exact(right))
845+
| (Precision::Inexact(left), Precision::Inexact(right)) => {
846+
if left <= *right {
847+
Precision::Inexact(left)
848+
} else {
849+
Precision::Inexact(right.clone())
850+
}
851+
}
852+
(_, _) => Precision::Absent,
853+
};
854+
}
855+
856+
/// Returns the maximum precision while not allocating a new value,
857+
/// mirrors the semantics of `PartialOrd`.
858+
#[inline]
859+
fn precision_max<T>(lhs: &mut Precision<T>, rhs: &Precision<T>)
860+
where
861+
T: Debug + Clone + PartialEq + Eq + PartialOrd,
862+
{
863+
*lhs = match (std::mem::take(lhs), rhs) {
864+
(Precision::Exact(left), Precision::Exact(right)) => {
865+
if left >= *right {
866+
Precision::Exact(left)
867+
} else {
868+
Precision::Exact(right.clone())
869+
}
870+
}
871+
(Precision::Exact(left), Precision::Inexact(right))
872+
| (Precision::Inexact(left), Precision::Exact(right))
873+
| (Precision::Inexact(left), Precision::Inexact(right)) => {
874+
if left >= *right {
875+
Precision::Inexact(left)
876+
} else {
877+
Precision::Inexact(right.clone())
878+
}
879+
}
880+
(_, _) => Precision::Absent,
881+
};
882+
}
883+
884+
#[inline]
885+
fn cast_sum_value_to_sum_type_in_place(value: &mut Precision<ScalarValue>) {
886+
let (is_exact, inner) = match std::mem::take(value) {
887+
Precision::Exact(v) => (true, v),
888+
Precision::Inexact(v) => (false, v),
889+
Precision::Absent => return,
890+
};
891+
let source_type = inner.data_type();
892+
let target_type = Precision::<ScalarValue>::sum_data_type(&source_type);
893+
894+
let wrap_precision_fn: fn(ScalarValue) -> Precision<ScalarValue> = if is_exact {
895+
Precision::Exact
896+
} else {
897+
Precision::Inexact
898+
};
899+
900+
*value = if source_type == target_type {
901+
wrap_precision_fn(inner)
902+
} else {
903+
inner
904+
.cast_to(&target_type)
905+
.map(wrap_precision_fn)
906+
.unwrap_or(Precision::Absent)
907+
};
908+
}
909+
910+
#[inline]
911+
fn precision_add_for_sum_in_place(
912+
lhs: &mut Precision<ScalarValue>,
913+
rhs: &Precision<ScalarValue>,
914+
) {
915+
match rhs {
916+
Precision::Exact(value) => {
917+
let source_type = value.data_type();
918+
let target_type = Precision::<ScalarValue>::sum_data_type(&source_type);
919+
if source_type == target_type {
920+
precision_add(lhs, rhs);
921+
} else {
922+
let rhs = value
923+
.cast_to(&target_type)
924+
.map(Precision::Exact)
925+
.unwrap_or(Precision::Absent);
926+
precision_add(lhs, &rhs);
927+
}
928+
}
929+
Precision::Inexact(value) => {
930+
let source_type = value.data_type();
931+
let target_type = Precision::<ScalarValue>::sum_data_type(&source_type);
932+
if source_type == target_type {
933+
precision_add(lhs, rhs);
934+
} else {
935+
let rhs = value
936+
.cast_to(&target_type)
937+
.map(Precision::Inexact)
938+
.unwrap_or(Precision::Absent);
939+
precision_add(lhs, &rhs);
940+
}
941+
}
942+
Precision::Absent => {
943+
*lhs = Precision::Absent;
944+
}
945+
}
946+
}
947+
843948
/// Creates an estimate of the number of rows in the output using the given
844949
/// optional value and exactness flag.
845950
fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
@@ -2624,4 +2729,146 @@ mod tests {
26242729
Precision::Inexact(ScalarValue::Int64(Some(1500)))
26252730
);
26262731
}
2732+
2733+
#[test]
2734+
fn test_precision_min_in_place() {
2735+
// Exact vs Exact: keeps the smaller
2736+
let mut lhs = Precision::Exact(10);
2737+
precision_min(&mut lhs, &Precision::Exact(20));
2738+
assert_eq!(lhs, Precision::Exact(10));
2739+
2740+
let mut lhs = Precision::Exact(20);
2741+
precision_min(&mut lhs, &Precision::Exact(10));
2742+
assert_eq!(lhs, Precision::Exact(10));
2743+
2744+
// Equal exact values
2745+
let mut lhs = Precision::Exact(5);
2746+
precision_min(&mut lhs, &Precision::Exact(5));
2747+
assert_eq!(lhs, Precision::Exact(5));
2748+
2749+
// Mixed exact/inexact: result is Inexact with smaller value
2750+
let mut lhs = Precision::Exact(10);
2751+
precision_min(&mut lhs, &Precision::Inexact(20));
2752+
assert_eq!(lhs, Precision::Inexact(10));
2753+
2754+
let mut lhs = Precision::Inexact(10);
2755+
precision_min(&mut lhs, &Precision::Exact(5));
2756+
assert_eq!(lhs, Precision::Inexact(5));
2757+
2758+
// Inexact vs Inexact
2759+
let mut lhs = Precision::Inexact(30);
2760+
precision_min(&mut lhs, &Precision::Inexact(20));
2761+
assert_eq!(lhs, Precision::Inexact(20));
2762+
2763+
// Absent makes result Absent
2764+
let mut lhs = Precision::Exact(10);
2765+
precision_min(&mut lhs, &Precision::Absent);
2766+
assert_eq!(lhs, Precision::Absent);
2767+
2768+
let mut lhs = Precision::<i32>::Absent;
2769+
precision_min(&mut lhs, &Precision::Exact(10));
2770+
assert_eq!(lhs, Precision::Absent);
2771+
}
2772+
2773+
#[test]
2774+
fn test_precision_max_in_place() {
2775+
// Exact vs Exact: keeps the larger
2776+
let mut lhs = Precision::Exact(10);
2777+
precision_max(&mut lhs, &Precision::Exact(20));
2778+
assert_eq!(lhs, Precision::Exact(20));
2779+
2780+
let mut lhs = Precision::Exact(20);
2781+
precision_max(&mut lhs, &Precision::Exact(10));
2782+
assert_eq!(lhs, Precision::Exact(20));
2783+
2784+
// Equal exact values
2785+
let mut lhs = Precision::Exact(5);
2786+
precision_max(&mut lhs, &Precision::Exact(5));
2787+
assert_eq!(lhs, Precision::Exact(5));
2788+
2789+
// Mixed exact/inexact: result is Inexact with larger value
2790+
let mut lhs = Precision::Exact(10);
2791+
precision_max(&mut lhs, &Precision::Inexact(20));
2792+
assert_eq!(lhs, Precision::Inexact(20));
2793+
2794+
let mut lhs = Precision::Inexact(10);
2795+
precision_max(&mut lhs, &Precision::Exact(5));
2796+
assert_eq!(lhs, Precision::Inexact(10));
2797+
2798+
// Inexact vs Inexact
2799+
let mut lhs = Precision::Inexact(20);
2800+
precision_max(&mut lhs, &Precision::Inexact(30));
2801+
assert_eq!(lhs, Precision::Inexact(30));
2802+
2803+
// Absent makes result Absent
2804+
let mut lhs = Precision::Exact(10);
2805+
precision_max(&mut lhs, &Precision::Absent);
2806+
assert_eq!(lhs, Precision::Absent);
2807+
2808+
let mut lhs = Precision::<i32>::Absent;
2809+
precision_max(&mut lhs, &Precision::Exact(10));
2810+
assert_eq!(lhs, Precision::Absent);
2811+
}
2812+
2813+
#[test]
2814+
fn test_cast_sum_value_to_sum_type_in_place_widens_int32() {
2815+
let mut value = Precision::Exact(ScalarValue::Int32(Some(42)));
2816+
cast_sum_value_to_sum_type_in_place(&mut value);
2817+
assert_eq!(value, Precision::Exact(ScalarValue::Int64(Some(42))));
2818+
}
2819+
2820+
#[test]
2821+
fn test_cast_sum_value_to_sum_type_in_place_preserves_int64() {
2822+
// Int64 is already the sum type for Int64, no widening needed
2823+
let mut value = Precision::Exact(ScalarValue::Int64(Some(100)));
2824+
cast_sum_value_to_sum_type_in_place(&mut value);
2825+
assert_eq!(value, Precision::Exact(ScalarValue::Int64(Some(100))));
2826+
}
2827+
2828+
#[test]
2829+
fn test_cast_sum_value_to_sum_type_in_place_inexact() {
2830+
let mut value = Precision::Inexact(ScalarValue::Int32(Some(42)));
2831+
cast_sum_value_to_sum_type_in_place(&mut value);
2832+
assert_eq!(value, Precision::Inexact(ScalarValue::Int64(Some(42))));
2833+
}
2834+
2835+
#[test]
2836+
fn test_cast_sum_value_to_sum_type_in_place_absent() {
2837+
let mut value = Precision::<ScalarValue>::Absent;
2838+
cast_sum_value_to_sum_type_in_place(&mut value);
2839+
assert_eq!(value, Precision::Absent);
2840+
}
2841+
2842+
#[test]
2843+
fn test_precision_add_for_sum_in_place_same_type() {
2844+
// Int64 + Int64: no widening needed, straight add
2845+
let mut lhs = Precision::Exact(ScalarValue::Int64(Some(10)));
2846+
let rhs = Precision::Exact(ScalarValue::Int64(Some(20)));
2847+
precision_add_for_sum_in_place(&mut lhs, &rhs);
2848+
assert_eq!(lhs, Precision::Exact(ScalarValue::Int64(Some(30))));
2849+
}
2850+
2851+
#[test]
2852+
fn test_precision_add_for_sum_in_place_widens_rhs() {
2853+
// lhs is already Int64 (widened), rhs is Int32 -> gets cast to Int64
2854+
let mut lhs = Precision::Exact(ScalarValue::Int64(Some(10)));
2855+
let rhs = Precision::Exact(ScalarValue::Int32(Some(5)));
2856+
precision_add_for_sum_in_place(&mut lhs, &rhs);
2857+
assert_eq!(lhs, Precision::Exact(ScalarValue::Int64(Some(15))));
2858+
}
2859+
2860+
#[test]
2861+
fn test_precision_add_for_sum_in_place_inexact() {
2862+
let mut lhs = Precision::Inexact(ScalarValue::Int64(Some(10)));
2863+
let rhs = Precision::Inexact(ScalarValue::Int32(Some(5)));
2864+
precision_add_for_sum_in_place(&mut lhs, &rhs);
2865+
assert_eq!(lhs, Precision::Inexact(ScalarValue::Int64(Some(15))));
2866+
}
2867+
2868+
#[test]
2869+
fn test_precision_add_for_sum_in_place_absent_rhs() {
2870+
let mut lhs = Precision::Exact(ScalarValue::Int64(Some(10)));
2871+
precision_add_for_sum_in_place(&mut lhs, &Precision::Absent);
2872+
assert_eq!(lhs, Precision::Absent);
2873+
}
26272874
}

0 commit comments

Comments
 (0)