Skip to content

Commit 6e0dde0

Browse files
authored
fix(stats): widen sum_value integer arithmetic to SUM-compatible types (#20865)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #20826. ## Rationale for this change As discussed in the review thread on #20768 and tracked by #20826, `sum_value` should not keep narrow integer column types during stats aggregation, because merge/multiply paths can overflow before values are widened. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? This PR updates statistics `sum_value` arithmetic to match SUM-style widening for small integer types, and applies that behavior consistently across merge and multiplication paths. <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent e62533b commit 6e0dde0

5 files changed

Lines changed: 304 additions & 39 deletions

File tree

datafusion/common/src/stats.rs

Lines changed: 123 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,24 @@ impl Precision<usize> {
203203
}
204204

205205
impl Precision<ScalarValue> {
206+
fn sum_data_type(data_type: &DataType) -> DataType {
207+
match data_type {
208+
DataType::Int8 | DataType::Int16 | DataType::Int32 => DataType::Int64,
209+
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 => DataType::UInt64,
210+
_ => data_type.clone(),
211+
}
212+
}
213+
214+
fn cast_scalar_to_sum_type(value: &ScalarValue) -> Result<ScalarValue> {
215+
let source_type = value.data_type();
216+
let target_type = Self::sum_data_type(&source_type);
217+
if source_type == target_type {
218+
Ok(value.clone())
219+
} else {
220+
value.cast_to(&target_type)
221+
}
222+
}
223+
206224
/// Calculates the sum of two (possibly inexact) [`ScalarValue`] values,
207225
/// conservatively propagating exactness information. If one of the input
208226
/// values is [`Precision::Absent`], the result is `Absent` too.
@@ -228,6 +246,31 @@ impl Precision<ScalarValue> {
228246
}
229247
}
230248

249+
/// Casts integer values to the wider SQL `SUM` return type.
250+
///
251+
/// This reduces overflow risk when `sum_value` statistics are merged:
252+
/// `Int8/Int16/Int32 -> Int64` and `UInt8/UInt16/UInt32 -> UInt64`.
253+
pub fn cast_to_sum_type(&self) -> Precision<ScalarValue> {
254+
match (self.is_exact(), self.get_value()) {
255+
(Some(true), Some(value)) => Self::cast_scalar_to_sum_type(value)
256+
.map(Precision::Exact)
257+
.unwrap_or(Precision::Absent),
258+
(Some(false), Some(value)) => Self::cast_scalar_to_sum_type(value)
259+
.map(Precision::Inexact)
260+
.unwrap_or(Precision::Absent),
261+
(_, _) => Precision::Absent,
262+
}
263+
}
264+
265+
/// SUM-style addition with integer widening to match SQL `SUM` return
266+
/// types for smaller integral inputs.
267+
pub fn add_for_sum(&self, other: &Precision<ScalarValue>) -> Precision<ScalarValue> {
268+
let mut lhs = self.cast_to_sum_type();
269+
let rhs = other.cast_to_sum_type();
270+
precision_add(&mut lhs, &rhs);
271+
lhs
272+
}
273+
231274
/// Calculates the difference of two (possibly inexact) [`ScalarValue`] values,
232275
/// conservatively propagating exactness information. If one of the input
233276
/// values is [`Precision::Absent`], the result is `Absent` too.
@@ -620,7 +663,7 @@ impl Statistics {
620663
/// assert_eq!(merged.column_statistics[0].max_value,
621664
/// Precision::Exact(ScalarValue::from(200)));
622665
/// assert_eq!(merged.column_statistics[0].sum_value,
623-
/// Precision::Exact(ScalarValue::from(1500)));
666+
/// Precision::Exact(ScalarValue::Int64(Some(1500))));
624667
/// ```
625668
pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
626669
where
@@ -664,7 +707,7 @@ impl Statistics {
664707
null_count: cs.null_count,
665708
max_value: cs.max_value.clone(),
666709
min_value: cs.min_value.clone(),
667-
sum_value: cs.sum_value.clone(),
710+
sum_value: cs.sum_value.cast_to_sum_type(),
668711
distinct_count: cs.distinct_count,
669712
byte_size: cs.byte_size,
670713
})
@@ -693,7 +736,8 @@ impl Statistics {
693736
};
694737
col_stats.min_value = col_stats.min_value.min(&item_cs.min_value);
695738
col_stats.max_value = col_stats.max_value.max(&item_cs.max_value);
696-
precision_add(&mut col_stats.sum_value, &item_cs.sum_value);
739+
let item_sum_value = item_cs.sum_value.cast_to_sum_type();
740+
precision_add(&mut col_stats.sum_value, &item_sum_value);
697741
col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size);
698742
}
699743
}
@@ -877,7 +921,15 @@ pub struct ColumnStatistics {
877921
pub max_value: Precision<ScalarValue>,
878922
/// Minimum value of column
879923
pub min_value: Precision<ScalarValue>,
880-
/// Sum value of a column
924+
/// Sum value of a column.
925+
///
926+
/// For integral columns, values should be kept in SUM-compatible widened
927+
/// types (`Int8/Int16/Int32 -> Int64`, `UInt8/UInt16/UInt32 -> UInt64`) to
928+
/// reduce overflow risk during statistics propagation.
929+
///
930+
/// Callers should prefer [`ColumnStatistics::with_sum_value`] for setting
931+
/// this field and [`Precision<ScalarValue>::add_for_sum`] /
932+
/// [`Precision<ScalarValue>::cast_to_sum_type`] for sum arithmetic.
881933
pub sum_value: Precision<ScalarValue>,
882934
/// Number of distinct values
883935
pub distinct_count: Precision<usize>,
@@ -942,7 +994,19 @@ impl ColumnStatistics {
942994

943995
/// Set the sum value
944996
pub fn with_sum_value(mut self, sum_value: Precision<ScalarValue>) -> Self {
945-
self.sum_value = sum_value;
997+
self.sum_value = match sum_value {
998+
Precision::Exact(value) => {
999+
Precision::<ScalarValue>::cast_scalar_to_sum_type(&value)
1000+
.map(Precision::Exact)
1001+
.unwrap_or(Precision::Absent)
1002+
}
1003+
Precision::Inexact(value) => {
1004+
Precision::<ScalarValue>::cast_scalar_to_sum_type(&value)
1005+
.map(Precision::Inexact)
1006+
.unwrap_or(Precision::Absent)
1007+
}
1008+
Precision::Absent => Precision::Absent,
1009+
};
9461010
self
9471011
}
9481012

@@ -1095,6 +1159,45 @@ mod tests {
10951159
assert_eq!(precision.add(&Precision::Absent), Precision::Absent);
10961160
}
10971161

1162+
#[test]
1163+
fn test_add_for_sum_scalar_integer_widening() {
1164+
let precision = Precision::Exact(ScalarValue::Int32(Some(42)));
1165+
1166+
assert_eq!(
1167+
precision.add_for_sum(&Precision::Exact(ScalarValue::Int32(Some(23)))),
1168+
Precision::Exact(ScalarValue::Int64(Some(65))),
1169+
);
1170+
assert_eq!(
1171+
precision.add_for_sum(&Precision::Inexact(ScalarValue::Int32(Some(23)))),
1172+
Precision::Inexact(ScalarValue::Int64(Some(65))),
1173+
);
1174+
}
1175+
1176+
#[test]
1177+
fn test_add_for_sum_prevents_int32_overflow() {
1178+
let lhs = Precision::Exact(ScalarValue::Int32(Some(i32::MAX)));
1179+
let rhs = Precision::Exact(ScalarValue::Int32(Some(1)));
1180+
1181+
assert_eq!(
1182+
lhs.add_for_sum(&rhs),
1183+
Precision::Exact(ScalarValue::Int64(Some(i64::from(i32::MAX) + 1))),
1184+
);
1185+
}
1186+
1187+
#[test]
1188+
fn test_add_for_sum_scalar_unsigned_integer_widening() {
1189+
let precision = Precision::Exact(ScalarValue::UInt32(Some(42)));
1190+
1191+
assert_eq!(
1192+
precision.add_for_sum(&Precision::Exact(ScalarValue::UInt32(Some(23)))),
1193+
Precision::Exact(ScalarValue::UInt64(Some(65))),
1194+
);
1195+
assert_eq!(
1196+
precision.add_for_sum(&Precision::Inexact(ScalarValue::UInt32(Some(23)))),
1197+
Precision::Inexact(ScalarValue::UInt64(Some(65))),
1198+
);
1199+
}
1200+
10981201
#[test]
10991202
fn test_sub() {
11001203
let precision1 = Precision::Exact(42);
@@ -1340,7 +1443,7 @@ mod tests {
13401443
);
13411444
assert_eq!(
13421445
col1_stats.sum_value,
1343-
Precision::Exact(ScalarValue::Int32(Some(1100)))
1446+
Precision::Exact(ScalarValue::Int64(Some(1100)))
13441447
); // 500 + 600
13451448

13461449
let col2_stats = &summary_stats.column_statistics[1];
@@ -1355,7 +1458,7 @@ mod tests {
13551458
);
13561459
assert_eq!(
13571460
col2_stats.sum_value,
1358-
Precision::Exact(ScalarValue::Int32(Some(2200)))
1461+
Precision::Exact(ScalarValue::Int64(Some(2200)))
13591462
); // 1000 + 1200
13601463
}
13611464

@@ -1997,6 +2100,16 @@ mod tests {
19972100
assert_eq!(col_stats.byte_size, Precision::Exact(8192));
19982101
}
19992102

2103+
#[test]
2104+
fn test_with_sum_value_builder_widens_small_integers() {
2105+
let col_stats = ColumnStatistics::new_unknown()
2106+
.with_sum_value(Precision::Exact(ScalarValue::UInt32(Some(123))));
2107+
assert_eq!(
2108+
col_stats.sum_value,
2109+
Precision::Exact(ScalarValue::UInt64(Some(123)))
2110+
);
2111+
}
2112+
20002113
#[test]
20012114
fn test_with_fetch_scales_byte_size() {
20022115
// Test that byte_size is scaled by the row ratio in with_fetch
@@ -2144,7 +2257,7 @@ mod tests {
21442257
);
21452258
assert_eq!(
21462259
col1_stats.sum_value,
2147-
Precision::Exact(ScalarValue::Int32(Some(1100)))
2260+
Precision::Exact(ScalarValue::Int64(Some(1100)))
21482261
);
21492262

21502263
let col2_stats = &summary_stats.column_statistics[1];
@@ -2159,7 +2272,7 @@ mod tests {
21592272
);
21602273
assert_eq!(
21612274
col2_stats.sum_value,
2162-
Precision::Exact(ScalarValue::Int32(Some(2200)))
2275+
Precision::Exact(ScalarValue::Int64(Some(2200)))
21632276
);
21642277
}
21652278

@@ -2508,7 +2621,7 @@ mod tests {
25082621
);
25092622
assert_eq!(
25102623
col_stats.sum_value,
2511-
Precision::Inexact(ScalarValue::Int32(Some(1500)))
2624+
Precision::Inexact(ScalarValue::Int64(Some(1500)))
25122625
);
25132626
}
25142627
}

