Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,36 @@ metrics_group!(
#[help = "Number of subscriptions via the legacy api"]
#[labels(database_identity: Identity)]
pub num_legacy_subscriptions: IntGaugeVec,

#[name = spacetime_subscription_compile_time_sec]
#[help = "How much time (in seconds) do we spend compiling subscriptions"]
#[labels(db: Identity, workload: WorkloadType)]
pub subscription_compile_time: HistogramVec,

#[name = spacetime_subscription_lock_num_waiters]
#[help = "The number of clients waiting to acquire the subscription lock"]
#[labels(db: Identity, workload: WorkloadType)]
pub subscription_lock_waiters: IntGaugeVec,

#[name = spacetime_subscription_lock_wait_time_sec]
#[help = "How much time (in seconds) do we spend waiting to acquire the subscription lock"]
#[labels(db: Identity, workload: WorkloadType)]
pub subscription_lock_wait_time: HistogramVec,

#[name = spacetime_num_queries_subscribed]
#[help = "How many total queries make up each subscribe call"]
#[labels(db: Identity)]
pub num_queries_subscribed: IntCounterVec,

#[name = spacetime_num_new_queries_subscribed]
#[help = "How many new (uncached) queries are make up each subscribe call"]
#[labels(db: Identity)]
pub num_new_queries_subscribed: IntCounterVec,

#[name = spacetime_num_queries_evaluated]
#[help = "How many queries are evaluated in each subscribe and unsubscribe"]
#[labels(db: Identity, workload: WorkloadType)]
pub num_queries_evaluated: IntCounterVec,
}
);

Expand Down
143 changes: 128 additions & 15 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ use crate::db::db_metrics::DB_METRICS;
use crate::db::relational_db::{MutTx, RelationalDB, Tx};
use crate::error::DBError;
use crate::estimation::estimate_rows_scanned;
use crate::execution_context::Workload;
use crate::execution_context::{Workload, WorkloadType};
use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent};
use crate::messages::websocket::Subscribe;
use crate::subscription::execute_plans;
use crate::subscription::query::is_subscribe_to_all_tables;
use crate::util::prometheus_handle::IntGaugeExt;
use crate::vm::check_row_limit;
use crate::worker_metrics::WORKER_METRICS;
use parking_lot::RwLock;
use prometheus::IntGauge;
use prometheus::{Histogram, HistogramTimer, IntCounter, IntGauge};
use spacetimedb_client_api_messages::websocket::{
self as ws, BsatnFormat, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate, Unsubscribe,
UnsubscribeMulti,
Expand Down Expand Up @@ -100,6 +101,28 @@ impl SubscriptionGauges {
}
}

/// Handles to the subscription-related metrics for one database,
/// resolved once so call sites can update them without repeating the label lookup.
pub struct SubscriptionMetrics {
/// Number of clients currently waiting to acquire the subscription lock.
pub lock_waiters: IntGauge,
/// Time (in seconds) spent waiting to acquire the subscription lock.
pub lock_wait_time: Histogram,
/// Time (in seconds) spent compiling subscriptions.
pub compilation_time: Histogram,
/// Total number of queries that make up each subscribe call.
pub num_queries_subscribed: IntCounter,
/// Number of new (uncached) queries that make up each subscribe call.
pub num_new_queries_subscribed: IntCounter,
/// Number of queries evaluated in each subscribe and unsubscribe.
pub num_queries_evaluated: IntCounter,
}

impl SubscriptionMetrics {
    /// Looks up the subscription metric handles in `DB_METRICS` for the given labels.
    ///
    /// The lock metrics, the compilation histogram and the evaluated-queries counter
    /// are labelled with both `db` and `workload`; the two subscribed-queries counters
    /// are labelled with `db` only.
    pub fn new(db: &Identity, workload: &WorkloadType) -> Self {
        // Metrics keyed by (db, workload).
        let lock_waiters = DB_METRICS.subscription_lock_waiters.with_label_values(db, workload);
        let lock_wait_time = DB_METRICS.subscription_lock_wait_time.with_label_values(db, workload);
        let compilation_time = DB_METRICS.subscription_compile_time.with_label_values(db, workload);

        // Metrics keyed by db alone.
        let num_queries_subscribed = DB_METRICS.num_queries_subscribed.with_label_values(db);
        let num_new_queries_subscribed = DB_METRICS.num_new_queries_subscribed.with_label_values(db);

        // Keyed by (db, workload) again; kept last to preserve the original lookup order.
        let num_queries_evaluated = DB_METRICS.num_queries_evaluated.with_label_values(db, workload);

        Self {
            lock_waiters,
            lock_wait_time,
            compilation_time,
            num_queries_subscribed,
            num_new_queries_subscribed,
            num_queries_evaluated,
        }
    }
}

