bug: flush aggregator cache on shutdown #733
}

async fn flush_cache(&self) -> Result<usize> {
    let cnt = self.count_cache().await?;
We double count in the base case: we check the count and, if it is over our threshold, flush, where we count again for logging purposes. It's probably not a big deal, but pointing it out.
Yeah, I agree, but the count seems very fast and the cache will never hold much more than 10k events anyway. The ease of debugging might come in handy, so that weighs heavier in this case imo.
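For reference, a minimal sketch of how the second count could be avoided if it ever becomes a concern: count once and hand the result to the flush for logging. This is a synchronous, in-memory stand-in, not the PR's code; the `Cache` struct, its fields, `maybe_flush`, and the `cnt` parameter on `flush_cache` are all hypothetical, and the `>=` comparison is an assumption about how the threshold is applied.

```rust
/// Hypothetical in-memory stand-in for the aggregator's cache table.
struct Cache {
    events: Vec<u64>,
    threshold: usize,
    count_calls: usize, // illustration only: how many times we counted
}

impl Cache {
    /// Counts the cached events (a table query in the real aggregator).
    fn count_cache(&mut self) -> usize {
        self.count_calls += 1;
        self.events.len()
    }

    /// Flushes everything, reusing the caller's count for the log line.
    fn flush_cache(&mut self, cnt: usize) -> usize {
        println!("flushing {cnt} cached events");
        self.events.clear();
        cnt
    }

    /// Base case: count once, flush only when the threshold is reached.
    /// Assumption: `>=` is the intended comparison.
    fn maybe_flush(&mut self) -> Option<usize> {
        let cnt = self.count_cache();
        if cnt >= self.threshold {
            Some(self.flush_cache(cnt))
        } else {
            None
        }
    }
}
```

The trade-off is the one discussed above: a self-counting `flush_cache` is easier to call and debug in isolation, while passing the count in saves one query per flush.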
/// Called when the actor is shutting down to allow for cleanup operations.
/// This method is called after all pending messages have been processed
/// but before the actor loop exits.
async fn shutdown(&mut self);
As I thought about why we have a cache table at all (to make sure the Parquet files aren't tiny), I realized this could lead to a similar result where we write smaller batches of events. For now, I think that's fine. Ideally, we're able to sort out making the aggregator idempotent on resume.
The shutdown write will always be smaller than the 10k cache limit, of course, but a full batch write yields fairly tiny Parquet files anyway, between 25 and 62 events long. The shutdown flush files will be (arbitrarily?) short in comparison, but I think that's okay if it means the aggregator is reliable after shutdown.
I plotted histograms of the file length distributions after an automatic 10k flush, and the same with an additional shutdown flush of about 4k events:

[Image: file length histograms]
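To make the behaviour in this thread concrete, here is a small sketch of an actor loop that calls a shutdown hook after draining its messages, with the hook flushing whatever is left in the cache. It is simplified to synchronous code on `std::sync::mpsc` so it runs without an async runtime; the PR's trait is async. The `Actor` trait shape, `Aggregator`, `run`, and the `written_batches` field are hypothetical names for illustration only.

```rust
use std::sync::mpsc::Receiver;

/// Simplified, synchronous stand-in for the actor trait (the real one is async).
trait Actor {
    type Message;
    fn handle(&mut self, msg: Self::Message);
    /// Called after all pending messages are processed, before the loop exits.
    fn shutdown(&mut self);
}

/// Hypothetical aggregator: buffers events and writes a batch when full.
struct Aggregator {
    cache: Vec<u64>,
    limit: usize,
    written_batches: Vec<usize>, // sizes of the batches "written" so far
}

impl Aggregator {
    /// Writes out the cached events as one batch and returns its size.
    fn flush_cache(&mut self) -> usize {
        let cnt = self.cache.len();
        if cnt > 0 {
            self.written_batches.push(cnt);
            self.cache.clear();
        }
        cnt
    }
}

impl Actor for Aggregator {
    type Message = u64;

    fn handle(&mut self, event: u64) {
        self.cache.push(event);
        if self.cache.len() >= self.limit {
            self.flush_cache();
        }
    }

    fn shutdown(&mut self) {
        // Flush the remainder so cached events are not lost on exit.
        self.flush_cache();
    }
}

/// Runs until every sender is dropped, then calls the shutdown hook.
fn run<A: Actor>(actor: &mut A, rx: Receiver<A::Message>) {
    // Iteration ends once the channel is closed and fully drained.
    for msg in rx {
        actor.handle(msg);
    }
    actor.shutdown();
}
```

With a limit of 10 and 14 events sent before the senders are dropped, this writes one full batch of 10 followed by a shutdown batch of 4, a scaled-down version of the 10k flush plus roughly 4k shutdown flush described above.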
Closing in favour of #735 (non-fork)
Note: WIP
Closes #728