Skip to content

Commit c128511

Browse files
committed
feat: fix AVG sliding windows edge case
1 parent ccc67e9 commit c128511

2 files changed

Lines changed: 83 additions & 15 deletions

File tree

datafusion/functions-aggregate/src/average.rs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -519,9 +519,16 @@ impl Accumulator for AvgAccumulator {
519519
}
520520

521521
fn evaluate(&mut self) -> Result<ScalarValue> {
522-
Ok(ScalarValue::Float64(
523-
self.sum.map(|f| f / self.count as f64),
524-
))
522+
// In sliding-window mode `retract_batch` can bring `count` back to 0
523+
// while `sum` remains `Some(..)` (possibly zero or a floating-point
524+
// residual). Guard against that so the frame with no non-NULL values
525+
// yields NULL rather than NaN / ±Inf.
526+
let avg = if self.count == 0 {
527+
None
528+
} else {
529+
self.sum.map(|f| f / self.count as f64)
530+
};
531+
Ok(ScalarValue::Float64(avg))
525532
}
526533

527534
fn size(&self) -> usize {
@@ -584,17 +591,23 @@ impl<T: DecimalType + ArrowNumericType + Debug> Accumulator for DecimalAvgAccumu
584591
}
585592

586593
fn evaluate(&mut self) -> Result<ScalarValue> {
587-
let v = self
588-
.sum
589-
.map(|v| {
590-
DecimalAverager::<T>::try_new(
591-
self.sum_scale,
592-
self.target_precision,
593-
self.target_scale,
594-
)?
595-
.avg(v, T::Native::from_usize(self.count as usize).unwrap())
596-
})
597-
.transpose()?;
594+
// `count == 0` can occur in sliding-window mode after `retract_batch`
595+
// removes every contributing value. Return NULL rather than dividing
596+
// by zero (which would panic, because decimal values are stored as integers).
597+
let v = if self.count == 0 {
598+
None
599+
} else {
600+
self.sum
601+
.map(|v| {
602+
DecimalAverager::<T>::try_new(
603+
self.sum_scale,
604+
self.target_precision,
605+
self.target_scale,
606+
)?
607+
.avg(v, T::Native::from_usize(self.count as usize).unwrap())
608+
})
609+
.transpose()?
610+
};
598611

599612
ScalarValue::new_primitive::<T>(
600613
v,
@@ -670,7 +683,14 @@ impl Accumulator for DurationAvgAccumulator {
670683
}
671684

672685
fn evaluate(&mut self) -> Result<ScalarValue> {
673-
let avg = self.sum.map(|sum| sum / self.count as i64);
686+
// Guard against `count == 0` which can happen in sliding-window mode
687+
// after every contributing value has been retracted. Without this
688+
// check we would integer-divide by zero.
689+
let avg = if self.count == 0 {
690+
None
691+
} else {
692+
self.sum.map(|sum| sum / self.count as i64)
693+
};
674694

675695
match self.result_unit {
676696
TimeUnit::Second => Ok(ScalarValue::DurationSecond(avg)),

datafusion/sqllogictest/test_files/window.slt

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6456,6 +6456,54 @@ FROM (
64566456
2 1
64576457
3 1
64586458

6459+
# AVG over a sliding window must yield NULL when the frame has no non-NULL
6460+
# values — including frames that became empty via `retract_batch`. Covers
6461+
# Float64, Decimal, and the narrow-frame retract-to-empty case.
6462+
query IR
6463+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
6464+
FROM (VALUES(1,1),(2,2),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v)
6465+
ORDER BY i;
6466+
----
6467+
1 1.5
6468+
2 2
6469+
3 NULL
6470+
4 NULL
6471+
6472+
# All-NULL input — every frame contains only NULL values, so every result is NULL.
6473+
query IR
6474+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
6475+
FROM (VALUES(1,CAST(NULL AS INT)),(2,CAST(NULL AS INT))) t(i,v)
6476+
ORDER BY i;
6477+
----
6478+
1 NULL
6479+
2 NULL
6480+
6481+
# Narrow sliding frame that drains to empty each row.
6482+
query IR
6483+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING)
6484+
FROM (VALUES(1,CAST(NULL AS INT)),(2,1),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v)
6485+
ORDER BY i;
6486+
----
6487+
1 NULL
6488+
2 NULL
6489+
3 1
6490+
4 NULL
6491+
6492+
# Decimal variant — the integer-division path would otherwise panic on an
6493+
# empty frame.
6494+
query IR
6495+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
6496+
FROM (VALUES(1,CAST(1.5 AS DECIMAL(10,2))),
6497+
(2,CAST(2.5 AS DECIMAL(10,2))),
6498+
(3,CAST(NULL AS DECIMAL(10,2))),
6499+
(4,CAST(NULL AS DECIMAL(10,2)))) t(i,v)
6500+
ORDER BY i;
6501+
----
6502+
1 2
6503+
2 2.5
6504+
3 NULL
6505+
4 NULL
6506+
64596507
# Config reset
64606508
statement ok
64616509
reset datafusion.execution.batch_size;

0 commit comments

Comments
 (0)