@@ -18,6 +18,7 @@ use super::datastore::{
1818} ;
1919use super :: db_metrics:: DB_METRICS ;
2020use crate :: db:: datastore:: system_tables:: StModuleRow ;
21+ use crate :: db:: MetricsRecorderQueue ;
2122use crate :: error:: { DBError , DatabaseError , RestoreSnapshotError } ;
2223use crate :: execution_context:: { ReducerContext , Workload , WorkloadType } ;
2324use crate :: messages:: control_db:: HostType ;
@@ -110,14 +111,17 @@ pub struct RelationalDB {
110111 /// `Some` if `durability` is `Some`, `None` otherwise.
111112 disk_size_fn : Option < DiskSizeFn > ,
112113
114+ /// A map from workload types to their cached prometheus counters.
115+ workload_type_to_exec_counters : Arc < EnumMap < WorkloadType , ExecutionCounters > > ,
116+
117+ /// An async queue for recording transaction metrics off the main thread
118+ metrics_recorder_queue : Option < MetricsRecorderQueue > ,
119+
113120 // DO NOT ADD FIELDS AFTER THIS.
114121 // By default, fields are dropped in declaration order.
115122 // We want to release the file lock last.
116123 // TODO(noa): is this lockfile still necessary now that we have data-dir?
117124 _lock : LockFile ,
118-
119- /// A map from workload types to their cached prometheus counters.
120- workload_type_to_exec_counters : Arc < EnumMap < WorkloadType , ExecutionCounters > > ,
121125}
122126
123127#[ derive( Clone ) ]
@@ -231,6 +235,7 @@ impl RelationalDB {
231235 inner : Locking ,
232236 durability : Option < ( Arc < Durability > , DiskSizeFn ) > ,
233237 snapshot_repo : Option < Arc < SnapshotRepository > > ,
238+ metrics_recorder_queue : Option < MetricsRecorderQueue > ,
234239 ) -> Self {
235240 let ( durability, disk_size_fn) = durability. unzip ( ) ;
236241 let snapshot_worker =
@@ -249,8 +254,10 @@ impl RelationalDB {
249254 row_count_fn : default_row_count_fn ( database_identity) ,
250255 disk_size_fn,
251256
252- _lock : lock,
253257 workload_type_to_exec_counters,
258+ metrics_recorder_queue,
259+
260+ _lock : lock,
254261 }
255262 }
256263
@@ -324,6 +331,10 @@ impl RelationalDB {
324331 /// If restoring from an existing database, the `snapshot_repo` must
325332 /// store views of the same sequence of TXes as the `history`.
326333 ///
334+ /// - `metrics_recorder_queue`
335+ ///
336+ /// The send side of a queue for recording transaction metrics.
337+ ///
327338 /// # Return values
328339 ///
329340 /// Alongside `Self`, [`ConnectedClients`] is returned, which is the set of
@@ -333,13 +344,15 @@ impl RelationalDB {
333344 /// gracefully. The caller is responsible for disconnecting the clients.
334345 ///
335346 /// [ModuleHost]: crate::host::module_host::ModuleHost
347+ #[ allow( clippy:: too_many_arguments) ]
336348 pub fn open (
337349 root : & ReplicaDir ,
338350 database_identity : Identity ,
339351 owner_identity : Identity ,
340352 history : impl durability:: History < TxData = Txdata > ,
341353 durability : Option < ( Arc < Durability > , DiskSizeFn ) > ,
342354 snapshot_repo : Option < Arc < SnapshotRepository > > ,
355+ metrics_recorder_queue : Option < MetricsRecorderQueue > ,
343356 page_pool : PagePool ,
344357 ) -> Result < ( Self , ConnectedClients ) , DBError > {
345358 log:: trace!( "[{}] DATABASE: OPEN" , database_identity) ;
@@ -373,6 +386,7 @@ impl RelationalDB {
373386 inner,
374387 durability,
375388 snapshot_repo,
389+ metrics_recorder_queue,
376390 ) ;
377391
378392 if let Some ( meta) = db. metadata ( ) ? {
@@ -749,6 +763,11 @@ impl RelationalDB {
749763 Ok ( AlgebraicValue :: decode ( col_ty, & mut & * bytes) ?)
750764 }
751765
766+ /// Returns the execution counters for this database.
767+ pub fn exec_counter_map ( & self ) -> Arc < EnumMap < WorkloadType , ExecutionCounters > > {
768+ self . workload_type_to_exec_counters . clone ( )
769+ }
770+
752771 /// Returns the execution counters for `workload_type` for this database.
753772 pub fn exec_counters_for ( & self , workload_type : WorkloadType ) -> & ExecutionCounters {
754773 & self . workload_type_to_exec_counters [ workload_type]
@@ -988,7 +1007,7 @@ impl RelationalDB {
9881007 let mut tx = self . begin_tx ( workload) ;
9891008 let res = f ( & mut tx) ;
9901009 let ( tx_metrics, reducer) = self . release_tx ( tx) ;
991- self . report_tx_metricses ( & reducer, None , None , & tx_metrics) ;
1010+ self . report_read_tx_metrics ( reducer, tx_metrics) ;
9921011 res
9931012 }
9941013
@@ -999,11 +1018,11 @@ impl RelationalDB {
9991018 {
10001019 if res. is_err ( ) {
10011020 let ( tx_metrics, reducer) = self . rollback_mut_tx ( tx) ;
1002- self . report ( & reducer, & tx_metrics, None ) ;
1021+ self . report_mut_tx_metrics ( reducer, tx_metrics, None ) ;
10031022 } else {
10041023 match self . commit_tx ( tx) . map_err ( E :: from) ? {
10051024 Some ( ( tx_data, tx_metrics, reducer) ) => {
1006- self . report ( & reducer, & tx_metrics, Some ( & tx_data) ) ;
1025+ self . report_mut_tx_metrics ( reducer, tx_metrics, Some ( tx_data) ) ;
10071026 }
10081027 None => panic ! ( "TODO: retry?" ) ,
10091028 }
@@ -1018,7 +1037,7 @@ impl RelationalDB {
10181037 match res {
10191038 Err ( e) => {
10201039 let ( tx_metrics, reducer) = self . rollback_mut_tx ( tx) ;
1021- self . report ( & reducer, & tx_metrics, None ) ;
1040+ self . report_mut_tx_metrics ( reducer, tx_metrics, None ) ;
10221041
10231042 Err ( e)
10241043 }
@@ -1042,17 +1061,22 @@ impl RelationalDB {
10421061 /// Reports the `TxMetrics`s passed.
10431062 ///
10441063 /// Should only be called after the tx lock has been fully released.
1045- pub ( crate ) fn report_tx_metricses (
1064+ pub ( crate ) fn report_tx_metrics (
10461065 & self ,
1047- reducer : & str ,
1048- tx_data : Option < & TxData > ,
1049- metrics_mut : Option < & TxMetrics > ,
1050- metrics_read : & TxMetrics ,
1066+ reducer : String ,
1067+ tx_data : Option < Arc < TxData > > ,
1068+ metrics_for_writer : Option < TxMetrics > ,
1069+ metrics_for_reader : Option < TxMetrics > ,
10511070 ) {
1052- if let Some ( metrics_mut) = metrics_mut {
1053- self . report ( reducer, metrics_mut, tx_data) ;
1071+ if let Some ( recorder) = & self . metrics_recorder_queue {
1072+ recorder. send_metrics (
1073+ reducer,
1074+ metrics_for_writer,
1075+ metrics_for_reader,
1076+ tx_data,
1077+ self . exec_counter_map ( ) ,
1078+ ) ;
10541079 }
1055- self . report ( reducer, metrics_read, None ) ;
10561080 }
10571081}
10581082
@@ -1403,8 +1427,13 @@ impl RelationalDB {
14031427 }
14041428
14051429 /// Reports the metrics for `reducer`, using counters provided by `db`.
1406- pub fn report ( & self , reducer : & str , metrics : & TxMetrics , tx_data : Option < & TxData > ) {
1407- metrics. report ( tx_data, reducer, |wl : WorkloadType | self . exec_counters_for ( wl) ) ;
1430+ pub fn report_mut_tx_metrics ( & self , reducer : String , metrics : TxMetrics , tx_data : Option < TxData > ) {
1431+ self . report_tx_metrics ( reducer, tx_data. map ( Arc :: new) , Some ( metrics) , None ) ;
1432+ }
1433+
1434+ /// Reports subscription metrics for `reducer`, using counters provided by `db`.
1435+ pub fn report_read_tx_metrics ( & self , reducer : String , metrics : TxMetrics ) {
1436+ self . report_tx_metrics ( reducer, None , None , Some ( metrics) ) ;
14081437 }
14091438
14101439 /// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var`
@@ -1779,7 +1808,7 @@ pub mod tests_utils {
17791808 expected_num_clients : usize ,
17801809 ) -> Result < Self , DBError > {
17811810 let dir = TempReplicaDir :: new ( ) ?;
1782- let db = Self :: open_db ( & dir, history, None , None , expected_num_clients) ?;
1811+ let db = Self :: open_db ( & dir, history, None , None , None , expected_num_clients) ?;
17831812 Ok ( Self {
17841813 db,
17851814 durable : None ,
@@ -1870,7 +1899,7 @@ pub mod tests_utils {
18701899 }
18711900
18721901 fn in_memory_internal ( root : & ReplicaDir ) -> Result < RelationalDB , DBError > {
1873- Self :: open_db ( root, EmptyHistory :: new ( ) , None , None , 0 )
1902+ Self :: open_db ( root, EmptyHistory :: new ( ) , None , None , None , 0 )
18741903 }
18751904
18761905 fn durable_internal (
@@ -1884,7 +1913,7 @@ pub mod tests_utils {
18841913 let snapshot_repo = want_snapshot_repo
18851914 . then ( || open_snapshot_repo ( root. snapshots ( ) , Identity :: ZERO , 0 ) )
18861915 . transpose ( ) ?;
1887- let db = Self :: open_db ( root, history, Some ( ( durability, disk_size_fn) ) , snapshot_repo, 0 ) ?;
1916+ let db = Self :: open_db ( root, history, Some ( ( durability, disk_size_fn) ) , snapshot_repo, None , 0 ) ?;
18881917
18891918 Ok ( ( db, local) )
18901919 }
@@ -1894,6 +1923,7 @@ pub mod tests_utils {
18941923 history : impl durability:: History < TxData = Txdata > ,
18951924 durability : Option < ( Arc < Durability > , DiskSizeFn ) > ,
18961925 snapshot_repo : Option < Arc < SnapshotRepository > > ,
1926+ metrics_recorder_queue : Option < MetricsRecorderQueue > ,
18971927 expected_num_clients : usize ,
18981928 ) -> Result < RelationalDB , DBError > {
18991929 let ( db, connected_clients) = RelationalDB :: open (
@@ -1903,6 +1933,7 @@ pub mod tests_utils {
19031933 history,
19041934 durability,
19051935 snapshot_repo,
1936+ metrics_recorder_queue,
19061937 PagePool :: new_for_test ( ) ,
19071938 ) ?;
19081939 assert_eq ! ( connected_clients. len( ) , expected_num_clients) ;
@@ -2151,6 +2182,7 @@ mod tests {
21512182 EmptyHistory :: new ( ) ,
21522183 None ,
21532184 None ,
2185+ None ,
21542186 PagePool :: new_for_test ( ) ,
21552187 ) {
21562188 Ok ( _) => {
0 commit comments