@@ -8,11 +8,13 @@ use crate::replica_context::ReplicaContext;
88use crate :: subscription:: module_subscription_actor:: { commit_and_broadcast_event, ModuleSubscriptions } ;
99use crate :: subscription:: module_subscription_manager:: { from_tx_offset, TransactionOffset } ;
1010use crate :: util:: asyncify;
11+ use crate :: util:: prometheus_handle:: IntGaugeExt ;
1112use chrono:: { DateTime , Utc } ;
1213use core:: mem;
1314use parking_lot:: { Mutex , MutexGuard } ;
1415use smallvec:: SmallVec ;
1516use spacetimedb_client_api_messages:: energy:: EnergyQuanta ;
17+ use spacetimedb_datastore:: db_metrics:: DB_METRICS ;
1618use spacetimedb_datastore:: execution_context:: Workload ;
1719use spacetimedb_datastore:: locking_tx_datastore:: state_view:: StateView ;
1820use spacetimedb_datastore:: locking_tx_datastore:: { FuncCallType , MutTxId } ;
@@ -722,7 +724,20 @@ impl InstanceEnv {
722724 return Err ( NodesError :: WouldBlockTransaction ( super :: AbiCall :: ProcedureHttpRequest ) ) ;
723725 }
724726
725- // TODO(procedure-metrics): record size in bytes of request.
727+ // Record in metrics that we're starting an HTTP request.
728+ DB_METRICS
729+ . procedure_num_http_requests
730+ . with_label_values ( self . database_identity ( ) )
731+ . inc ( ) ;
732+ DB_METRICS
733+ . procedure_http_request_size_bytes
734+ . with_label_values ( self . database_identity ( ) )
735+ . inc_by ( ( request. size_in_bytes ( ) + body. len ( ) ) as _ ) ;
736+ // Make a guard for the `in_progress` metric that will be decremented on exit.
737+ let _in_progress_metric = DB_METRICS
738+ . procedure_num_in_progress_http_requests
739+ . with_label_values ( self . database_identity ( ) )
740+ . inc_scope ( ) ;
726741
727742 fn http_error < E : ToString > ( err : E ) -> NodesError {
728743 NodesError :: HttpError ( err. to_string ( ) )
@@ -752,21 +767,49 @@ impl InstanceEnv {
752767 // TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call.
753768 let execute_fut = reqwest:: Client :: new ( ) . execute ( reqwest) ;
754769
755- Ok ( async move {
756- let response = execute_fut. await . map_err ( http_error ) ?;
770+ let response_fut = async {
771+ let response = execute_fut. await ?;
757772
758773 // Download the response body, which in all likelihood will be a stream,
759774 // as reqwest seems to prefer that.
760775 let ( response, body) = http:: Response :: from ( response) . into_parts ( ) ;
761- let body = http_body_util:: BodyExt :: collect ( body)
776+
777+ let body = http_body_util:: BodyExt :: collect ( body) . await ?. to_bytes ( ) ;
778+
779+ Ok ( ( response, body) )
780+ } ;
781+
782+ let database_identity = * self . database_identity ( ) ;
783+
784+ Ok ( async move {
785+ let ( response, body) = response_fut
762786 . await
763- . map_err ( http_error) ?
764- . to_bytes ( ) ;
787+ . inspect_err ( |err : & reqwest:: Error | {
788+ // Report the request's failure in our metrics as either a timeout or a misc. failure, as appropriate.
789+ if err. is_timeout ( ) {
790+ DB_METRICS
791+ . procedure_num_timeout_http_requests
792+ . with_label_values ( & database_identity)
793+ . inc ( ) ;
794+ } else {
795+ DB_METRICS
796+ . procedure_num_failed_http_requests
797+ . with_label_values ( & database_identity)
798+ . inc ( ) ;
799+ }
800+ } )
801+ . map_err ( http_error) ?;
765802
766803 // Transform the `http::Response` into our `spacetimedb_lib::http::Response` type,
767804 // which has a stable BSATN encoding to pass across the WASM boundary.
768805 let response = convert_http_response ( response) ;
769806
807+ // Record the response size in bytes.
808+ DB_METRICS
809+ . procedure_http_response_size_bytes
810+ . with_label_values ( & database_identity)
811+ . inc_by ( ( response. size_in_bytes ( ) + body. len ( ) ) as _ ) ;
812+
770813 Ok ( ( response, body) )
771814 } )
772815 }
0 commit comments