datafusion/datasource/src/statistics.rs

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ fn sort_columns_from_physical_sort_exprs(
293293
since = "47.0.0",
294294
note = "Please use `get_files_with_limit` and `compute_all_files_statistics` instead"
295295
)]
296-
#[expect(unused)]
296+
#[cfg_attr(not(test), expect(unused))]
297297
pub async fn get_statistics_with_limit(
298298
all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
299299
file_schema: SchemaRef,
@@ -329,7 +329,7 @@ pub async fn get_statistics_with_limit(
329329
col_stats_set[index].null_count = file_column.null_count;
330330
col_stats_set[index].max_value = file_column.max_value;
331331
col_stats_set[index].min_value = file_column.min_value;
332-
col_stats_set[index].sum_value = file_column.sum_value;
332+
col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type();
333333
}
334334

335335
// If the number of rows exceeds the limit, we can stop processing
@@ -374,7 +374,7 @@ pub async fn get_statistics_with_limit(
374374
col_stats.null_count = col_stats.null_count.add(file_nc);
375375
col_stats.max_value = col_stats.max_value.max(file_max);
376376
col_stats.min_value = col_stats.min_value.min(file_min);
377-
col_stats.sum_value = col_stats.sum_value.add(file_sum);
377+
col_stats.sum_value = col_stats.sum_value.add_for_sum(file_sum);
378378
col_stats.byte_size = col_stats.byte_size.add(file_sbs);
379379
}
380380

@@ -497,3 +497,78 @@ pub fn add_row_stats(
497497
) -> Precision<usize> {
498498
file_num_rows.add(&num_rows)
499499
}
500+
501+
#[cfg(test)]
502+
mod tests {
503+
use super::*;
504+
use crate::PartitionedFile;
505+
use arrow::datatypes::{DataType, Field, Schema};
506+
use futures::stream;
507+
508+
fn file_stats(sum: u32) -> Statistics {
509+
Statistics {
510+
num_rows: Precision::Exact(1),
511+
total_byte_size: Precision::Exact(4),
512+
column_statistics: vec![ColumnStatistics {
513+
null_count: Precision::Exact(0),
514+
max_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
515+
min_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
516+
sum_value: Precision::Exact(ScalarValue::UInt32(Some(sum))),
517+
distinct_count: Precision::Exact(1),
518+
byte_size: Precision::Exact(4),
519+
}],
520+
}
521+
}
522+
523+
#[tokio::test]
524+
#[expect(deprecated)]
525+
async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
526+
-> Result<()> {
527+
let schema =
528+
Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32, true)]));
529+
530+
let files = stream::iter(vec![Ok((
531+
PartitionedFile::new("f1.parquet", 1),
532+
Arc::new(file_stats(100)),
533+
))]);
534+
535+
let (_group, stats) =
536+
get_statistics_with_limit(files, schema, None, false).await?;
537+
538+
assert_eq!(
539+
stats.column_statistics[0].sum_value,
540+
Precision::Exact(ScalarValue::UInt64(Some(100)))
541+
);
542+
543+
Ok(())
544+
}
545+
546+
#[tokio::test]
547+
#[expect(deprecated)]
548+
async fn test_get_statistics_with_limit_merges_sum_with_unsigned_widening()
549+
-> Result<()> {
550+
let schema =
551+
Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt32, true)]));
552+
553+
let files = stream::iter(vec![
554+
Ok((
555+
PartitionedFile::new("f1.parquet", 1),
556+
Arc::new(file_stats(100)),
557+
)),
558+
Ok((
559+
PartitionedFile::new("f2.parquet", 1),
560+
Arc::new(file_stats(200)),
561+
)),
562+
]);
563+
564+
let (_group, stats) =
565+
get_statistics_with_limit(files, schema, None, true).await?;
566+
567+
assert_eq!(
568+
stats.column_statistics[0].sum_value,
569+
Precision::Exact(ScalarValue::UInt64(Some(300)))
570+
);
571+
572+
Ok(())
573+
}
574+
}

