Commit 0942121
fix(aggregate transform): address event-time edge cases
Addresses several bugs surfaced during review of the event-time
aggregation feature:
* Stop populating `event_time_prev_buckets` in non-`Diff` modes. The map
is read only by `Diff` and was previously inserted (and not evicted)
on every flush in `Auto`/`Sum`/`Latest`/etc., growing memory linearly
with (unique series in interval) x (intervals since startup).
* Drain remaining event-time buckets when the input stream closes.
`flush_event_time_buckets` now accepts a `force` flag and a new
`flush_final` entry-point is wired into the input-closed arm so
in-flight metrics in still-open windows are emitted on shutdown or
topology reload, matching system-time semantics.
* Reject events for already-emitted windows. Watermark now records the
exclusive end (`bucket_key + interval_ms`) of the highest flushed
bucket, and `is_too_late` no longer subtracts `allowed_lateness_ms`.
`allowed_lateness_ms` keeps its role of delaying bucket close at
flush time; once a window is emitted it stays closed. This prevents
late events from re-creating closed buckets and emitting duplicate
partial aggregates.
* Drop events whose (kind, value) is incompatible with the configured
mode (for example an `Incremental` event arriving at a `Mean`
aggregator) without materialising a bucket. The previous code path
always created an empty `event_time_buckets` entry, which then
flushed and advanced the watermark, silently rejecting valid
in-order events for earlier buckets. Compatibility is now decided
up front by `will_be_stored`, dropped events emit
`AggregateEventDropped` (rather than the misleading
`AggregateEventRecorded`), and `event_time_multi_buckets` is only
touched in `Mean`/`Stdev` mode.
Adds tests for edge-case behaviour, event-time `Mean` and `Stdev`
happy-path flushing, plus an updated changelog entry describing shipped
behaviour.
Co-authored-by: Cursor <cursoragent@cursor.com>1 parent 8574e57 commit 0942121
2 files changed
Lines changed: 545 additions & 102 deletions
Lines changed: 11 additions & 6 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
1 | | - | |
| 1 | + | |
2 | 2 | | |
3 | | - | |
| 3 | + | |
4 | 4 | | |
5 | | - | |
6 | | - | |
7 | | - | |
| 5 | + | |
| 6 | + | |
| 7 | + | |
8 | 8 | | |
9 | | - | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
10 | 15 | | |
11 | 16 | | |
0 commit comments