@@ -15,15 +15,16 @@ use crate::db::db_metrics::DB_METRICS;
1515use crate :: db:: relational_db:: { MutTx , RelationalDB , Tx } ;
1616use crate :: error:: DBError ;
1717use crate :: estimation:: estimate_rows_scanned;
18- use crate :: execution_context:: Workload ;
18+ use crate :: execution_context:: { Workload , WorkloadType } ;
1919use crate :: host:: module_host:: { DatabaseUpdate , EventStatus , ModuleEvent } ;
2020use crate :: messages:: websocket:: Subscribe ;
2121use crate :: subscription:: execute_plans;
2222use crate :: subscription:: query:: is_subscribe_to_all_tables;
23+ use crate :: util:: prometheus_handle:: IntGaugeExt ;
2324use crate :: vm:: check_row_limit;
2425use crate :: worker_metrics:: WORKER_METRICS ;
2526use parking_lot:: RwLock ;
26- use prometheus:: IntGauge ;
27+ use prometheus:: { Histogram , HistogramTimer , IntCounter , IntGauge } ;
2728use spacetimedb_client_api_messages:: websocket:: {
2829 self as ws, BsatnFormat , FormatSwitch , JsonFormat , SubscribeMulti , SubscribeSingle , TableUpdate , Unsubscribe ,
2930 UnsubscribeMulti ,
@@ -100,6 +101,26 @@ impl SubscriptionGauges {
100101 }
101102}
102103
104+ pub struct SubscriptionMetrics {
105+ pub lock_waiters : IntGauge ,
106+ pub lock_wait_time : Histogram ,
107+ pub compilation_time : Histogram ,
108+ pub num_queries_subscribed : IntCounter ,
109+ pub num_queries_evaluated : IntCounter ,
110+ }
111+
112+ impl SubscriptionMetrics {
113+ pub fn new ( db : & Identity , workload : & WorkloadType ) -> Self {
114+ Self {
115+ lock_waiters : DB_METRICS . subscription_lock_waiters . with_label_values ( db) ,
116+ lock_wait_time : DB_METRICS . subscription_lock_wait_time . with_label_values ( db) ,
117+ compilation_time : DB_METRICS . subscription_compile_time . with_label_values ( db, workload) ,
118+ num_queries_subscribed : DB_METRICS . num_queries_subscribed . with_label_values ( db) ,
119+ num_queries_evaluated : DB_METRICS . num_queries_evaluated . with_label_values ( db, workload) ,
120+ }
121+ }
122+ }
123+
103124type AssertTxFn = Arc < dyn Fn ( & Tx ) > ;
104125type SubscriptionUpdate = FormatSwitch < TableUpdate < BsatnFormat > , TableUpdate < JsonFormat > > ;
105126type FullSubscriptionUpdate = FormatSwitch < ws:: DatabaseUpdate < BsatnFormat > , ws:: DatabaseUpdate < JsonFormat > > ;
@@ -453,14 +474,21 @@ impl ModuleSubscriptions {
453474 )
454475 } ;
455476
477+ let subscription_metrics = SubscriptionMetrics :: new ( & self . owner_identity , & WorkloadType :: Unsubscribe ) ;
478+
456479 // Always lock the db before the subscription lock to avoid deadlocks.
457480 let tx = scopeguard:: guard ( self . relational_db . begin_tx ( Workload :: Unsubscribe ) , |tx| {
458481 let ( tx_metrics, reducer) = self . relational_db . release_tx ( tx) ;
459482 self . relational_db . report ( & reducer, & tx_metrics, None ) ;
460483 } ) ;
461484
462485 let removed_queries = {
463- let mut subscriptions = self . subscriptions . write ( ) ;
486+ let mut subscriptions = {
487+ // How contended is the lock?
488+ let _wait_guard = subscription_metrics. lock_waiters . inc_scope ( ) ;
489+ let _wait_timer = subscription_metrics. lock_wait_time . start_timer ( ) ;
490+ self . subscriptions . write ( )
491+ } ;
464492
465493 return_on_err ! (
466494 subscriptions. remove_subscription( ( sender. id. identity, sender. id. connection_id) , request. query_id) ,
@@ -481,6 +509,10 @@ impl ModuleSubscriptions {
481509 None
482510 ) ;
483511
512+ subscription_metrics
513+ . num_queries_evaluated
514+ . inc_by ( removed_queries. len ( ) as _ ) ;
515+
484516 // Note: to make sure transaction updates are consistent, we need to put this in the broadcast
485517 // queue while we are still holding a read-lock on the database.
486518
@@ -513,12 +545,14 @@ impl ModuleSubscriptions {
513545 ///
514546 /// Instead we generate two hashes and outside of the tx lock.
515547 /// If either one is currently tracked, we can avoid recompilation.
548+ #[ allow( clippy:: type_complexity) ]
516549 fn compile_queries (
517550 & self ,
518551 sender : Identity ,
519552 queries : impl IntoIterator < Item = Box < str > > ,
520553 num_queries : usize ,
521- ) -> Result < ( Vec < Arc < Plan > > , AuthCtx , TxId ) , DBError > {
554+ metrics : & SubscriptionMetrics ,
555+ ) -> Result < ( Vec < Arc < Plan > > , AuthCtx , TxId , HistogramTimer ) , DBError > {
522556 let mut subscribe_to_all_tables = false ;
523557 let mut plans = Vec :: with_capacity ( num_queries) ;
524558 let mut query_hashes = Vec :: with_capacity ( num_queries) ;
@@ -540,7 +574,14 @@ impl ModuleSubscriptions {
540574 let ( tx_metrics, reducer) = self . relational_db . release_tx ( tx) ;
541575 self . relational_db . report ( & reducer, & tx_metrics, None ) ;
542576 } ) ;
543- let guard = self . subscriptions . read ( ) ;
577+
578+ let compile_timer = metrics. compilation_time . start_timer ( ) ;
579+
580+ let guard = {
581+ let _wait_guard = metrics. lock_waiters . inc_scope ( ) ;
582+ let _wait_timer = metrics. lock_wait_time . start_timer ( ) ;
583+ self . subscriptions . read ( )
584+ } ;
544585
545586 if subscribe_to_all_tables {
546587 plans. extend (
@@ -550,6 +591,8 @@ impl ModuleSubscriptions {
550591 ) ;
551592 }
552593
594+ let mut new_queries = 0 ;
595+
553596 for ( sql, hash, hash_with_param) in query_hashes {
554597 if let Some ( unit) = guard. query ( & hash) {
555598 plans. push ( unit) ;
@@ -564,10 +607,16 @@ impl ModuleSubscriptions {
564607 }
565608 } ) ?,
566609 ) ) ;
610+ new_queries += 1 ;
567611 }
568612 }
569613
570- Ok ( ( plans, auth, scopeguard:: ScopeGuard :: into_inner ( tx) ) )
614+ DB_METRICS
615+ . num_new_queries_subscribed
616+ . with_label_values ( & self . owner_identity )
617+ . inc_by ( new_queries) ;
618+
619+ Ok ( ( plans, auth, scopeguard:: ScopeGuard :: into_inner ( tx) , compile_timer) )
571620 }
572621
573622 /// Send a message to a client connection.
@@ -607,8 +656,19 @@ impl ModuleSubscriptions {
607656 } ;
608657
609658 let num_queries = request. query_strings . len ( ) ;
610- let ( queries, auth, tx) = return_on_err ! (
611- self . compile_queries( sender. id. identity, request. query_strings, num_queries) ,
659+
660+ let subscription_metrics = SubscriptionMetrics :: new ( & self . owner_identity , & WorkloadType :: Subscribe ) ;
661+
662+ // How many queries make up this subscription?
663+ subscription_metrics. num_queries_subscribed . inc_by ( num_queries as _ ) ;
664+
665+ let ( queries, auth, tx, compile_timer) = return_on_err ! (
666+ self . compile_queries(
667+ sender. id. identity,
668+ request. query_strings,
669+ num_queries,
670+ & subscription_metrics
671+ ) ,
612672 send_err_msg,
613673 None
614674 ) ;
@@ -622,21 +682,41 @@ impl ModuleSubscriptions {
622682 // an `commit_and_broadcast_event` grabs a read lock on `subscriptions` while it still has a
623683 // write lock on the db.
624684 let queries = {
625- let mut subscriptions = self . subscriptions . write ( ) ;
685+ let mut subscriptions = {
686+ // How contended is the lock?
687+ let _wait_guard = subscription_metrics. lock_waiters . inc_scope ( ) ;
688+ let _wait_timer = subscription_metrics. lock_wait_time . start_timer ( ) ;
689+ self . subscriptions . write ( )
690+ } ;
626691
627692 subscriptions. add_subscription_multi ( sender. clone ( ) , queries, request. query_id ) ?
628693 } ;
629694
695+ // Record how long it took to compile the subscription
696+ drop ( compile_timer) ;
697+
630698 let Ok ( ( update, metrics) ) =
631699 self . evaluate_queries ( sender. clone ( ) , & queries, & tx, & auth, TableUpdateType :: Subscribe )
632700 else {
633701 // If we fail the query, we need to remove the subscription.
634- let mut subscriptions = self . subscriptions . write ( ) ;
635- subscriptions. remove_subscription ( ( sender. id . identity , sender. id . connection_id ) , request. query_id ) ?;
702+ let mut subscriptions = {
703+ // How contended is the lock?
704+ let _wait_guard = subscription_metrics. lock_waiters . inc_scope ( ) ;
705+ let _wait_timer = subscription_metrics. lock_wait_time . start_timer ( ) ;
706+ self . subscriptions . write ( )
707+ } ;
708+ {
709+ let _compile_timer = subscription_metrics. compilation_time . start_timer ( ) ;
710+ subscriptions. remove_subscription ( ( sender. id . identity , sender. id . connection_id ) , request. query_id ) ?;
711+ }
712+
636713 send_err_msg ( "Internal error evaluating queries" . into ( ) ) ;
637714 return Ok ( None ) ;
638715 } ;
639716
717+ // How many queries did we actually evaluate?
718+ subscription_metrics. num_queries_evaluated . inc_by ( queries. len ( ) as _ ) ;
719+
640720 #[ cfg( test) ]
641721 if let Some ( assert) = _assert {
642722 assert ( & tx) ;
@@ -674,7 +754,17 @@ impl ModuleSubscriptions {
674754 _assert : Option < AssertTxFn > ,
675755 ) -> Result < ExecutionMetrics , DBError > {
676756 let num_queries = subscription. query_strings . len ( ) ;
677- let ( queries, auth, tx) = self . compile_queries ( sender. id . identity , subscription. query_strings , num_queries) ?;
757+ let subscription_metrics = SubscriptionMetrics :: new ( & self . owner_identity , & WorkloadType :: Subscribe ) ;
758+
759+ // How many queries make up this subscription?
760+ subscription_metrics. num_queries_subscribed . inc_by ( num_queries as _ ) ;
761+
762+ let ( queries, auth, tx, compile_timer) = self . compile_queries (
763+ sender. id . identity ,
764+ subscription. query_strings ,
765+ num_queries,
766+ & subscription_metrics,
767+ ) ?;
678768 let tx = scopeguard:: guard ( tx, |tx| {
679769 let ( tx_metrics, reducer) = self . relational_db . release_tx ( tx) ;
680770 self . relational_db . report ( & reducer, & tx_metrics, None ) ;
@@ -692,6 +782,9 @@ impl ModuleSubscriptions {
692782 & auth,
693783 ) ?;
694784
785+ // Record how long it took to compile the subscription
786+ drop ( compile_timer) ;
787+
695788 let tx = DeltaTx :: from ( & * tx) ;
696789 let ( database_update, metrics) = match sender. config . protocol {
697790 Protocol :: Binary => execute_plans ( & queries, & tx, TableUpdateType :: Subscribe )
@@ -703,8 +796,18 @@ impl ModuleSubscriptions {
703796 // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
704797 // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
705798 // but that should not pose an issue.
706- let mut subscriptions = self . subscriptions . write ( ) ;
707- subscriptions. set_legacy_subscription ( sender. clone ( ) , queries. into_iter ( ) ) ;
799+ {
800+ let _compile_timer = subscription_metrics. compilation_time . start_timer ( ) ;
801+
802+ let mut subscriptions = {
803+ // How contended is the lock?
804+ let _wait_guard = subscription_metrics. lock_waiters . inc_scope ( ) ;
805+ let _wait_timer = subscription_metrics. lock_wait_time . start_timer ( ) ;
806+ self . subscriptions . write ( )
807+ } ;
808+
809+ subscriptions. set_legacy_subscription ( sender. clone ( ) , queries. into_iter ( ) ) ;
810+ }
708811
709812 #[ cfg( test) ]
710813 if let Some ( assert) = _assert {
@@ -745,9 +848,16 @@ impl ModuleSubscriptions {
745848 mut event : ModuleEvent ,
746849 tx : MutTx ,
747850 ) -> Result < Result < ( Arc < ModuleEvent > , ExecutionMetrics ) , WriteConflict > , DBError > {
851+ let subscription_metrics = SubscriptionMetrics :: new ( & self . owner_identity , & WorkloadType :: Subscribe ) ;
852+
748853 // Take a read lock on `subscriptions` before committing tx
749854 // else it can result in subscriber receiving duplicate updates.
750- let subscriptions = self . subscriptions . read ( ) ;
855+ let subscriptions = {
856+ let _wait_guard = subscription_metrics. lock_waiters . inc_scope ( ) ;
857+ let _wait_timer = subscription_metrics. lock_wait_time . start_timer ( ) ;
858+ self . subscriptions . read ( )
859+ } ;
860+
751861 let stdb = & self . relational_db ;
752862 // Downgrade mutable tx.
753863 // We'll later ensure tx is released/cleaned up once out of scope.
0 commit comments