Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 49 additions & 6 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use crate::replica_context::ReplicaContext;
use crate::subscription::module_subscription_actor::{commit_and_broadcast_event, ModuleSubscriptions};
use crate::subscription::module_subscription_manager::{from_tx_offset, TransactionOffset};
use crate::util::asyncify;
use crate::util::prometheus_handle::IntGaugeExt;
use chrono::{DateTime, Utc};
use core::mem;
use parking_lot::{Mutex, MutexGuard};
use smallvec::SmallVec;
use spacetimedb_client_api_messages::energy::EnergyQuanta;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId};
Expand Down Expand Up @@ -714,7 +716,20 @@ impl InstanceEnv {
return Err(NodesError::WouldBlockTransaction(super::AbiCall::ProcedureHttpRequest));
}

// TODO(procedure-metrics): record size in bytes of request.
// Record in metrics that we're starting an HTTP request.
DB_METRICS
.procedure_num_http_requests
.with_label_values(self.database_identity())
.inc();
DB_METRICS
.procedure_http_request_size_bytes
.with_label_values(self.database_identity())
.inc_by((request.size_in_bytes() + body.len()) as _);
// Make a guard for the `in_progress` metric that will be decremented on exit.
let _in_progress_metric = DB_METRICS
.procedure_num_in_progress_http_requests
.with_label_values(self.database_identity())
.inc_scope();

