bug: flush aggregator cache on shutdown #733

Closed
m0ar wants to merge 3 commits into ceramicnetwork:main from m0ar:m0ar/fix-aggregator-state-loss

@m0ar m0ar commented Aug 15, 2025

Collaborator

Note: WIP

Closes #728

@m0ar m0ar force-pushed the m0ar/fix-aggregator-state-loss branch from 5d6fc8f to b2d2133 on August 19, 2025 15:53
@m0ar m0ar force-pushed the m0ar/fix-aggregator-state-loss branch from b2d2133 to 6e92ff6 on August 22, 2025 14:47
@m0ar m0ar changed the title from "Fix aggregator state loss" to "bug: flush aggregator cache on shutdown" on Aug 22, 2025

@dav1do dav1do left a comment

Collaborator

Looks good. @smrz2001 said he can take a look at the actions.

}

async fn flush_cache(&self) -> Result<usize> {
    let cnt = self.count_cache().await?;

Collaborator

We double count in the base case (check the count; if it's over our threshold, flush, where we count again for logging purposes). It's probably not a big deal, but pointing it out.

Collaborator Author

Myeah, I agree, but the count seems very fast and the cache will never be much above 10k events anyway. The ease of debugging might come in handy, so that weighs heavier in this case imo.
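
For reference, if the second count ever does show up as a cost, one option would be to hand the already-known count to the flush. This is a minimal std-only sketch and not the code in this PR: `Cache`, `flush_if_full`, `count_calls`, and the in-memory `Vec` are hypothetical stand-ins for the real cache table, and the `>=` threshold comparison is an assumption.

```rust
/// Hypothetical in-memory stand-in for the aggregator's cache table.
pub struct Cache {
    events: Vec<u64>,
    threshold: usize,
    /// How many times `count` ran; only here to make the saving visible.
    pub count_calls: usize,
}

impl Cache {
    pub fn new(threshold: usize) -> Self {
        Cache { events: Vec::new(), threshold, count_calls: 0 }
    }

    pub fn push(&mut self, event: u64) {
        self.events.push(event);
    }

    /// Stands in for the count query against the cache table.
    pub fn count(&mut self) -> usize {
        self.count_calls += 1;
        self.events.len()
    }

    /// Flushes the cache and returns the number of flushed events.
    /// Reuses `known_count` when the caller already counted; otherwise counts itself.
    pub fn flush(&mut self, known_count: Option<usize>) -> usize {
        let cnt = match known_count {
            Some(c) => c,
            None => self.count(),
        };
        self.events.clear();
        cnt
    }

    /// Threshold check that counts once and passes the result on to `flush`.
    pub fn flush_if_full(&mut self) -> Option<usize> {
        let cnt = self.count();
        if cnt >= self.threshold {
            Some(self.flush(Some(cnt)))
        } else {
            None
        }
    }
}
```

A direct call such as `flush(None)` from a shutdown path would still count for logging, so the debugging value mentioned above is kept.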

Comment thread actor/src/lib.rs
/// Called when the actor is shutting down to allow for cleanup operations.
/// This method is called after all pending messages have been processed
/// but before the actor loop exits.
async fn shutdown(&mut self);

Collaborator

As I thought about why we have a cache table at all (to make sure the parquet files aren't tiny), I realized this could lead to a similar result where we write smaller batches of events. For now, I think that's fine. Ideally, we'd sort out making the aggregator idempotent on resume.

@m0ar m0ar Aug 27, 2025

Collaborator Author

The shutdown write will always be smaller than the 10k cache limit of course, but a full batch write yields fairly tiny parquet files anyway, between 25 and 62 events long. The shutdown flush files will be (arbitrarily?) short in comparison, but I think that's okay if it means the aggregator is reliable after shutdown.

I plotted histograms of the file length distributions after an automatic 10k flush, and the same with an additional shutdown flush of about 4k events:
[image: histograms of the file length distributions]
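
To make the shutdown semantics discussed in this thread concrete, here is a small synchronous sketch using only the standard library. It is an illustration under assumptions, not the actor crate's code: the `Actor` trait shape, `run`, `Aggregator`, and `flushed_batches` are hypothetical, the real hook is `async`, and the cache limit is scaled down from 10k to whatever the caller passes in.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical, synchronous simplification of an actor with a shutdown hook.
pub trait Actor {
    type Message;
    fn handle(&mut self, msg: Self::Message);
    /// Called after all pending messages are processed, before the loop exits.
    fn shutdown(&mut self);
}

/// Drains the inbox until every sender is dropped, then runs the shutdown hook.
pub fn run<A: Actor>(mut actor: A, inbox: Receiver<A::Message>) -> A {
    for msg in inbox.iter() {
        actor.handle(msg);
    }
    actor.shutdown();
    actor
}

/// Hypothetical aggregator that buffers events and records each flushed batch size.
pub struct Aggregator {
    cache: Vec<u64>,
    limit: usize,
    pub flushed_batches: Vec<usize>,
}

impl Aggregator {
    pub fn new(limit: usize) -> Self {
        Aggregator { cache: Vec::new(), limit, flushed_batches: Vec::new() }
    }

    /// Writes out whatever is cached; a no-op when the cache is empty.
    fn flush(&mut self) {
        if !self.cache.is_empty() {
            self.flushed_batches.push(self.cache.len());
            self.cache.clear();
        }
    }
}

impl Actor for Aggregator {
    type Message = u64;

    fn handle(&mut self, msg: u64) {
        self.cache.push(msg);
        // Automatic flush once the cache reaches its limit.
        if self.cache.len() >= self.limit {
            self.flush();
        }
    }

    fn shutdown(&mut self) {
        // Without this, the partial batch below the limit would be lost.
        self.flush();
    }
}
```

Feeding 14 events through `run` with a limit of 10 would record one automatic flush of 10 and one shutdown flush of 4, the same shape as the 10k plus roughly 4k case described above.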

m0ar commented Aug 27, 2025

Collaborator Author

Closing in favour of #735 (non-fork)

@m0ar m0ar closed this Aug 27, 2025

Development

Successfully merging this pull request may close these issues.

BUG: sporadic pipeline data loss

2 participants