Skip to content

Commit 706e8be

Browse files
committed
feat: fix AVG sliding windows edge case
1 parent 44e585c commit 706e8be

2 files changed

Lines changed: 83 additions & 15 deletions

File tree

datafusion/functions-aggregate/src/average.rs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -519,9 +519,16 @@ impl Accumulator for AvgAccumulator {
519519
}
520520

521521
fn evaluate(&mut self) -> Result<ScalarValue> {
522-
Ok(ScalarValue::Float64(
523-
self.sum.map(|f| f / self.count as f64),
524-
))
522+
// In sliding-window mode `retract_batch` can bring `count` back to 0
523+
// while `sum` remains `Some(..)` (possibly zero or a floating-point
524+
// residual). Guard against that so that a frame with no non-NULL values
525+
// yields NULL rather than NaN / ±Inf.
526+
let avg = if self.count == 0 {
527+
None
528+
} else {
529+
self.sum.map(|f| f / self.count as f64)
530+
};
531+
Ok(ScalarValue::Float64(avg))
525532
}
526533

527534
fn size(&self) -> usize {
@@ -584,17 +591,23 @@ impl<T: DecimalType + ArrowNumericType + Debug> Accumulator for DecimalAvgAccumu
584591
}
585592

586593
fn evaluate(&mut self) -> Result<ScalarValue> {
587-
let v = self
588-
.sum
589-
.map(|v| {
590-
DecimalAverager::<T>::try_new(
591-
self.sum_scale,
592-
self.target_precision,
593-
self.target_scale,
594-
)?
595-
.avg(v, T::Native::from_usize(self.count as usize).unwrap())
596-
})
597-
.transpose()?;
594+
// `count == 0` can occur in sliding-window mode after `retract_batch`
595+
// removes every contributing value. Return NULL rather than dividing
596+
// by zero (which would panic for integer-backed decimal types).
597+
let v = if self.count == 0 {
598+
None
599+
} else {
600+
self.sum
601+
.map(|v| {
602+
DecimalAverager::<T>::try_new(
603+
self.sum_scale,
604+
self.target_precision,
605+
self.target_scale,
606+
)?
607+
.avg(v, T::Native::from_usize(self.count as usize).unwrap())
608+
})
609+
.transpose()?
610+
};
598611

599612
ScalarValue::new_primitive::<T>(
600613
v,
@@ -670,7 +683,14 @@ impl Accumulator for DurationAvgAccumulator {
670683
}
671684

672685
fn evaluate(&mut self) -> Result<ScalarValue> {
673-
let avg = self.sum.map(|sum| sum / self.count as i64);
686+
// Guard against `count == 0` which can happen in sliding-window mode
687+
// after every contributing value has been retracted. Without this
688+
// check we would integer-divide by zero.
689+
let avg = if self.count == 0 {
690+
None
691+
} else {
692+
self.sum.map(|sum| sum / self.count as i64)
693+
};
674694

675695
match self.result_unit {
676696
TimeUnit::Second => Ok(ScalarValue::DurationSecond(avg)),

datafusion/sqllogictest/test_files/window.slt

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6236,6 +6236,54 @@ INNER JOIN issue_20194_t2 t2
62366236
----
62376237
6774502793 10040029 1
62386238

6239+
# AVG over a sliding window must yield NULL when the frame has no non-NULL
6240+
# values — including frames that became empty via `retract_batch`. Covers
6241+
# Float64, Decimal, and the narrow-frame retract-to-empty case.
6242+
query IR
6243+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
6244+
FROM (VALUES(1,1),(2,2),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v)
6245+
ORDER BY i;
6246+
----
6247+
1 1.5
6248+
2 2
6249+
3 NULL
6250+
4 NULL
6251+
6252+
# All-NULL input — no frame ever contains a non-NULL value.
6253+
query IR
6254+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
6255+
FROM (VALUES(1,CAST(NULL AS INT)),(2,CAST(NULL AS INT))) t(i,v)
6256+
ORDER BY i;
6257+
----
6258+
1 NULL
6259+
2 NULL
6260+
6261+
# Narrow one-row sliding frame (previous row only): each step retracts the old row, so the frame can drain to no non-NULL values.
6262+
query IR
6263+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING)
6264+
FROM (VALUES(1,CAST(NULL AS INT)),(2,1),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v)
6265+
ORDER BY i;
6266+
----
6267+
1 NULL
6268+
2 NULL
6269+
3 1
6270+
4 NULL
6271+
6272+
# Decimal variant — the integer-division path would otherwise panic on a
6273+
# frame with no non-NULL values.
6274+
query IR
6275+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
6276+
FROM (VALUES(1,CAST(1.5 AS DECIMAL(10,2))),
6277+
(2,CAST(2.5 AS DECIMAL(10,2))),
6278+
(3,CAST(NULL AS DECIMAL(10,2))),
6279+
(4,CAST(NULL AS DECIMAL(10,2)))) t(i,v)
6280+
ORDER BY i;
6281+
----
6282+
1 2
6283+
2 2.5
6284+
3 NULL
6285+
4 NULL
6286+
62396287
# Config reset
62406288
statement ok
62416289
reset datafusion.execution.batch_size;

0 commit comments

Comments
 (0)