
feat(aggregate transform): Add support for event timestamp-based aggregation #24421

Open
kaarolch wants to merge 19 commits into vectordotdev:master from kaarolch:events_based_aggr

Conversation

@kaarolch
Contributor

Summary

This PR adds event-time aggregation support to the aggregate transform, addressing issues where metrics with different source timestamps but the same processing time are incorrectly aggregated together.

I made this PR to address some gaps in #23694. Thank you @adiwab for providing the initial implementation.

Problem

Currently, the aggregate transform uses system processing time to bucket metrics. This causes issues when:

  • Multiple events have different source timestamps but arrive at the same processing time; they are aggregated into the same bucket and sent downstream with identical timestamps.
  • Events share a source timestamp but end up in different tag series after processing; these can be shipped in different batches.
  • Systems like Datadog overwrite the earlier values for an identical timestamp, resulting in data loss.

Solution

Introduced a new time_source configuration option with two modes:

  • SystemTime (default): Existing behavior, maintains backward compatibility
  • EventTime: Uses metric timestamps for bucketing, with watermark-based out-of-order event rejection

Key Changes

Configuration Options:

  • time_source: Choose between SystemTime (default) or EventTime aggregation
  • allowed_lateness_ms: Grace period for accepting late-arriving events (default: 0)
  • use_system_time_for_missing_timestamps: Fallback behavior for events without timestamps (default: false, drops events)
  • max_future_ms: Maximum allowed future timestamp offset to reject clock-skewed events (default: 10000ms)

Implementation:

  • Event-time bucketing based on metric timestamps rounded down to interval_ms boundaries
  • Watermark tracking to identify and reject out-of-order events past the grace period
  • Support for all aggregation modes
  • Separate bucket storage (event_time_buckets, event_time_prev_buckets, event_time_multi_buckets) for event-time mode
  • Dropped event metrics via AggregateEventDropped internal event
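As a rough illustration of the bucketing and watermark logic described above — a minimal sketch, where names such as `bucket_key` and `is_too_late` are illustrative and not the PR's exact code:

```rust
// Event-time bucketing: round the metric timestamp down to the interval
// boundary, and reject events that fall behind the watermark minus the
// configured grace period. Hypothetical helper names, not the real module.
fn bucket_key(event_ts_ms: i64, interval_ms: i64) -> i64 {
    // Floor division gives the half-open window start [key, key + interval).
    event_ts_ms.div_euclid(interval_ms) * interval_ms
}

fn is_too_late(event_ts_ms: i64, watermark_ms: i64, allowed_lateness_ms: i64) -> bool {
    // An event is dropped once it is more than `allowed_lateness_ms`
    // behind the watermark.
    event_ts_ms + allowed_lateness_ms < watermark_ms
}

fn main() {
    // 12.345s with a 5s interval lands in the [10s, 15s) window.
    assert_eq!(bucket_key(12_345, 5_000), 10_000);
    // With no grace period, an event behind the watermark is dropped...
    assert!(is_too_late(9_000, 10_000, 0));
    // ...but a 1s grace period accepts an event 500ms behind it.
    assert!(!is_too_late(9_500, 10_000, 1_000));
    println!("ok");
}
```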

Vector configuration

api:
  enabled: true
log_schema:
  level: debug
sources:
  http_metrics:
    type: http_server
    address: "0.0.0.0:8080"
    decoding:
      codec: influxdb
    path: "/api/v1/write"
transforms:
  aggregate_metrics:
    type: aggregate
    inputs:
      - http_metrics
    time_source: EventTime
    interval_ms: 5000
sinks:
  console_debug:
    type: console
    inputs:
      -  aggregate_metrics
    encoding:
      codec: json
    buffer:
       max_events: 10000
       type: memory
       when_full: drop_newest

How did you test this PR?

Added 6 new unit tests covering event-time aggregation scenarios:

  • Basic incremental aggregation within same bucket
  • Out-of-order event rejection with watermark
  • Multiple time buckets
  • Missing timestamp rejection
  • Absolute gauge (latest value selection)
  • Multiple different metrics in same bucket

I've used Sonnet 4.5 to create some scripts that push InfluxDB metrics to Vector with multiple values:

[TEST 1] Basic Aggregation - 3 buckets
============================================================

Bucket 1 (timestamp 07:29:11):
 ✓ test_counter=10.0 @ 07:29:11
 ✓ test_counter=20.0 @ 07:29:12
 ✓ test_counter=30.0 @ 07:29:13
 Expected aggregate: 60.0

Bucket 2 (timestamp 07:29:21):
 ✓ test_counter=10.0 @ 07:29:21
 ✓ test_counter=20.0 @ 07:29:22
 ✓ test_counter=30.0 @ 07:29:23
 Expected aggregate: 60.0

Bucket 3 (timestamp 07:29:31):
 ✓ test_counter=10.0 @ 07:29:31
 ✓ test_counter=20.0 @ 07:29:32
 ✓ test_counter=30.0 @ 07:29:33
 Expected aggregate: 60.0

[TEST 2] Out-of-Order Rejection
============================================================

Bucket 1 (timestamp 07:29:31):
 ✓ test_ooo=10.0 @ 07:29:31
 ✓ test_ooo=20.0 @ 07:29:32
 Expected aggregate: 30.0

Bucket 2 (timestamp 07:29:41):
 ✓ test_ooo=15.0 @ 07:29:41
 ✓ test_ooo=25.0 @ 07:29:42
 Expected aggregate: 40.0

Out-of-order event (timestamp 07:29:33):
 Sending to already-flushed bucket - should be DROPPED
 ✓ test_ooo=999.0 @ 07:29:33

 Check Vector output - 999.0 should NOT appear!

[TEST 3] Multiple Buckets (10s span = 2 buckets)
============================================================

Sending events across 10 seconds:
 ✓ test_multi=100.0 @ 07:29:46
 ✓ test_multi=200.0 @ 07:29:48
 ✓ test_multi=300.0 @ 07:29:51
 ✓ test_multi=400.0 @ 07:29:53

 Expected:
   Bucket 1 (0-5s): 300.0
   Bucket 2 (5-10s): 700.0

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • Some CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook, please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • make fmt
      • make check-clippy (if there are failures it's possible some of them can be fixed with make clippy-fix)
      • make test
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin master and git push.
  • If this PR introduces changes to Vector dependencies (modifies Cargo.lock), please
    run make build-licenses to regenerate the license inventory and commit the changes (if any). More details here.

@kaarolch kaarolch requested review from a team as code owners December 30, 2025 09:07
@github-actions github-actions Bot added the domain: transforms (anything related to Vector's transform components) and domain: external docs (anything related to Vector's external, public documentation) labels Dec 30, 2025
kaarolch and others added 5 commits December 30, 2025 19:59
Resolves conflict in src/transforms/aggregate.rs by integrating
upstream's InnerMode refactor (prev_map/multi_map moved into enum
variants) with the event-time aggregation feature (TimeSource,
event_time_buckets, watermark-based flushing).

Made-with: Cursor
@kaarolch
Contributor Author

I've added more tests around event-based timestamps.

@kaarolch
Contributor Author

@pront I saw you were active in the related issue. Can you take a look at this PR?

@kaarolch
Contributor Author

Is there any recommendation for this PR? I know it has ~1.2k new lines, but it's hard to split them into smaller changes.

@syedg1

syedg1 commented Apr 30, 2026

@kaarolch — heads up, while running this PR's build in production we hit a memory leak in event_time_prev_buckets. It's read in only one place (the flush loop, gated on AggregationMode::Diff), but on flush it's populated unconditionally, and the eviction (retain) is also Diff-only. So every non-Diff aggregator (including the default Auto mode for counters/gauges) moves a HashMap<MetricSeries, MetricEntry> into event_time_prev_buckets per flush interval and never removes it.

Production impact we measured: ~140 MB/hour of growth per pod with mode: Auto, time_source: EventTime, interval_ms: 10000 at a few hundred counters/sec — ~2.3 GB per pod over 16 hours, on a trajectory toward OOM.

I drafted a fix that scopes both the insert and the retain to Diff mode, plus two regression tests (one verifying event_time_prev_buckets stays empty in non-Diff modes after 50 flushes, one verifying Diff mode still retains the small rolling window it needs). All 31 tests in transforms::aggregate pass.

PR is here: kaarolch#1 — opened against your events_based_aggr branch so it can land inside this PR before merge upstream. Happy to iterate on it if you'd prefer a different shape; just wanted to get it in front of you given the production signal.
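For clarity, the shape of the fix looks roughly like this — a simplified sketch with stand-in types (`String`/`f64` in place of Vector's `MetricSeries`/`MetricEntry`, a two-variant `Mode` in place of `AggregationMode`), not the actual transform code:

```rust
use std::collections::HashMap;

// Only Diff mode reads the previous bucket (to subtract against the next
// interval), so both the insert and the eviction are gated on it. Non-Diff
// modes never touch `prev_buckets`, which is what stops the linear growth.
#[derive(PartialEq)]
enum Mode { Auto, Diff }

struct Agg {
    mode: Mode,
    prev_buckets: HashMap<i64, HashMap<String, f64>>,
}

impl Agg {
    fn flush_bucket(&mut self, key: i64, bucket: HashMap<String, f64>) {
        // ... emit aggregates from `bucket` here ...
        if self.mode == Mode::Diff {
            // Diff keeps the just-flushed bucket to diff against next time.
            self.prev_buckets.insert(key, bucket);
            // Evict older retained buckets so the window stays small.
            self.prev_buckets.retain(|&k, _| k >= key);
        }
        // In non-Diff modes the bucket is simply dropped here.
    }
}

fn main() {
    let mut a = Agg { mode: Mode::Auto, prev_buckets: HashMap::new() };
    for i in 0..50 {
        a.flush_bucket(i, HashMap::from([("s".to_string(), 1.0)]));
    }
    // 50 flushes in a non-Diff mode retain nothing.
    assert!(a.prev_buckets.is_empty());
    println!("prev_buckets len = {}", a.prev_buckets.len());
}
```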


@pront
Member

pront commented May 1, 2026

@codex review

@pront
Member

pront commented May 1, 2026

Is there any recommendation for this PR? I know it has ~1.2k new lines, but it's hard to split them into smaller changes.

Hey @kaarolch, we have a pretty big backlog, so large PRs take even longer to be reviewed and approved. I kicked off a Codex review for now; please fix any issues that may arise there.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 8574e57ba1

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread src/transforms/aggregate.rs Outdated
Comment thread src/transforms/aggregate.rs Outdated
Comment thread src/transforms/aggregate.rs Outdated
Addresses several bugs surfaced during review of the event-time
aggregation feature:

* Stop populating `event_time_prev_buckets` in non-`Diff` modes. The map
  is read only by `Diff` and was previously inserted (and not evicted)
  on every flush in `Auto`/`Sum`/`Latest`/etc., growing memory linearly
  with (unique series in interval) x (intervals since startup).

* Drain remaining event-time buckets when the input stream closes.
  `flush_event_time_buckets` now accepts a `force` flag and a new
  `flush_final` entry-point is wired into the input-closed arm so
  in-flight metrics in still-open windows are emitted on shutdown or
  topology reload, matching system-time semantics.

* Reject events for already-emitted windows. Watermark now records the
  exclusive end (`bucket_key + interval_ms`) of the highest flushed
  bucket, and `is_too_late` no longer subtracts `allowed_lateness_ms`.
  `allowed_lateness_ms` keeps its role of delaying bucket close at
  flush time; once a window is emitted it stays closed. This prevents
  late events from re-creating closed buckets and emitting duplicate
  partial aggregates.

* Drop events whose (kind, value) is incompatible with the configured
  mode (for example an `Incremental` event arriving at a `Mean`
  aggregator) without materialising a bucket. The previous code path
  always created an empty `event_time_buckets` entry, which then
  flushed and advanced the watermark, silently rejecting valid
  in-order events for earlier buckets. Compatibility is now decided
  up front by `will_be_stored`, dropped events emit
  `AggregateEventDropped` (rather than the misleading
  `AggregateEventRecorded`), and `event_time_multi_buckets` is only
  touched in `Mean`/`Stdev` mode.

Adds tests for edge-case behaviour, event-time `Mean` and `Stdev`
happy-path flushing, plus an updated changelog entry describing shipped
behaviour.

Co-authored-by: Cursor <cursoragent@cursor.com>
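The watermark semantics described in the commit above can be sketched as follows — illustrative function names under the stated semantics, not the exact implementation:

```rust
// The watermark records the exclusive end (`bucket_key + interval_ms`) of
// the highest flushed bucket; once a window is emitted it stays closed, and
// the lateness check no longer subtracts `allowed_lateness_ms`.
fn advance_watermark(watermark: &mut i64, flushed_bucket_key: i64, interval_ms: i64) {
    *watermark = (*watermark).max(flushed_bucket_key + interval_ms);
}

fn is_too_late(event_ts_ms: i64, watermark_ms: i64) -> bool {
    // Anything inside an already-emitted window is rejected.
    event_ts_ms < watermark_ms
}

fn main() {
    let mut wm = i64::MIN;
    // Flushing the [10s, 15s) bucket moves the watermark to its exclusive end.
    advance_watermark(&mut wm, 10_000, 5_000);
    assert_eq!(wm, 15_000);
    // An event inside the emitted window is dropped...
    assert!(is_too_late(14_999, wm));
    // ...while the first millisecond of the next open window is accepted.
    assert!(!is_too_late(15_000, wm));
    println!("ok");
}
```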
@kaarolch
Copy link
Copy Markdown
Contributor Author

kaarolch commented May 4, 2026

@syedg1 I saw your PR to my branch was closed? I've checked it and tried to address the extra edge cases in the last commit.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0942121d68


Comment thread src/transforms/aggregate.rs Outdated
Rust integer `/` truncates toward zero, so negative millis (e.g. just
before the Unix epoch) were aligned to bucket 0 instead of the correct
window start. Use `div_euclid` for consistent half-open window alignment.

Adds a regression test for timestamp -1 ms at interval 10s.

Co-authored-by: Cursor <cursoragent@cursor.com>
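The alignment difference is easy to see in a tiny standalone example (plain Rust, separate from the transform code):

```rust
// Rust's `/` truncates toward zero, so a timestamp of -1 ms at a 10s
// interval is aligned to window 0 instead of the correct window start
// at -10_000. `div_euclid` floors, giving a consistent half-open window.
fn main() {
    let ts: i64 = -1;
    let interval: i64 = 10_000;
    let truncated = (ts / interval) * interval;       // 0 (wrong window)
    let floored = ts.div_euclid(interval) * interval; // -10_000 (correct start)
    assert_eq!(truncated, 0);
    assert_eq!(floored, -10_000);
    println!("{truncated} {floored}");
}
```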
@pront pront added the meta: awaiting author Pull requests that are awaiting their author. label May 4, 2026
@@ -0,0 +1,16 @@
The `aggregate` transform now supports **event-time aggregation** via the new `time_source` configuration option. When set to `EventTime`, metrics are bucketed by their embedded timestamps rather than by processing time. This addresses cases where events with different source timestamps but the same processing time were incorrectly collapsed together (e.g. losing samples in sinks like Datadog that overwrite earlier values for an identical timestamp).
Member


This changelog is way too verbose for a user. Let's rewrite with the user in mind. Details and non-obvious behavior should be documented in the config docs.

Contributor Author

@kaarolch kaarolch May 4, 2026


changelog.md was compacted and I've also updated docs.

@github-actions github-actions Bot removed the meta: awaiting author Pull requests that are awaiting their author. label May 4, 2026
@github-actions github-actions Bot added the docs review on hold The documentation team reviews PRs only after a PR is approved by the COSE team. label May 4, 2026

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f9a6fcf381


Comment thread src/transforms/aggregate.rs Outdated
kaarolch and others added 2 commits May 4, 2026 22:48
`flush_event_time_buckets` previously did `bucket_map.clone().into_iter()`
unconditionally, allocating a full duplicate HashMap of every series in
each flushed bucket -- even though only `Diff` mode reads the original
map afterward (to subtract against the next bucket).

Split the per-bucket emission on mode:
- Diff iterates `&bucket_map` (no HashMap clone, only the per-entry
  clones that `Metric::from_parts` already requires) and then moves
  `bucket_map` into `event_time_prev_buckets`.
- Other modes consume `bucket_map` via `into_iter()` directly, removing
  both the HashMap allocation and every per-entry clone.

This also folds the trailing prev-bucket insert into the Diff arm, so
the `matches!(self.config.mode, AggregationMode::Diff)` check is hoisted
out of the hot loop.

Co-authored-by: Cursor <cursoragent@cursor.com>
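A simplified sketch of the split described in the commit above — stand-in types (`String`/`f64` for `MetricSeries`/`MetricEntry`, a two-variant `Mode`), not the real function:

```rust
use std::collections::HashMap;

// Diff iterates the bucket by reference (only per-entry work, no HashMap
// clone) and then moves the whole map into prev storage; every other mode
// consumes the bucket directly via `into_iter()`.
enum Mode { Diff, Other }

fn flush(
    mode: &Mode,
    bucket: HashMap<String, f64>,
    prev: &mut HashMap<i64, HashMap<String, f64>>,
    key: i64,
) -> usize {
    match mode {
        Mode::Diff => {
            let mut emitted = 0;
            for (_series, _entry) in &bucket {
                emitted += 1; // emit from a borrowed entry
            }
            prev.insert(key, bucket); // move the map, don't clone it
            emitted
        }
        // Non-Diff modes never need the map afterward, so consume it.
        Mode::Other => bucket.into_iter().count(),
    }
}

fn main() {
    let mut prev = HashMap::new();
    let bucket = HashMap::from([("a".into(), 1.0), ("b".into(), 2.0)]);
    assert_eq!(flush(&Mode::Diff, bucket, &mut prev, 0), 2);
    assert_eq!(prev.len(), 1); // Diff retained the bucket for subtraction
    let bucket2: HashMap<String, f64> = HashMap::from([("c".into(), 3.0)]);
    assert_eq!(flush(&Mode::Other, bucket2, &mut prev, 5_000), 1);
    assert_eq!(prev.len(), 1); // other modes retain nothing extra
    println!("ok");
}
```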
…ucket

Existing event-time Diff tests all use a single series per bucket, so
they wouldn't catch a regression in the multi-series iteration of
`flush_event_time_buckets` (the loop that now borrows `&bucket_map`
and then moves the original map into `event_time_prev_buckets`).

Add `event_time_diff_multiple_series_same_bucket`: three distinct
series across two consecutive buckets, asserting per-series that the
first flush emits raw values and the second flush emits per-series
deltas. A failure to retain every series in `event_time_prev_buckets`
would surface as the second-bucket emissions being raw values instead
of deltas.

Co-authored-by: Cursor <cursoragent@cursor.com>

Labels

docs review on hold (the documentation team reviews PRs only after a PR is approved by the COSE team), domain: external docs, domain: transforms, work in progress

5 participants