Skip to content

Commit 7f2f78d

Browse files
authored
feat: fix AVG sliding windows wrong results with NULLs (#22139)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #22138 . ## Rationale for this change `AVG` used as a window aggregate can return `NaN` (and, for `Decimal` / `Duration`, panic on integer division by zero) when every value in the window frame is NULL. ```sql SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) FROM (VALUES (1,1), (2,2), (3,NULL), (4,NULL)) t(i,v); ``` | i | current output | expected (DuckDB/PgSQL) | |---|----------------|-------------------| | 1 | 1.5 | 1.5 | | 2 | 2.0 | 2.0 | | 3 | **NaN** | **NULL** | | 4 | **NaN** | **NULL** | Root cause: sliding-window execution calls `Accumulator::retract_batch` as rows leave the frame. Once every contributing value has been retracted, `self.count` drops back to `0` but `self.sum` stays `Some(0.0)` (or a tiny floating-point residual). `evaluate()` then computes `sum / 0`, which yields `NaN` on `Float64`, and would panic with integer division by zero on `DecimalAvgAccumulator` and `DurationAvgAccumulator`. The non-sliding aggregation path is unaffected because there `sum` becomes `Some(_)` only after at least one non-NULL value has been added, so `count == 0` implies `sum == None`. ## What changes are included in this PR? `datafusion/functions-aggregate/src/average.rs` — guard all three affected `evaluate()` implementations with an explicit `count == 0 → None` short-circuit: - `AvgAccumulator::evaluate` (Float64) - `DecimalAvgAccumulator::evaluate` (Decimal32/64/128/256) - `DurationAvgAccumulator::evaluate` (Duration*) This matches the idiom already used by sibling retractable accumulators (`variance.rs` uses an explicit `match self.count` before division; `sum.rs` uses a `(self.count != 0).then_some(..)` guard).
1 parent ccc67e9 commit 7f2f78d

2 files changed

Lines changed: 83 additions & 15 deletions

File tree

datafusion/functions-aggregate/src/average.rs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -519,9 +519,16 @@ impl Accumulator for AvgAccumulator {
519519
}
520520

521521
fn evaluate(&mut self) -> Result<ScalarValue> {
522-
Ok(ScalarValue::Float64(
523-
self.sum.map(|f| f / self.count as f64),
524-
))
522+
// In sliding-window mode `retract_batch` can bring `count` back to 0
523+
// while `sum` remains `Some(..)` (possibly zero or a floating-point
524+
// residual). Guard against that so the frame with no non-NULL values
525+
// yields NULL rather than NaN / ±Inf.
526+
let avg = if self.count == 0 {
527+
None
528+
} else {
529+
self.sum.map(|f| f / self.count as f64)
530+
};
531+
Ok(ScalarValue::Float64(avg))
525532
}
526533

527534
fn size(&self) -> usize {
@@ -584,17 +591,23 @@ impl<T: DecimalType + ArrowNumericType + Debug> Accumulator for DecimalAvgAccumu
584591
}
585592

586593
fn evaluate(&mut self) -> Result<ScalarValue> {
587-
let v = self
588-
.sum
589-
.map(|v| {
590-
DecimalAverager::<T>::try_new(
591-
self.sum_scale,
592-
self.target_precision,
593-
self.target_scale,
594-
)?
595-
.avg(v, T::Native::from_usize(self.count as usize).unwrap())
596-
})
597-
.transpose()?;
594+
// `count == 0` can occur in sliding-window mode after `retract_batch`
595+
// removes every contributing value. Return NULL rather than dividing
596+
// by zero (which would panic for integer decimal types).
597+
let v = if self.count == 0 {
598+
None
599+
} else {
600+
self.sum
601+
.map(|v| {
602+
DecimalAverager::<T>::try_new(
603+
self.sum_scale,
604+
self.target_precision,
605+
self.target_scale,
606+
)?
607+
.avg(v, T::Native::from_usize(self.count as usize).unwrap())
608+
})
609+
.transpose()?
610+
};
598611

599612
ScalarValue::new_primitive::<T>(
600613
v,
@@ -670,7 +683,14 @@ impl Accumulator for DurationAvgAccumulator {
670683
}
671684

672685
fn evaluate(&mut self) -> Result<ScalarValue> {
673-
let avg = self.sum.map(|sum| sum / self.count as i64);
686+
// Guard against `count == 0` which can happen in sliding-window mode
687+
// after every contributing value has been retracted. Without this
688+
// check we would integer-divide by zero.
689+
let avg = if self.count == 0 {
690+
None
691+
} else {
692+
self.sum.map(|sum| sum / self.count as i64)
693+
};
674694

675695
match self.result_unit {
676696
TimeUnit::Second => Ok(ScalarValue::DurationSecond(avg)),

datafusion/sqllogictest/test_files/window.slt

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6456,6 +6456,54 @@ FROM (
64566456
2 1
64576457
3 1
64586458

6459+
# AVG over a sliding window must yield NULL when the frame has no non-NULL
6460+
# values — including frames that became empty via `retract_batch`. Covers
6461+
# Float64, Decimal, and the narrow-frame retract-to-empty case.
6462+
query IR
6463+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
6464+
FROM (VALUES(1,1),(2,2),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v)
6465+
ORDER BY i;
6466+
----
6467+
1 1.5
6468+
2 2
6469+
3 NULL
6470+
4 NULL
6471+
6472+
# All-NULL input — every frame is empty.
6473+
query IR
6474+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
6475+
FROM (VALUES(1,CAST(NULL AS INT)),(2,CAST(NULL AS INT))) t(i,v)
6476+
ORDER BY i;
6477+
----
6478+
1 NULL
6479+
2 NULL
6480+
6481+
# Narrow sliding frame that drains to empty each row.
6482+
query IR
6483+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING)
6484+
FROM (VALUES(1,CAST(NULL AS INT)),(2,1),(3,CAST(NULL AS INT)),(4,CAST(NULL AS INT))) t(i,v)
6485+
ORDER BY i;
6486+
----
6487+
1 NULL
6488+
2 NULL
6489+
3 1
6490+
4 NULL
6491+
6492+
# Decimal variant — the integer-division path would otherwise panic on an
6493+
# empty frame.
6494+
query IR
6495+
SELECT i, AVG(v) OVER (ORDER BY i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
6496+
FROM (VALUES(1,CAST(1.5 AS DECIMAL(10,2))),
6497+
(2,CAST(2.5 AS DECIMAL(10,2))),
6498+
(3,CAST(NULL AS DECIMAL(10,2))),
6499+
(4,CAST(NULL AS DECIMAL(10,2)))) t(i,v)
6500+
ORDER BY i;
6501+
----
6502+
1 2
6503+
2 2.5
6504+
3 NULL
6505+
4 NULL
6506+
64596507
# Config reset
64606508
statement ok
64616509
reset datafusion.execution.batch_size;

0 commit comments

Comments
 (0)