@@ -15,15 +15,16 @@ use crate::db::db_metrics::DB_METRICS;
1515use crate :: db:: relational_db:: { MutTx , RelationalDB , Tx } ;
1616use crate :: error:: DBError ;
1717use crate :: estimation:: estimate_rows_scanned;
18- use crate :: execution_context:: Workload ;
18+ use crate :: execution_context:: { Workload , WorkloadType } ;
1919use crate :: host:: module_host:: { DatabaseUpdate , EventStatus , ModuleEvent } ;
2020use crate :: messages:: websocket:: Subscribe ;
2121use crate :: subscription:: execute_plans;
2222use crate :: subscription:: query:: is_subscribe_to_all_tables;
23+ use crate :: util:: prometheus_handle:: IntGaugeExt ;
2324use crate :: vm:: check_row_limit;
2425use crate :: worker_metrics:: WORKER_METRICS ;
2526use parking_lot:: RwLock ;
26- use prometheus:: IntGauge ;
27+ use prometheus:: { Histogram , HistogramTimer , IntCounter , IntGauge } ;
2728use spacetimedb_client_api_messages:: websocket:: {
2829 self as ws, BsatnFormat , FormatSwitch , JsonFormat , SubscribeMulti , SubscribeSingle , TableUpdate , Unsubscribe ,
2930 UnsubscribeMulti ,
@@ -100,6 +101,28 @@ impl SubscriptionGauges {
100101 }
101102}
102103
104+ pub struct SubscriptionMetrics {
105+ pub lock_waiters : IntGauge ,
106+ pub lock_wait_time : Histogram ,
107+ pub compilation_time : Histogram ,
108+ pub num_queries_subscribed : IntCounter ,
109+ pub num_new_queries_subscribed : IntCounter ,
110+ pub num_queries_evaluated : IntCounter ,
111+ }
112+
113+ impl SubscriptionMetrics {
114+ pub fn new ( db : & Identity , workload : & WorkloadType ) -> Self {
115+ Self {
116+ lock_waiters : DB_METRICS . subscription_lock_waiters . with_label_values ( db, workload) ,
117+ lock_wait_time : DB_METRICS . subscription_lock_wait_time . with_label_values ( db, workload) ,
118+ compilation_time : DB_METRICS . subscription_compile_time . with_label_values ( db, workload) ,
119+ num_queries_subscribed : DB_METRICS . num_queries_subscribed . with_label_values ( db) ,
120+ num_new_queries_subscribed : DB_METRICS . num_new_queries_subscribed . with_label_values ( db) ,
121+ num_queries_evaluated : DB_METRICS . num_queries_evaluated . with_label_values ( db, workload) ,
122+ }
123+ }
124+ }
125+
103126type AssertTxFn = Arc < dyn Fn ( & Tx ) > ;
104127type SubscriptionUpdate = FormatSwitch < TableUpdate < BsatnFormat > , TableUpdate < JsonFormat > > ;
105128type FullSubscriptionUpdate = FormatSwitch < ws:: DatabaseUpdate < BsatnFormat > , ws:: DatabaseUpdate < JsonFormat > > ;
@@ -453,14 +476,21 @@ impl ModuleSubscriptions {
453476 )
454477 } ;
455478
479+ let subscription_metrics = SubscriptionMetrics :: new ( & self . owner_identity , & WorkloadType :: Unsubscribe ) ;
480+
456481 // Always lock the db before the subscription lock to avoid deadlocks.
457482 let tx = scopeguard:: guard ( self . relational_db . begin_tx ( Workload :: Unsubscribe ) , |tx| {
458483 let ( tx_metrics, reducer) = self . relational_db . release_tx ( tx) ;
459484 self . relational_db . report ( & reducer, & tx_metrics, None ) ;
460485 } ) ;
461486
462487 let removed_queries = {
463- let mut subscriptions = self . subscriptions . write ( ) ;
488+ let mut subscriptions = {
489+ // How contended is the lock?
490+ let _wait_guard = subscription_metrics. lock_waiters . inc_scope ( ) ;
491+ let _wait_timer = subscription_metrics. lock_wait_time . start_timer ( ) ;
492+ self . subscriptions . write ( )
493+ } ;
464494
465495 return_on_err ! (
466496 subscriptions. remove_subscription( ( sender. id. identity, sender. id. connection_id) , request. query_id) ,
@@ -481,6 +511,11 @@ impl ModuleSubscriptions {
481511 None
482512 ) ;
483513
514+ // How many queries did we evaluate?
515+ subscription_metrics
516+ . num_queries_evaluated
517+ . inc_by ( removed_queries. len ( ) as _ ) ;
518+
484519 // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
485520 // queue while we are still holding a read-lock on the database.
486521
@@ -513,12 +548,14 @@ impl ModuleSubscriptions {
513548 ///
514549 /// Instead we generate two hashes and outside of the tx lock.
515550 /// If either one is currently tracked, we can avoid recompilation.
551+ #[ allow( clippy:: type_complexity) ]
516552 fn compile_queries (
517553 & self ,
518554 sender : Identity ,
519555 queries : impl IntoIterator < Item = Box < str > > ,
520556 num_queries : usize ,
521- ) -> Result < ( Vec < Arc < Plan > > , AuthCtx , TxId ) , DBError > {
557+ metrics : & SubscriptionMetrics ,
558+ ) -> Result < ( Vec < Arc < Plan > > , AuthCtx , TxId , HistogramTimer ) , DBError > {
522559 let mut subscribe_to_all_tables = false ;
523560 let mut plans = Vec :: with_capacity ( num_queries) ;
524561 let mut query_hashes = Vec :: with_capacity ( num_queries) ;
@@ -540,7 +577,15 @@ impl ModuleSubscriptions {
540577 let ( tx_metrics, reducer) = self . relational_db . release_tx ( tx) ;
541578 self . relational_db . report ( & reducer, & tx_metrics, None ) ;
542579 } ) ;
543- let guard = self . subscriptions . read ( ) ;
580+
581+ let compile_timer = metrics. compilation_time . start_timer ( ) ;
582+
583+ let guard = {
584+ // How contended is the lock?
585+ let _wait_guard = metrics. lock_waiters . inc_scope ( ) ;
586+ let _wait_timer = metrics. lock_wait_time . start_timer ( ) ;
587+ self . subscriptions . read ( )
588+ } ;
544589
545590 if subscribe_to_all_tables {
546591 plans. extend (
@@ -550,6 +595,8 @@ impl ModuleSubscriptions {
550595 ) ;
551596 }
552597
598+ let mut new_queries = 0 ;
599+
553600 for ( sql, hash, hash_with_param) in query_hashes {
554601 if let Some ( unit) = guard. query ( & hash) {
555602 plans. push ( unit) ;
@@ -564,10 +611,14 @@ impl ModuleSubscriptions {
564611 }
565612 } ) ?,
566613 ) ) ;
614+ new_queries += 1 ;
567615 }
568616 }
569617
570- Ok ( ( plans, auth, scopeguard:: ScopeGuard :: into_inner ( tx) ) )
618+ // How many queries in this subscription are not cached?
619+ metrics. num_new_queries_subscribed . inc_by ( new_queries) ;
620+
621+ Ok ( ( plans, auth, scopeguard:: ScopeGuard :: into_inner ( tx) , compile_timer) )
571622 }
572623
573624 /// Send a message to a client connection.
@@ -607,8 +658,19 @@ impl ModuleSubscriptions {
607658 } ;
608659
609660 let num_queries = request. query_strings . len ( ) ;
610- let ( queries, auth, tx) = return_on_err ! (
611- self . compile_queries( sender. id. identity, request. query_strings, num_queries) ,
661+
662+ let subscription_metrics = SubscriptionMetrics :: new ( & self . owner_identity , & WorkloadType :: Subscribe ) ;
663+
664+ // How many queries make up this subscription?
665+ subscription_metrics. num_queries_subscribed . inc_by ( num_queries as _ ) ;
666+
667+ let ( queries, auth, tx, compile_timer) = return_on_err ! (
668+ self . compile_queries(
669+ sender. id. identity,
670+ request. query_strings,
671+ num_queries,
672+ & subscription_metrics
673+ ) ,
612674 send_err_msg,
613675 None
614676 ) ;
@@ -622,21 +684,41 @@ impl ModuleSubscriptions {
622684 // an `commit_and_broadcast_event` grabs a read lock on `subscriptions` while it still has a
623685 // write lock on the db.
624686 let queries = {
625- let mut subscriptions = self . subscriptions . write ( ) ;
687+ let mut subscriptions = {
688+ // How contended is the lock?
689+ let _wait_guard = subscription_metrics. lock_waiters . inc_scope ( ) ;
690+ let _wait_timer = subscription_metrics. lock_wait_time . start_timer ( ) ;
691+ self . subscriptions . write ( )
692+ } ;
626693
627694 subscriptions. add_subscription_multi ( sender. clone ( ) , queries, request. query_id ) ?
628695 } ;
629696
697+ // Record how long it took to compile the subscription
698+ drop ( compile_timer) ;
699+
630700 let Ok ( ( update, metrics) ) =
631701 self . evaluate_queries ( sender. clone ( ) , & queries, & tx, & auth, TableUpdateType :: Subscribe )
632702 else {
633703 // If we fail the query, we need to remove the subscription.
634- let mut subscriptions = self . subscriptions . write ( ) ;
635- subscriptions. remove_subscription ( ( sender. id . identity , sender. id . connection_id ) , request. query_id ) ?;
704+ let mut subscriptions = {
705+ // How contended is the lock?
706+ let _wait_guard = subscription_metrics. lock_waiters . inc_scope ( ) ;
707+ let _wait_timer = subscription_metrics. lock_wait_time . start_timer ( ) ;
708+ self . subscriptions . write ( )
709+ } ;
710+ {
711+ let _compile_timer = subscription_metrics. compilation_time . start_timer ( ) ;
712+ subscriptions. remove_subscription ( ( sender. id . identity , sender. id . connection_id ) , request. query_id ) ?;
713+ }
714+
636715 send_err_msg ( "Internal error evaluating queries" . into ( ) ) ;
637716 return Ok ( None ) ;
638717 } ;
639718
719+ // How many queries did we actually evaluate?
720+ subscription_metrics. num_queries_evaluated . inc_by ( queries. len ( ) as _ ) ;
721+
640722 #[ cfg( test) ]
641723 if let Some ( assert) = _assert {
642724 assert ( & tx) ;
@@ -674,7 +756,17 @@ impl ModuleSubscriptions {
674756 _assert : Option < AssertTxFn > ,
675757 ) -> Result < ExecutionMetrics , DBError > {
676758 let num_queries = subscription. query_strings . len ( ) ;
677- let ( queries, auth, tx) = self . compile_queries ( sender. id . identity , subscription. query_strings , num_queries) ?;
759+ let subscription_metrics = SubscriptionMetrics :: new ( & self . owner_identity , & WorkloadType :: Subscribe ) ;
760+
761+ // How many queries make up this subscription?
762+ subscription_metrics. num_queries_subscribed . inc_by ( num_queries as _ ) ;
763+
764+ let ( queries, auth, tx, compile_timer) = self . compile_queries (
765+ sender. id . identity ,
766+ subscription. query_strings ,
767+ num_queries,
768+ & subscription_metrics,
769+ ) ?;
678770 let tx = scopeguard:: guard ( tx, |tx| {
679771 let ( tx_metrics, reducer) = self . relational_db . release_tx ( tx) ;
680772 self . relational_db . report ( & reducer, & tx_metrics, None ) ;
@@ -692,6 +784,9 @@ impl ModuleSubscriptions {
692784 & auth,
693785 ) ?;
694786
787+ // Record how long it took to compile the subscription
788+ drop ( compile_timer) ;
789+
695790 let tx = DeltaTx :: from ( & * tx) ;
696791 let ( database_update, metrics) = match sender. config . protocol {
697792 Protocol :: Binary => execute_plans ( & queries, & tx, TableUpdateType :: Subscribe )
@@ -703,8 +798,18 @@ impl ModuleSubscriptions {
703798 // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
704799 // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
705800 // but that should not pose an issue.
706- let mut subscriptions = self . subscriptions . write ( ) ;
707- subscriptions. set_legacy_subscription ( sender. clone ( ) , queries. into_iter ( ) ) ;
801+ {
802+ let _compile_timer = subscription_metrics. compilation_time . start_timer ( ) ;
803+
804+ let mut subscriptions = {
805+ // How contended is the lock?
806+ let _wait_guard = subscription_metrics. lock_waiters . inc_scope ( ) ;
807+ let _wait_timer = subscription_metrics. lock_wait_time . start_timer ( ) ;
808+ self . subscriptions . write ( )
809+ } ;
810+
811+ subscriptions. set_legacy_subscription ( sender. clone ( ) , queries. into_iter ( ) ) ;
812+ }
708813
709814 #[ cfg( test) ]
710815 if let Some ( assert) = _assert {
@@ -745,9 +850,17 @@ impl ModuleSubscriptions {
745850 mut event : ModuleEvent ,
746851 tx : MutTx ,
747852 ) -> Result < Result < ( Arc < ModuleEvent > , ExecutionMetrics ) , WriteConflict > , DBError > {
853+ let subscription_metrics = SubscriptionMetrics :: new ( & self . owner_identity , & WorkloadType :: Update ) ;
854+
748855 // Take a read lock on `subscriptions` before committing tx
749856 // else it can result in subscriber receiving duplicate updates.
750- let subscriptions = self . subscriptions . read ( ) ;
857+ let subscriptions = {
858+ // How contended is the lock?
859+ let _wait_guard = subscription_metrics. lock_waiters . inc_scope ( ) ;
860+ let _wait_timer = subscription_metrics. lock_wait_time . start_timer ( ) ;
861+ self . subscriptions . read ( )
862+ } ;
863+
751864 let stdb = & self . relational_db ;
752865 // Downgrade mutable tx.
753866 // We'll later ensure tx is released/cleaned up once out of scope.
0 commit comments