@@ -229,10 +229,8 @@ impl Aggregator {
229229 debug ! (
230230 ?max_conclusion_event_order,
231231 ?max_event_state_order,
232- "highwater marks. setting max_conclusion_event_order to None to ensure no events missed "
232+ "aggregator highwater marks"
233233 ) ;
234- // replay events in case of a bad shutdown / failure to flush cache to disk
235- let max_conclusion_event_order = None ;
236234
237235 // Spawn actor
238236 let ( broadcast_tx, _broadcast_rx) = broadcast:: channel ( 1_000 ) ;
@@ -300,7 +298,7 @@ impl Aggregator {
300298 let models = self . patch_models ( models) . await . context ( "patching models" ) ?;
301299 let models = self . validate_models ( models) . context ( "validating models" ) ?;
302300 let mut models = self
303- . store_event_states ( models)
301+ . store_event_states ( models, false )
304302 . await
305303 . context ( "storing models" ) ?;
306304
@@ -431,7 +429,7 @@ impl Aggregator {
431429 . await
432430 . context ( "validating mids" ) ?;
433431 let mut mids = self
434- . store_event_states ( mids)
432+ . store_event_states ( mids, true )
435433 . await
436434 . context ( "storing mids" ) ?;
437435 let mut ordered_events = Vec :: with_capacity ( models. len ( ) + mids. len ( ) ) ;
@@ -772,7 +770,11 @@ impl Aggregator {
772770 }
773771
774772 #[ instrument( skip_all) ]
775- async fn store_event_states ( & mut self , event_states : DataFrame ) -> Result < Vec < RecordBatch > > {
773+ async fn store_event_states (
774+ & mut self ,
775+ event_states : DataFrame ,
776+ allow_cache_flush : bool ,
777+ ) -> Result < Vec < RecordBatch > > {
776778 let row_count = event_states. clone ( ) . count ( ) . await ?;
777779 if row_count == 0 {
778780 return Ok ( vec ! [ RecordBatch :: new_empty(
@@ -831,10 +833,14 @@ impl Aggregator {
831833 . await
832834 . context ( "writing to mem table data" ) ?;
833835
834- let count = self . count_cache ( ) . await ?;
835- // If we have enough data cached in memory write it out to persistent store
836- if count >= self . max_cached_rows {
836+ if allow_cache_flush {
837837 self . flush_cache ( ) . await ?;
838+ let count = self . count_cache ( ) . await ?;
839+ tracing:: debug!( %count, will_flush = %count >= self . max_cached_rows, "counts for mem table" ) ;
840+ // If we have enough data cached in memory write it out to persistent store
841+ if count >= self . max_cached_rows {
842+ self . flush_cache ( ) . await ?;
843+ }
838844 }
839845 ordered. collect ( ) . await . context ( "collecting ordered events" )
840846 }
@@ -1022,6 +1028,7 @@ impl Handler<NewConclusionEventsMsg> for Aggregator {
10221028 ) ;
10231029 let batch = self . process_conclusion_events_batch ( message. events ) . await ?;
10241030 if batch. num_rows ( ) > 0 {
1031+ tracing:: trace!( event_count = %batch. num_rows( ) , "sending events to subscribers" ) ;
10251032 let _ = self . broadcast_tx . send ( batch) ;
10261033 }
10271034 Ok ( ( ) )
@@ -2049,9 +2056,7 @@ mod tests {
20492056 . once ( )
20502057 . with ( predicate:: eq ( SubscribeSinceMsg {
20512058 projection : None ,
2052- filters : None ,
2053- // TODO: need to figure out the memory table ordering bug so we can restart correctly in the future
2054- // filters: Some(vec![gt_expression("conclusion_event_order", 1)]),
2059+ filters : Some ( vec ! [ gt_expression( "conclusion_event_order" , 1 ) ] ) ,
20552060 limit : None ,
20562061 } ) )
20572062 . return_once ( |_msg| {
0 commit comments