fn http_error<E: ToString>(err: E) -> NodesError {
NodesError::HttpError(err.to_string())
Expand Down Expand Up @@ -744,21 +759,49 @@ impl InstanceEnv {
// TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call.
let execute_fut = reqwest::Client::new().execute(reqwest);

Ok(async move {
let response = execute_fut.await.map_err(http_error)?;
let response_fut = async {
let response = execute_fut.await?;

// Download the response body, which in all likelihood will be a stream,
// as reqwest seems to prefer that.
let (response, body) = http::Response::from(response).into_parts();
let body = http_body_util::BodyExt::collect(body)

let body = http_body_util::BodyExt::collect(body).await?.to_bytes();

Ok((response, body))
};

let database_identity = *self.database_identity();

Ok(async move {
let (response, body) = response_fut
.await
.map_err(http_error)?
.to_bytes();
.inspect_err(|err: &reqwest::Error| {
// Report the request's failure in our metrics as either a timeout or a misc. failure, as appropriate.
if err.is_timeout() {
DB_METRICS
.procedure_num_timeout_http_requests
.with_label_values(&database_identity)
.inc();
} else {
DB_METRICS
.procedure_num_failed_http_requests
.with_label_values(&database_identity)
.inc();
}
})
.map_err(http_error)?;

// Transform the `http::Response` into our `spacetimedb_lib::http::Response` type,
// which has a stable BSATN encoding to pass across the WASM boundary.
let response = convert_http_response(response);

// Record the response size in bytes.
DB_METRICS
.procedure_http_response_size_bytes
.with_label_values(&database_identity)
.inc_by((response.size_in_bytes() + body.len()) as _);

Ok((response, body))
})
}
Expand Down
55 changes: 55 additions & 0 deletions crates/datastore/src/db_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,61 @@ metrics_group!(
#[help = "How many queries are evaluated in each subscribe and unsubscribe"]
#[labels(db: Identity, workload: WorkloadType)]
pub num_queries_evaluated: IntCounterVec,

#[name = spacetime_procedure_http_request_size_bytes]
#[help = "Size in bytes of HTTP requests performed by procedures running in databases.

An individual HTTP request's size in bytes is the sum of the sizes of the URI, header names, header values and body."]
#[labels(db: Identity)]
pub procedure_http_request_size_bytes: IntCounterVec,

#[name = spacetime_procedure_http_response_size_bytes]
#[help = "Size in bytes of HTTP responses to requests performed by procedures running in databases.

An individual HTTP response's size in bytes is the sum of the sizes of the header names, header values and body."]
#[labels(db: Identity)]
pub procedure_http_response_size_bytes: IntCounterVec,

#[name = spacetime_procedure_num_http_requests]
#[help = "Number of HTTP requests performed by procedures running in databases.

Should be the sum of `spacetime_procedure_num_successful_http_requests`,
`spacetime_procedure_num_failed_http_requests`, `spacetime_procedure_num_timeout_http_requests`
and `spacetime_procedure_num_in_progress_http_requests`."]
#[labels(db: Identity)]
pub procedure_num_http_requests: IntCounterVec,

#[name = spacetime_procedure_num_successful_http_requests]
#[help = "Number of HTTP requests performed by procedures which terminate successfully, returning a response.

Each HTTP request performed by a database will be counted either here, in `spacetime_procedure_num_failed_http_requests`,
`spacetime_procedure_num_timeout_http_requests` or in `spacetime_procedure_num_in_progress_http_requests`."]
#[labels(db: Identity)]
pub procedure_num_successful_http_requests: IntCounterVec,

#[name = spacetime_procedure_num_failed_http_requests]
#[help = "Number of HTTP requests performed by procedures which fail for reasons other than a timeout.

Each HTTP request performed by a database will be counted either here, in `spacetime_procedure_num_successful_http_requests`,
`spacetime_procedure_num_timeout_http_requests` or in `spacetime_procedure_num_in_progress_http_requests`."]
#[labels(db: Identity)]
pub procedure_num_failed_http_requests: IntCounterVec,

#[name = spacetime_procedure_num_timeout_http_requests]
#[help = "Number of HTTP requests performed by procedures which fail due to a timeout.

Each HTTP request performed by a database will be counted either here, in `spacetime_procedure_num_successful_http_requests`,
`spacetime_procedure_num_failed_http_requests`, or in `spacetime_procedure_num_in_progress_http_requests`."]
#[labels(db: Identity)]
pub procedure_num_timeout_http_requests: IntCounterVec,

#[name = spacetime_procedure_num_in_progress_http_requests]
#[help = "Number of HTTP requests currently in progress within procedures.

Each HTTP request performed by a database will be counted either here, in `spacetime_procedure_num_successful_http_requests`,
`spacetime_procedure_num_failed_http_requests`, or in `spacetime_procedure_num_timeout_http_requests`."]
#[labels(db: Identity)]
pub procedure_num_in_progress_http_requests: IntGaugeVec,
}
);

Expand Down
34 changes: 34 additions & 0 deletions crates/lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ pub struct Request {
pub version: Version,
}

impl Request {
/// Return the size of this request's URI and [`Headers`]
/// for purposes of metrics reporting.
///
/// Ignores the size of the [`Method`] and [`Version`] as they are effectively constant.
///
/// As the body is stored externally to the `Request`, metrics reporting must count its size separately.
pub fn size_in_bytes(&self) -> usize {
self.uri.len() + self.headers.size_in_bytes()
}
}

/// Represents an HTTP method.
#[derive(Clone, SpacetimeType, PartialEq, Eq)]
#[sats(crate = crate, name = "HttpMethod")]
Expand Down Expand Up @@ -112,6 +124,17 @@ impl Headers {
pub fn into_iter(self) -> impl Iterator<Item = (Box<str>, Box<[u8]>)> {
IntoIterator::into_iter(self.entries).map(|HttpHeaderPair { name, value }| (name, value))
}

/// The sum of the lengths of all the header names and header values.
///
/// For headers with multiple values for the same header name,
/// the length of the header name is counted once for each occurence.
fn size_in_bytes(&self) -> usize {
self.entries
.iter()
.map(|HttpHeaderPair { name, value }| name.len() + value.len())
.sum::<usize>()
}
}

#[derive(Clone, SpacetimeType)]
Expand All @@ -131,3 +154,14 @@ pub struct Response {
/// A valid HTTP response status code, sourced from an already-validated `http::StatusCode`.
pub code: u16,
}

impl Response {
/// Return the size of this request's [`Headers`] for purposes of metrics reporting.
///
/// Ignores the size of the `code` and [`Version`] as they are effectively constant.
///
/// As the body is stored externally to the `Response`, metrics reporting must count its size separately.
pub fn size_in_bytes(&self) -> usize {
self.headers.size_in_bytes()
}
}
Loading