@@ -4,7 +4,7 @@ use std::{
44 marker:: PhantomData ,
55 num:: NonZeroU64 ,
66 sync:: Arc ,
7- time:: Duration ,
7+ time:: { Duration , Instant } ,
88} ;
99
1010use anyhow:: Context ;
@@ -31,12 +31,20 @@ use hotshot_types::{
3131 message:: UpgradeLock ,
3232 network:: NetworkConfig ,
3333 new_protocol:: CoordinatorEvent ,
34+ simple_certificate:: CertificatePair ,
3435 storage_metrics:: StorageMetricsValue ,
35- traits:: { metrics:: Metrics , network:: ConnectedNetwork } ,
36+ traits:: {
37+ metrics:: { Counter , Gauge , Histogram , Metrics } ,
38+ network:: ConnectedNetwork ,
39+ } ,
3640} ;
3741use parking_lot:: Mutex ;
3842use request_response:: RequestResponseConfig ;
39- use tokio:: { spawn, sync:: mpsc:: channel, task:: JoinHandle } ;
43+ use tokio:: {
44+ spawn,
45+ sync:: { mpsc:: channel, watch} ,
46+ task:: JoinHandle ,
47+ } ;
4048use tracing:: { Instrument , Level } ;
4149use url:: Url ;
4250
@@ -351,7 +359,26 @@ where
351359 metrics,
352360 ) ;
353361
354- // Spawn event handling loop.
362+ // Shared between the event loop and the background decide processor.
363+ let event_consumer = Arc :: new ( event_consumer) ;
364+
365+ // Wakes the background decide processor. `watch` coalesces: the processor is cursor-driven,
366+ // so it only needs the latest decided view.
367+ let ( decide_tx, decide_rx) = watch:: channel :: < DecideSignal > ( None ) ;
368+
369+ // Background decide processor: query-service ingestion + GC, decoupled from the event loop.
370+ ctx. spawn (
371+ "decide processor" ,
372+ process_decided_events_task (
373+ persistence. clone ( ) ,
374+ event_consumer. clone ( ) ,
375+ decide_rx,
376+ anchor_view,
377+ DecideProcessorMetrics :: new ( metrics) ,
378+ ) ,
379+ ) ;
380+
381+ // Event loop. On a decide this only does the leaf write, then signals `decide_tx`.
355382 ctx. spawn (
356383 "event handler" ,
357384 handle_events (
@@ -363,7 +390,7 @@ where
363390 external_event_handler,
364391 Some ( event_streamer. clone ( ) ) ,
365392 event_consumer,
366- anchor_view ,
393+ decide_tx ,
367394 ) ,
368395 ) ;
369396
@@ -525,35 +552,58 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence> Drop for SequencerCon
525552 }
526553}
527554
555+ /// Value carried by the `watch` channel from the event loop to the background decide processor:
556+ /// `Some((view, qc))` is the latest decided view with its optional deciding QC; `None` is the channel's initial value, which the processor skips.
557+ type DecideSignal = Option < ( ViewNumber , Option < Arc < CertificatePair < SeqTypes > > > ) > ;
558+
559+ /// Metrics for the background decide processor. `backlog` (decided - processed) is the key signal:
560+ /// sustained growth means staging tables accumulate (no data lost, but disk grows).
561+ struct DecideProcessorMetrics {
562+ last_decided : Arc < dyn Gauge > ,
563+ last_processed : Arc < dyn Gauge > ,
564+ backlog : Arc < dyn Gauge > ,
565+ duration : Arc < dyn Histogram > ,
566+ failures : Arc < dyn Counter > ,
567+ }
568+
569+ impl DecideProcessorMetrics {
570+ fn new ( metrics : & ( impl Metrics + ?Sized ) ) -> Self {
571+ let metrics = metrics. subgroup ( "decide_processor" . into ( ) ) ;
572+ Self {
573+ last_decided : metrics
574+ . create_gauge ( "last_decided" . into ( ) , Some ( "view" . into ( ) ) )
575+ . into ( ) ,
576+ last_processed : metrics
577+ . create_gauge ( "last_processed" . into ( ) , Some ( "view" . into ( ) ) )
578+ . into ( ) ,
579+ backlog : metrics
580+ . create_gauge ( "backlog" . into ( ) , Some ( "view" . into ( ) ) )
581+ . into ( ) ,
582+ duration : metrics
583+ . create_histogram ( "process_duration" . into ( ) , Some ( "seconds" . into ( ) ) )
584+ . into ( ) ,
585+ failures : metrics. create_counter ( "failures" . into ( ) , None ) . into ( ) ,
586+ }
587+ }
588+ }
589+
528590#[ tracing:: instrument( skip_all, fields( node_id) ) ]
529591#[ allow( clippy:: too_many_arguments) ]
530- async fn handle_events < N , P > (
592+ async fn handle_events < N , P , C > (
531593 consensus_handle : Arc < ConsensusHandle < SeqTypes , ConsensusNode < N , P > > > ,
532594 node_id : u64 ,
533595 mut events : impl Stream < Item = CoordinatorEvent < SeqTypes > > + Unpin ,
534596 persistence : Arc < P > ,
535597 state_signer : Arc < RwLock < StateSigner < SequencerApiVersion > > > ,
536598 external_event_handler : ExternalEventHandler ,
537599 events_streamer : Option < Arc < RwLock < EventsStreamer < SeqTypes > > > > ,
538- event_consumer : impl PersistenceEventConsumer + ' static ,
539- anchor_view : Option < ViewNumber > ,
600+ event_consumer : Arc < C > ,
601+ decide_tx : watch :: Sender < DecideSignal > ,
540602) where
541603 N : ConnectedNetwork < PubKey > ,
542604 P : SequencerPersistence ,
605+ C : PersistenceEventConsumer + ' static ,
543606{
544- if let Some ( view) = anchor_view {
545- // Process and clean up any leaves that we may have persisted last time we were running but
546- // failed to handle due to a shutdown.
547- if let Err ( err) = persistence
548- . append_decided_leaves ( view, vec ! [ ] , None , & event_consumer)
549- . await
550- {
551- tracing:: warn!(
552- "failed to process decided leaves, chain may not be up to date: {err:#}"
553- ) ;
554- }
555- }
556-
557607 while let Some ( event) = events. next ( ) . await {
558608 tracing:: debug!( node_id, ?event, "consensus event" ) ;
559609
@@ -576,7 +626,17 @@ async fn handle_events<N, P>(
576626 _ => { } ,
577627 }
578628
579- let persistence_fut = persistence. handle_event ( & event, & event_consumer) ;
629+ // Critical path: only persist the decided leaves, then signal the background processor.
630+ // Signalling after the persist future means it never reads ahead of committed state.
631+ let persistence_fut = async {
632+ if let Some ( signal) = persistence
633+ . persist_event ( & event, event_consumer. as_ref ( ) )
634+ . await
635+ {
636+ // A closed receiver only happens during shutdown.
637+ let _ = decide_tx. send ( Some ( signal) ) ;
638+ }
639+ } ;
580640
581641 let state_signer_fut = async {
582642 state_signer
@@ -602,6 +662,99 @@ async fn handle_events<N, P>(
602662 }
603663}
604664
665+ const PROCESS_RETRY_INTERVAL : Duration = Duration :: from_secs ( 30 ) ;
666+
667+ /// Turns persisted decided leaves into query-service decide events and GCs processed data.
668+ /// Decoupled from [`handle_events`] so slow ingestion/GC can't stall (or drop) consensus events;
669+ /// cursor-driven, so it can lag without losing data.
670+ #[ tracing:: instrument( skip_all) ]
671+ async fn process_decided_events_task < P , C > (
672+ persistence : Arc < P > ,
673+ consumer : Arc < C > ,
674+ mut decide_rx : watch:: Receiver < DecideSignal > ,
675+ anchor_view : Option < ViewNumber > ,
676+ metrics : DecideProcessorMetrics ,
677+ ) where
678+ P : SequencerPersistence ,
679+ C : PersistenceEventConsumer + ' static ,
680+ {
681+ // Highest view confirmed processed, for the backlog gauge. Floored at the anchor view; the
682+ // cursor reported below raises it.
683+ let mut last_processed = anchor_view. map ( |v| v. u64 ( ) ) . unwrap_or ( 0 ) ;
684+
685+ // Process leaves persisted before a previous shutdown but not yet handled.
686+ if let Some ( view) = anchor_view {
687+ match persistence
688+ . process_decided_events ( view, None , consumer. as_ref ( ) )
689+ . await
690+ {
691+ Ok ( processed) => {
692+ if let Some ( v) = processed {
693+ last_processed = last_processed. max ( v. u64 ( ) ) ;
694+ }
695+ } ,
696+ Err ( err) => tracing:: warn!(
697+ "failed to process decided leaves on startup, chain may not be up to date: {err:#}"
698+ ) ,
699+ }
700+ }
701+
702+ // Reused on a timeout to re-attempt the most recent decide when no new one has arrived.
703+ let mut latest: DecideSignal = None ;
704+
705+ loop {
706+ // Wait for the next decide, retrying the most recent one if none arrives within the timeout.
707+ match tokio:: time:: timeout ( PROCESS_RETRY_INTERVAL , decide_rx. changed ( ) ) . await {
708+ Ok ( Ok ( ( ) ) ) => latest = decide_rx. borrow_and_update ( ) . clone ( ) ,
709+ Ok ( Err ( _) ) => {
710+ tracing:: info!( "decide signal channel closed, stopping decide processor" ) ;
711+ return ;
712+ } ,
713+ Err ( _) => { } , // Timed out; fall through to retry `latest`.
714+ }
715+
716+ let Some ( ( view, deciding_qc) ) = latest. clone ( ) else {
717+ continue ;
718+ } ;
719+ let decided = view. u64 ( ) ;
720+ metrics. last_decided . set ( decided as usize ) ;
721+ metrics
722+ . backlog
723+ . set ( decided. saturating_sub ( last_processed) as usize ) ;
724+
725+ let start = Instant :: now ( ) ;
726+ let result = persistence
727+ . process_decided_events ( view, deciding_qc, consumer. as_ref ( ) )
728+ . await ;
729+ metrics. duration . add_point ( start. elapsed ( ) . as_secs_f64 ( ) ) ;
730+
731+ match result {
732+ Ok ( processed) => {
733+ // Advance from the real cursor, not `decided`: if ingestion/GC lagged, `processed`
734+ // stays behind and the backlog gauge reflects it.
735+ if let Some ( v) = processed {
736+ last_processed = last_processed. max ( v. u64 ( ) ) ;
737+ }
738+ // reset latest if we have processed all the decided leaves
739+ if let Some ( ( view, _) ) = latest. clone ( )
740+ && last_processed >= view. u64 ( )
741+ {
742+ latest = None ;
743+ }
744+ metrics. last_processed . set ( last_processed as usize ) ;
745+ metrics
746+ . backlog
747+ . set ( decided. saturating_sub ( last_processed) as usize ) ;
748+ } ,
749+ Err ( err) => {
750+ // Cursor not advanced, so this range is retried next iteration; no data is lost.
751+ metrics. failures . add ( 1 ) ;
752+ tracing:: warn!( ?view, "deferred decide processing failed: {err:#}" ) ;
753+ } ,
754+ }
755+ }
756+ }
757+
605758#[ derive( Debug , Default , Clone ) ]
606759#[ allow( clippy:: type_complexity) ]
607760pub ( crate ) struct TaskList ( Arc < Mutex < Vec < ( String , JoinHandle < ( ) > ) > > > ) ;
0 commit comments