
fix: aggregator restart missing events #737

Merged
smrz2001 merged 8 commits into main from fix/aggregator-restart on Oct 1, 2025

@dav1do dav1do commented Aug 30, 2025

Collaborator

Fix issue where the aggregator could restart and miss events, causing validation errors on valid streams.

This includes two changes:

  • First, we make the aggregator's process_conclusion_events_batch and join_event_states idempotent and exclude duplicate incoming events, preferring the on-disk events when they also appear in the batch. This means we can go backward and see events again without causing errors.
  • Second, we fix the issue where the memory cache seemed to be "going backwards": we'd flush events to disk and still have events with a lower conclusion_event_order in memory, so on restart we'd skip them, because we resumed from our previous on-disk max. The missing events (usually init) caused aggregator errors, and streams would be full of validation errors. The cause was that we write events from the batch to disk in phases (Models, then MIDs). Although our batch covered [X, Y], the two internal batches could have interleaving order, so if the first batch caused a cache flush, we'd leave the remaining events in memory. Now we only flush the cache once per conclusion events batch, so they are all written to disk together. This means our cache table can grow even further over its allotted size, but that's short-lived and worth it.
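
To illustrate the first bullet, here is a minimal sketch of idempotent filtering, assuming each event carries a unique id. The `Event` struct, its `cid` field, and the `exclude_known` function are hypothetical stand-ins for illustration, not the aggregator's actual types or API.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified stand-in for a conclusion event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    /// Unique event identifier (assumed for this sketch).
    pub cid: String,
    /// Arrival order on the node.
    pub conclusion_event_order: u64,
}

/// Drop every incoming event whose id is already persisted, and collapse
/// duplicates inside the batch itself, so replaying a batch is a no-op.
pub fn exclude_known(on_disk: &HashSet<String>, batch: Vec<Event>) -> Vec<Event> {
    let mut seen_in_batch = HashSet::new();
    batch
        .into_iter()
        // Prefer the on-disk copy: anything already persisted is skipped.
        .filter(|e| !on_disk.contains(&e.cid))
        // `insert` returns false for a repeated id, which filters it out.
        .filter(|e| seen_in_batch.insert(e.cid.clone()))
        .collect()
}
```

Calling `exclude_known` a second time with the same batch, after its output has been recorded in `on_disk`, yields an empty vector, which is the property that makes re-reading events after a restart harmless.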

With this, we shouldn't need the shutdown task to flush, but I don't see any harm in keeping it, so it's still there.

dav1do added 2 commits August 30, 2025 08:36
restart from the beginning of conclusion events to avoid skipping on restarts. need to figure out why memory table contains conclusion_event_order that is less than the data written to event_states still, and hopefully restart where we left off. for now, we just ignore things we've seen before.
@dav1do dav1do temporarily deployed to tnet-prod-2024 August 30, 2025 17:13 — with GitHub Actions Inactive
@dav1do dav1do marked this pull request as ready for review August 31, 2025 16:02
@dav1do dav1do requested a review from a team as a code owner August 31, 2025 16:02
@dav1do dav1do requested review from m0ar, smrz2001 and stephhuynh18 and removed request for a team August 31, 2025 16:02
@dav1do dav1do temporarily deployed to tnet-prod-2024 August 31, 2025 16:15 — with GitHub Actions Inactive
… whole

we split the batch into models/MIDs and were writing both pieces to disk separately. The entire batch is ordered by conclusion_event_order, but each group could overlap. So we could flush the cache for the first write, and then we'd have a cache that was "going backward" in conclusion_event_order. On restart, those events were missed and events failed to aggregate correctly.
@dav1do dav1do force-pushed the fix/aggregator-restart branch from 519f887 to 492c7b3 Compare September 1, 2025 16:26

@m0ar m0ar left a comment

Collaborator

I think this looks good, just some clarification requests and nitpicks ✨

  let models = self.validate_models(models).context("validating models")?;
  let mut models = self
-     .store_event_states(models)
+     .store_event_states(models, false)

Collaborator

Could you elaborate on why we prevent flush here, but not for the mids below?

Collaborator Author

So this is the crux of the fix, which I tried to explain in the second bullet point (verbosely and imprecisely). Basically, we put the entire batch of conclusion events into memory (models here and MIDs below) and only flush once, so that the in-memory order can't end up behind the on-disk order.
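
The failure mode can be reproduced with a toy model. This is only a sketch under stated assumptions: rows are reduced to their `conclusion_event_order` values, and `Store`, `write_batch`, and `skipped_after_crash` are hypothetical names, not code from this PR.

```rust
/// Toy storage: rows are just their `conclusion_event_order` values.
#[derive(Default)]
pub struct Store {
    pub disk: Vec<u64>,
    pub cache: Vec<u64>,
}

impl Store {
    /// Move everything from the in-memory cache to disk.
    pub fn flush(&mut self) {
        self.disk.append(&mut self.cache);
    }

    /// Highest order on disk; a restart resumes after this value.
    pub fn resume_point(&self) -> Option<u64> {
        self.disk.iter().copied().max()
    }

    /// Simulate a crash: whatever is only in memory is lost.
    pub fn crash(&mut self) {
        self.cache.clear();
    }
}

/// Write one batch in two phases (models, then MIDs).
/// `flush_between = true` reproduces the old mid-batch flush.
pub fn write_batch(store: &mut Store, models: &[u64], mids: &[u64], flush_between: bool) {
    store.cache.extend_from_slice(models);
    if flush_between {
        store.flush();
    }
    store.cache.extend_from_slice(mids);
}

/// Orders from the batch that a restart would never read again:
/// missing from disk, yet at or below the resume point.
pub fn skipped_after_crash(store: &Store, batch: &[u64]) -> Vec<u64> {
    let resume = store.resume_point();
    batch
        .iter()
        .copied()
        // `None < Some(_)`, so with an empty disk nothing counts as skipped.
        .filter(|o| !store.disk.contains(o) && Some(*o) <= resume)
        .collect()
}
```

With a batch `[1, 2, 3, 4]` split into models `[2, 4]` and MIDs `[1, 3]`, flushing between the phases and then crashing leaves the resume point at 4 while orders 1 and 3 were never persisted. Without the mid-batch flush, a crash before the final flush leaves the disk untouched, so the whole batch is read again.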

  .with_column_renamed("event_height", "previous_height")?;

- Ok(conclusion_events
+ let conclusion_events = conclusion_events

Collaborator

Did we lose a pattern-matching error check here?

Collaborator Author

No, I just shadowed the name while doing some more filtering (to be idempotent if we received the conclusion event a second time). Instead of returning Ok(select_things().await?), we now assign and then return Ok(conclusion_events) at the end.
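
As a rough illustration of why no error check is lost, here is a synchronous sketch of the same refactor. The real code is async and operates on query results; the `select_things` body, the `already_seen` parameter, and the `u64` rows are assumptions made only for this example.

```rust
/// Hypothetical fallible query, standing in for the real select.
fn select_things(input: &[u64]) -> Result<Vec<u64>, String> {
    if input.is_empty() {
        return Err("no events".to_string());
    }
    Ok(input.to_vec())
}

/// Before: the query result is returned directly.
pub fn before(input: &[u64]) -> Result<Vec<u64>, String> {
    Ok(select_things(input)?)
}

/// After: bind the result, filter under the same (shadowed) name,
/// and return it at the end.
pub fn after(input: &[u64], already_seen: &[u64]) -> Result<Vec<u64>, String> {
    // `?` still propagates the error exactly as in `before`.
    let conclusion_events = select_things(input)?;
    // Shadowing reuses the name for the filtered value.
    let conclusion_events: Vec<u64> = conclusion_events
        .into_iter()
        .filter(|order| !already_seen.contains(order))
        .collect();
    Ok(conclusion_events)
}
```

Both versions return early on a failed query; the only behavioral difference is the extra filtering step before the final `Ok`.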

Comment on lines +836 to +843
if allow_cache_flush {
self.flush_cache().await?;
let count = self.count_cache().await?;
tracing::debug!(%count, will_flush = %count >= self.max_cached_rows, "counts for mem table");
// If we have enough data cached in memory write it out to persistent store
if count >= self.max_cached_rows {
self.flush_cache().await?;
}

Collaborator

Would be good to document why this check is important. I don't really understand it outside of testing use cases, in particular the block on l301 🤔

Collaborator Author

Hmm, I think this comment is reasonable, but I'm open to improvements: "If we have enough data cached in memory write it out to persistent store"

I'm trying to figure out how to clarify. We used to flush the cache every time we got here if the mem table held at least max_cached_rows rows. Now I've added an allow_cache_flush flag, because we call this twice while processing a single batch of conclusion events and we don't want to flush the cache in the middle of the batch; we now do it only once, at the end.

If we only processed the conclusion events in order this wouldn't be necessary, but conclusion_event_order is the order of arrival on the node, which only guarantees stream ordering: any conclusion event comes after all prior events in its stream, but not necessarily after its model, as we accept and persist events without the model present.
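
A minimal sketch of the gating described above, assuming a synchronous in-memory cache. `CachedStore`, `process_batch`, and the signatures here are hypothetical; only the names `allow_cache_flush`, `max_cached_rows`, `flush_cache`, and `store_event_states` are borrowed from the diff, and the real methods are async and do more work.

```rust
/// Hypothetical in-memory cache in front of a persistent store.
pub struct CachedStore {
    pub max_cached_rows: usize,
    pub cache: Vec<u64>,
    pub disk: Vec<u64>,
    /// Number of flushes performed, tracked only for illustration.
    pub flushes: usize,
}

impl CachedStore {
    pub fn new(max_cached_rows: usize) -> Self {
        Self { max_cached_rows, cache: Vec::new(), disk: Vec::new(), flushes: 0 }
    }

    fn flush_cache(&mut self) {
        self.disk.append(&mut self.cache);
        self.flushes += 1;
    }

    /// Cache `rows`; flush only if the caller allows it AND the cache
    /// has reached the threshold.
    pub fn store_event_states(&mut self, rows: &[u64], allow_cache_flush: bool) {
        self.cache.extend_from_slice(rows);
        if allow_cache_flush && self.cache.len() >= self.max_cached_rows {
            self.flush_cache();
        }
    }
}

/// One batch, two writes: the first may not flush, the second may.
pub fn process_batch(store: &mut CachedStore, models: &[u64], mids: &[u64]) {
    store.store_event_states(models, false);
    store.store_event_states(mids, true);
}
```

With `max_cached_rows = 2`, models `[2, 4]`, and MIDs `[1, 3]`, the cache briefly holds four rows (over its limit) and is then written out in a single flush, so the disk never gets ahead of what is still in memory.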

Comment thread pipeline/src/aggregator/mod.rs Outdated
Comment on lines +3814 to +3815
// splice the 3 events together making sure each vec isn't reordered but not all in a row
let events = &events;

Collaborator

Not sure if this comment refers to the right thing

Collaborator Author

whoops, yep, I modified this behavior... removed and moved the model into the middle to be more likely to replicate the real cause

@dav1do dav1do added this pull request to the merge queue Sep 30, 2025
@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks Sep 30, 2025
@m0ar m0ar added this pull request to the merge queue Sep 30, 2025
@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks Sep 30, 2025
@m0ar m0ar added this pull request to the merge queue Sep 30, 2025
@github-merge-queue github-merge-queue Bot removed this pull request from the merge queue due to failed status checks Sep 30, 2025
@smrz2001 smrz2001 added this pull request to the merge queue Oct 1, 2025
Merged via the queue into main with commit 6c11a03 Oct 1, 2025
20 checks passed
@smrz2001 smrz2001 deleted the fix/aggregator-restart branch October 1, 2025 02:04
@smrz2001 smrz2001 mentioned this pull request Oct 1, 2025

3 participants