Skip to content

Commit 519f887

Browse files
committed
fix: ensure conclusion event batch is flushed from cache as a whole
We split the batch into models/mids and were writing both pieces to disk separately. The entire batch is ordered by conclusion_event_order, but the ranges covered by each group could overlap. So we could flush the cache after the first write and then be left with a cache that was "going backward" in conclusion_event_order. On restart, those events were missed and events failed to aggregate correctly.
1 parent 67c6cfc commit 519f887

1 file changed

Lines changed: 16 additions & 9 deletions

File tree

pipeline/src/aggregator/mod.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,8 @@ impl Aggregator {
229229
debug!(
230230
?max_conclusion_event_order,
231231
?max_event_state_order,
232-
"highwater marks. setting max_conclusion_event_order to None to ensure no events missed"
232+
"aggregator highwater marks"
233233
);
234-
// replay events in case of a bad shutdown / failure to flush cache to disk
235-
let max_conclusion_event_order = None;
236234

237235
// Spawn actor
238236
let (broadcast_tx, _broadcast_rx) = broadcast::channel(1_000);
@@ -300,7 +298,7 @@ impl Aggregator {
300298
let models = self.patch_models(models).await.context("patching models")?;
301299
let models = self.validate_models(models).context("validating models")?;
302300
let mut models = self
303-
.store_event_states(models)
301+
.store_event_states(models, false)
304302
.await
305303
.context("storing models")?;
306304

@@ -431,7 +429,7 @@ impl Aggregator {
431429
.await
432430
.context("validating mids")?;
433431
let mut mids = self
434-
.store_event_states(mids)
432+
.store_event_states(mids, true)
435433
.await
436434
.context("storing mids")?;
437435
let mut ordered_events = Vec::with_capacity(models.len() + mids.len());
@@ -772,7 +770,11 @@ impl Aggregator {
772770
}
773771

774772
#[instrument(skip_all)]
775-
async fn store_event_states(&mut self, event_states: DataFrame) -> Result<Vec<RecordBatch>> {
773+
async fn store_event_states(
774+
&mut self,
775+
event_states: DataFrame,
776+
allow_cache_flush: bool,
777+
) -> Result<Vec<RecordBatch>> {
776778
let row_count = event_states.clone().count().await?;
777779
if row_count == 0 {
778780
return Ok(vec![RecordBatch::new_empty(
@@ -831,10 +833,14 @@ impl Aggregator {
831833
.await
832834
.context("writing to mem table data")?;
833835

834-
let count = self.count_cache().await?;
835-
// If we have enough data cached in memory write it out to persistent store
836-
if count >= self.max_cached_rows {
836+
if allow_cache_flush {
837837
self.flush_cache().await?;
838+
let count = self.count_cache().await?;
839+
tracing::debug!(%count, will_flush = %count >= self.max_cached_rows, "counts for mem table");
840+
// If we have enough data cached in memory write it out to persistent store
841+
if count >= self.max_cached_rows {
842+
self.flush_cache().await?;
843+
}
838844
}
839845
ordered.collect().await.context("collecting ordered events")
840846
}
@@ -1022,6 +1028,7 @@ impl Handler<NewConclusionEventsMsg> for Aggregator {
10221028
);
10231029
let batch = self.process_conclusion_events_batch(message.events).await?;
10241030
if batch.num_rows() > 0 {
1031+
tracing::trace!(event_count = %batch.num_rows(), "sending events to subscribers");
10251032
let _ = self.broadcast_tx.send(batch);
10261033
}
10271034
Ok(())

0 commit comments

Comments
 (0)