datafusion/physical-expr/src/projection.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -693,12 +693,15 @@ impl ProjectionExprs {
693693
Precision::Absent
694694
};
695695

696-
let sum_value = Precision::<ScalarValue>::from(stats.num_rows)
697-
.cast_to(&value.data_type())
698-
.ok()
699-
.map(|row_count| {
700-
Precision::Exact(value.clone()).multiply(&row_count)
696+
let widened_sum = Precision::Exact(value.clone()).cast_to_sum_type();
697+
let sum_value = widened_sum
698+
.get_value()
699+
.and_then(|sum| {
700+
Precision::<ScalarValue>::from(stats.num_rows)
701+
.cast_to(&sum.data_type())
702+
.ok()
701703
})
704+
.map(|row_count| widened_sum.multiply(&row_count))
702705
.unwrap_or(Precision::Absent);
703706

704707
ColumnStatistics {
@@ -2864,6 +2867,35 @@ pub(crate) mod tests {
28642867
Ok(())
28652868
}
28662869

2870+
#[test]
2871+
fn test_project_statistics_with_i32_literal_sum_widens_to_i64() -> Result<()> {
2872+
let input_stats = get_stats();
2873+
let input_schema = get_schema();
2874+
2875+
let projection = ProjectionExprs::new(vec![
2876+
ProjectionExpr {
2877+
expr: Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
2878+
alias: "constant".to_string(),
2879+
},
2880+
ProjectionExpr {
2881+
expr: Arc::new(Column::new("col0", 0)),
2882+
alias: "num".to_string(),
2883+
},
2884+
]);
2885+
2886+
let output_stats = projection.project_statistics(
2887+
input_stats,
2888+
&projection.project_schema(&input_schema)?,
2889+
)?;
2890+
2891+
assert_eq!(
2892+
output_stats.column_statistics[0].sum_value,
2893+
Precision::Exact(ScalarValue::Int64(Some(50)))
2894+
);
2895+
2896+
Ok(())
2897+
}
2898+
28672899
// Test statistics calculation for NULL literal (constant NULL column)
28682900
#[test]
28692901
fn test_project_statistics_with_null_literal() -> Result<()> {

0 commit comments

Comments (0)