type AssertTxFn = Arc<dyn Fn(&Tx)>;
type SubscriptionUpdate = FormatSwitch<TableUpdate<BsatnFormat>, TableUpdate<JsonFormat>>;
type FullSubscriptionUpdate = FormatSwitch<ws::DatabaseUpdate<BsatnFormat>, ws::DatabaseUpdate<JsonFormat>>;
Expand Down Expand Up @@ -453,14 +476,21 @@ impl ModuleSubscriptions {
)
};

let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Unsubscribe);

// Always lock the db before the subscription lock to avoid deadlocks.
let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| {
let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
self.relational_db.report(&reducer, &tx_metrics, None);
});

let removed_queries = {
let mut subscriptions = self.subscriptions.write();
let mut subscriptions = {
// How contended is the lock?
let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
self.subscriptions.write()
};

return_on_err!(
subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id),
Expand All @@ -481,6 +511,11 @@ impl ModuleSubscriptions {
None
);

// How many queries did we evaluate?
subscription_metrics
.num_queries_evaluated
.inc_by(removed_queries.len() as _);

// Note: to make sure transaction updates are consistent, we need to put this in the broadcast
// queue while we are still holding a read-lock on the database.

Expand Down Expand Up @@ -513,12 +548,14 @@ impl ModuleSubscriptions {
///
/// Instead we generate two hashes outside of the tx lock.
/// If either one is currently tracked, we can avoid recompilation.
#[allow(clippy::type_complexity)]
fn compile_queries(
&self,
sender: Identity,
queries: impl IntoIterator<Item = Box<str>>,
num_queries: usize,
) -> Result<(Vec<Arc<Plan>>, AuthCtx, TxId), DBError> {
metrics: &SubscriptionMetrics,
) -> Result<(Vec<Arc<Plan>>, AuthCtx, TxId, HistogramTimer), DBError> {
let mut subscribe_to_all_tables = false;
let mut plans = Vec::with_capacity(num_queries);
let mut query_hashes = Vec::with_capacity(num_queries);
Expand All @@ -540,7 +577,15 @@ impl ModuleSubscriptions {
let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
self.relational_db.report(&reducer, &tx_metrics, None);
});
let guard = self.subscriptions.read();

let compile_timer = metrics.compilation_time.start_timer();

let guard = {
// How contended is the lock?
let _wait_guard = metrics.lock_waiters.inc_scope();
let _wait_timer = metrics.lock_wait_time.start_timer();
self.subscriptions.read()
};

if subscribe_to_all_tables {
plans.extend(
Expand All @@ -550,6 +595,8 @@ impl ModuleSubscriptions {
);
}

let mut new_queries = 0;

for (sql, hash, hash_with_param) in query_hashes {
if let Some(unit) = guard.query(&hash) {
plans.push(unit);
Expand All @@ -564,10 +611,14 @@ impl ModuleSubscriptions {
}
})?,
));
new_queries += 1;
}
}

Ok((plans, auth, scopeguard::ScopeGuard::into_inner(tx)))
// How many queries in this subscription are not cached?
metrics.num_new_queries_subscribed.inc_by(new_queries);

Ok((plans, auth, scopeguard::ScopeGuard::into_inner(tx), compile_timer))
}

/// Send a message to a client connection.
Expand Down Expand Up @@ -607,8 +658,19 @@ impl ModuleSubscriptions {
};

let num_queries = request.query_strings.len();
let (queries, auth, tx) = return_on_err!(
self.compile_queries(sender.id.identity, request.query_strings, num_queries),

let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Subscribe);

