feat(aggregate transform): Add support for event timestamp-based aggregation#24421
feat(aggregate transform): Add support for event timestamp-based aggregation#24421kaarolch wants to merge 19 commits intovectordotdev:masterfrom
Conversation
Resolves conflict in src/transforms/aggregate.rs by integrating upstream's InnerMode refactor (prev_map/multi_map moved into enum variants) with the event-time aggregation feature (TimeSource, event_time_buckets, watermark-based flushing). Made-with: Cursor
|
I've added more test around event based timestamp. |
|
@pront I saw you were active in the related issue Can you look on above PR? |
|
Is there any recommendation for this PR, I know PR has 1,2k new lines but it's hard to split them to smaller changes. |
|
@kaarolch — heads up, while running this PR's build in production we hit a memory leak in Production impact we measured: ~140 MB/hour of growth per pod with I drafted a fix that scopes both the insert and the retain to PR is here: kaarolch#1 — opened against your
|
|
@codex review |
Hey @kaarolch, we have a pretty big backlog. So large PRs take even longer to be review and approved. I kicked off a codex review for now, please fix any issues that may arise there. |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8574e57ba1
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Addresses several bugs surfaced during review of the event-time aggregation feature: * Stop populating `event_time_prev_buckets` in non-`Diff` modes. The map is read only by `Diff` and was previously inserted (and not evicted) on every flush in `Auto`/`Sum`/`Latest`/etc., growing memory linearly with (unique series in interval) x (intervals since startup). * Drain remaining event-time buckets when the input stream closes. `flush_event_time_buckets` now accepts a `force` flag and a new `flush_final` entry-point is wired into the input-closed arm so in-flight metrics in still-open windows are emitted on shutdown or topology reload, matching system-time semantics. * Reject events for already-emitted windows. Watermark now records the exclusive end (`bucket_key + interval_ms`) of the highest flushed bucket, and `is_too_late` no longer subtracts `allowed_lateness_ms`. `allowed_lateness_ms` keeps its role of delaying bucket close at flush time; once a window is emitted it stays closed. This prevents late events from re-creating closed buckets and emitting duplicate partial aggregates. * Drop events whose (kind, value) is incompatible with the configured mode (for example an `Incremental` event arriving at a `Mean` aggregator) without materialising a bucket. The previous code path always created an empty `event_time_buckets` entry, which then flushed and advanced the watermark, silently rejecting valid in-order events for earlier buckets. Compatibility is now decided up front by `will_be_stored`, dropped events emit `AggregateEventDropped` (rather than the misleading `AggregateEventRecorded`), and `event_time_multi_buckets` is only touched in `Mean`/`Stdev` mode. Adds tests for edge-case behaviour, event-time `Mean` and `Stdev` happy-path flushing, plus an updated changelog entry describing shipped behaviour. Co-authored-by: Cursor <cursoragent@cursor.com>
|
@syedg1 I saw your PR to my branch was closed? I've check your PR and try to address extra edge cases in the last commit. |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0942121d68
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
Rust integer `/` truncates toward zero, so negative millis (e.g. just before the Unix epoch) were aligned to bucket 0 instead of the correct window start. Use `div_euclid` for consistent half-open window alignment. Adds a regression test for timestamp -1 ms at interval 10s. Co-authored-by: Cursor <cursoragent@cursor.com>
| @@ -0,0 +1,16 @@ | |||
| The `aggregate` transform now supports **event-time aggregation** via the new `time_source` configuration option. When set to `EventTime`, metrics are bucketed by their embedded timestamps rather than by processing time. This addresses cases where events with different source timestamps but the same processing time were incorrectly collapsed together (e.g. losing samples in sinks like Datadog that overwrite earlier values for an identical timestamp). | |||
There was a problem hiding this comment.
This changelog is way too verbose for a user. Let's rewrite with the user in mind. Details and non-obvious behavior should be documented in the config docs.
There was a problem hiding this comment.
changelog.md was compacted and I've also updated docs.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f9a6fcf381
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
`flush_event_time_buckets` previously did `bucket_map.clone().into_iter()` unconditionally, allocating a full duplicate HashMap of every series in each flushed bucket -- even though only `Diff` mode reads the original map afterward (to subtract against the next bucket). Split the per-bucket emission on mode: - Diff iterates `&bucket_map` (no HashMap clone, only the per-entry clones that `Metric::from_parts` already requires) and then moves `bucket_map` into `event_time_prev_buckets`. - Other modes consume `bucket_map` via `into_iter()` directly, removing both the HashMap allocation and every per-entry clone. This also folds the trailing prev-bucket insert into the Diff arm, so the `matches!(self.config.mode, AggregationMode::Diff)` check is hoisted out of the hot loop. Co-authored-by: Cursor <cursoragent@cursor.com>
…ucket Existing event-time Diff tests all use a single series per bucket, so they wouldn't catch a regression in the multi-series iteration of `flush_event_time_buckets` (the loop that now borrows `&bucket_map` and then moves the original map into `event_time_prev_buckets`). Add `event_time_diff_multiple_series_same_bucket`: three distinct series across two consecutive buckets, asserting per-series that the first flush emits raw values and the second flush emits per-series deltas. A failure to retain every series in `event_time_prev_buckets` would surface as the second-bucket emissions being raw values instead of deltas. Co-authored-by: Cursor <cursoragent@cursor.com>

Summary
This PR adds event-time aggregation support to the
aggregatetransform, addressing issues where metrics with different source timestamps but the same processing time are incorrectly aggregated together.I made this PR to address some gaps in #23694. Thank you @adiwab for providing initial implementation.
Problem
Currently, the
aggregatetransform uses system processing time to bucket metrics. This causes issues when:Solution
Introduced a new
time_sourceconfiguration option with two modes:SystemTime(default): Existing behavior, maintains backward compatibilityEventTime: Uses metric timestamps for bucketing, with watermark-based out-of-order event rejectionKey Changes
Configuration Options:
time_source: Choose betweenSystemTime(default) orEventTimeaggregationallowed_lateness_ms: Grace period for accepting late-arriving events (default: 0)use_system_time_for_missing_timestamps: Fallback behavior for events without timestamps (default: false, drops events)max_future_ms: Maximum allowed future timestamp offset to reject clock-skewed events (default: 10000ms)Implementation:
interval_msboundariesevent_time_buckets,event_time_prev_buckets,event_time_multi_buckets) for event-time modeAggregateEventDroppedinternal eventVector configuration
How did you test this PR?
Added 6 new unit tests covering event-time aggregation scenarios:
I've used Sonnet 4.5 to create some scripts that push influxdb metrics to vector with multiple values:
Change Type
Is this a breaking change?
Does this PR include user facing changes?
no-changeloglabel to this PR.References
Notes
@vectordotdev/vectorto reach out to us regarding this PR.pre-pushhook, please see this template.make fmtmake check-clippy(if there are failures it's possible some of them can be fixed withmake clippy-fix)make testgit merge origin masterandgit push.Cargo.lock), pleaserun
make build-licensesto regenerate the license inventory and commit the changes (if any). More details here.