// How many queries make up this subscription?
subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);

let (queries, auth, tx, compile_timer) = return_on_err!(
self.compile_queries(
sender.id.identity,
request.query_strings,
num_queries,
&subscription_metrics
),
send_err_msg,
None
);
Expand All @@ -622,21 +684,41 @@ impl ModuleSubscriptions {
// an `commit_and_broadcast_event` grabs a read lock on `subscriptions` while it still has a
// write lock on the db.
let queries = {
let mut subscriptions = self.subscriptions.write();
let mut subscriptions = {
// How contended is the lock?
let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
self.subscriptions.write()
};

subscriptions.add_subscription_multi(sender.clone(), queries, request.query_id)?
};

// Record how long it took to compile the subscription
drop(compile_timer);

let Ok((update, metrics)) =
self.evaluate_queries(sender.clone(), &queries, &tx, &auth, TableUpdateType::Subscribe)
else {
// If we fail the query, we need to remove the subscription.
let mut subscriptions = self.subscriptions.write();
subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id)?;
let mut subscriptions = {
// How contended is the lock?
let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
self.subscriptions.write()
};
{
let _compile_timer = subscription_metrics.compilation_time.start_timer();
subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id)?;
}

send_err_msg("Internal error evaluating queries".into());
return Ok(None);
};

// How many queries did we actually evaluate?
subscription_metrics.num_queries_evaluated.inc_by(queries.len() as _);

#[cfg(test)]
if let Some(assert) = _assert {
assert(&tx);
Expand Down Expand Up @@ -674,7 +756,17 @@ impl ModuleSubscriptions {
_assert: Option<AssertTxFn>,
) -> Result<ExecutionMetrics, DBError> {
let num_queries = subscription.query_strings.len();
let (queries, auth, tx) = self.compile_queries(sender.id.identity, subscription.query_strings, num_queries)?;
let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Subscribe);

// How many queries make up this subscription?
subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);

let (queries, auth, tx, compile_timer) = self.compile_queries(
sender.id.identity,
subscription.query_strings,
num_queries,
&subscription_metrics,
)?;
let tx = scopeguard::guard(tx, |tx| {
let (tx_metrics, reducer) = self.relational_db.release_tx(tx);
self.relational_db.report(&reducer, &tx_metrics, None);
Expand All @@ -692,6 +784,9 @@ impl ModuleSubscriptions {
&auth,
)?;

// Record how long it took to compile the subscription
drop(compile_timer);

let tx = DeltaTx::from(&*tx);
let (database_update, metrics) = match sender.config.protocol {
Protocol::Binary => execute_plans(&queries, &tx, TableUpdateType::Subscribe)
Expand All @@ -703,8 +798,18 @@ impl ModuleSubscriptions {
// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
// but that should not pose an issue.
let mut subscriptions = self.subscriptions.write();
subscriptions.set_legacy_subscription(sender.clone(), queries.into_iter());
{
let _compile_timer = subscription_metrics.compilation_time.start_timer();

let mut subscriptions = {
// How contended is the lock?
let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
self.subscriptions.write()
};

subscriptions.set_legacy_subscription(sender.clone(), queries.into_iter());
}

#[cfg(test)]
if let Some(assert) = _assert {
Expand Down Expand Up @@ -745,9 +850,17 @@ impl ModuleSubscriptions {
mut event: ModuleEvent,
tx: MutTx,
) -> Result<Result<(Arc<ModuleEvent>, ExecutionMetrics), WriteConflict>, DBError> {
let subscription_metrics = SubscriptionMetrics::new(&self.owner_identity, &WorkloadType::Update);

// Take a read lock on `subscriptions` before committing tx
// else it can result in subscriber receiving duplicate updates.
let subscriptions = self.subscriptions.read();
let subscriptions = {
// How contended is the lock?
let _wait_guard = subscription_metrics.lock_waiters.inc_scope();
let _wait_timer = subscription_metrics.lock_wait_time.start_timer();
self.subscriptions.read()
};

let stdb = &self.relational_db;
// Downgrade mutable tx.
// We'll later ensure tx is released/cleaned up once out of scope.
Expand Down
Loading