From 03e25c728aca81262ff879d1e9fd9ce342f351fd Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Mon, 19 May 2025 10:55:34 -0400 Subject: [PATCH 1/3] Add `client_connection_incoming_queue_length` metric It's a metric that tracks the size of the incoming per-client `message_queue`. We've been concerned about not having visibility into this queue's length, as currently we only have the length of the per-database reducer queue, but each client can only have a single reducer in that queue, and may have additional messages waiting in its per-client queue. The new metric, `spacetime_client_connection_incoming_queue_length`, is an `IntGaugeVec` with the labels: `db: Identity, client_identity: Identity, connection_id: ConnectionId`. My theory is that in our viewer we can inspect the average and the sum per database, and it also may be interesting to be able to look at individual clients, as e.g. the BitCraft mob monitor may be a notable outlier. This is not the same pattern as most of our other metrics, though, which tend to only offer per-database granularity. It's possible that this new metric should also have the last two labels removed, and be labeled only on `db: Identity`. --- crates/client-api/src/routes/subscribe.rs | 17 +++++++++++++++++ crates/core/src/worker_metrics/mod.rs | 5 +++++ 2 files changed, 22 insertions(+) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 51d51120ec0..aaa0463f875 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -228,6 +228,21 @@ async fn ws_client_actor_inner( let addr = client.module.info().database_identity; + let client_identity = client.sender().id.identity; + let connection_id = client.sender().id.connection_id; + + scopeguard::defer!( + if let Err(e) = WORKER_METRICS + .client_connection_incoming_queue_length + .remove_label_values(&addr, &client_identity, &connection_id) { + log::error!("Failed to `remove_label_values` for `client_connection_incoming_queue_length`: {e:?}"); + }; + ); + + let queue_length_metric = WORKER_METRICS + .client_connection_incoming_queue_length + .with_label_values(&addr, &client_identity, &connection_id); + loop { rx_buf.clear(); enum Item { @@ -236,6 +251,7 @@ async fn ws_client_actor_inner( } if let MaybeDone::Gone = *current_message { if let Some((message, timer)) = message_queue.pop_front() { + queue_length_metric.dec(); let client = client.clone(); let fut = async move { client.handle_message(message, timer).await }; current_message.set(MaybeDone::Future(fut)); @@ -362,6 +378,7 @@ async fn ws_client_actor_inner( match message { Item::Message(ClientMessage::Message(message)) => { let timer = Instant::now(); + queue_length_metric.inc(); message_queue.push_back((message, timer)) } Item::HandleResult(res) => { diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 23dfc9c8592..4fbc805266c 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -257,6 +257,11 @@ metrics_group!( #[help = "The cumulative number of bytes sent to clients"] #[labels(txn_type: WorkloadType, db: Identity)] pub bytes_sent_to_clients: IntCounterVec, + + #[name = spacetime_client_connection_incoming_queue_length] + #[help = "The number of client -> server WebSocket messages waiting in a client connection's incoming queue"] + #[labels(db: Identity, client_identity: Identity, connection_id: ConnectionId)] + pub client_connection_incoming_queue_length: IntGaugeVec, } ); From 4ec6c257d974c668c20b7b196680c7cdf20b0588 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Tue, 20 May 2025 14:18:31 -0400 Subject: [PATCH 2/3] Add `client_connection_outgoing_queue_length` metric Like the previous metric added in this branch, it's per-client, so we'll use it for testing, but likely not merge it into master. I'll follow up in a separate PR with a version that's per-database instead. --- crates/client-api/src/routes/subscribe.rs | 17 ++++++++++++++--- crates/core/src/client/client_connection.rs | 14 ++++++++++++++ crates/core/src/worker_metrics/mod.rs | 5 +++++ 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index aaa0463f875..54a02806dec 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -237,12 +237,22 @@ async fn ws_client_actor_inner( .remove_label_values(&addr, &client_identity, &connection_id) { log::error!("Failed to `remove_label_values` for `client_connection_incoming_queue_length`: {e:?}"); }; + + if let Err(e) = WORKER_METRICS + .client_connection_outgoing_queue_length + .remove_label_values(&addr, &client_identity, &connection_id) { + log::error!("Failed to `remove_label_values` for `client_connection_outgoing_queue_length`: {e:?}"); + } ); - let queue_length_metric = WORKER_METRICS + let incoming_queue_length_metric = WORKER_METRICS .client_connection_incoming_queue_length .with_label_values(&addr, &client_identity, &connection_id); + let outgoing_queue_length_metric = WORKER_METRICS + .client_connection_outgoing_queue_length + .with_label_values(&addr, &client_identity, &connection_id); + loop { rx_buf.clear(); enum Item { @@ -251,7 +261,7 @@ async fn ws_client_actor_inner( } if let MaybeDone::Gone = *current_message { if let Some((message, timer)) = message_queue.pop_front() { - queue_length_metric.dec(); + incoming_queue_length_metric.dec(); let client = client.clone(); let fut = async move { client.handle_message(message, timer).await }; current_message.set(MaybeDone::Future(fut)); @@ -285,6 +295,7 @@ async fn ws_client_actor_inner( // If we have an outgoing message to send, send it off. // No incoming `message` to handle, so `continue`. Some(n) = sendrx.recv_many(&mut rx_buf, 32).map(|n| (n != 0).then_some(n)) => { + outgoing_queue_length_metric.sub(n as _); if closed { // TODO: this isn't great. when we receive a close request from the peer, // tungstenite doesn't let us send any new messages on the socket, @@ -378,7 +389,7 @@ async fn ws_client_actor_inner( match message { Item::Message(ClientMessage::Message(message)) => { let timer = Instant::now(); - queue_length_metric.inc(); + incoming_queue_length_metric.inc(); message_queue.push_back((message, timer)) } Item::HandleResult(res) => { diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 2f94bbcb58b..5ac9be99bd4 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -16,6 +16,7 @@ use bytes::Bytes; use bytestring::ByteString; use derive_more::From; use futures::prelude::*; +use prometheus::IntGauge; use spacetimedb_client_api_messages::websocket::{ BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe, UnsubscribeMulti, WebsocketFormat, @@ -69,6 +70,7 @@ pub struct ClientConnectionSender { sendtx: mpsc::Sender, abort_handle: AbortHandle, cancelled: AtomicBool, + sendtx_queue_size_metric: Option, } #[derive(Debug, thiserror::Error)] @@ -87,6 +89,7 @@ impl ClientConnectionSender { Ok(h) => h.spawn(async {}).abort_handle(), Err(_) => tokio::runtime::Runtime::new().unwrap().spawn(async {}).abort_handle(), }; + ( Self { id, @@ -94,6 +97,7 @@ impl ClientConnectionSender { sendtx, abort_handle, cancelled: AtomicBool::new(false), + sendtx_queue_size_metric: None, }, rx, ) @@ -111,6 +115,11 @@ impl ClientConnectionSender { if self.cancelled.load(Relaxed) { return Err(ClientSendError::Cancelled); } + + if let Some(metric) = &self.sendtx_queue_size_metric { + metric.inc(); + } + self.sendtx.try_send(message).map_err(|e| match e { mpsc::error::TrySendError::Full(_) => { // we've hit CLIENT_CHANNEL_CAPACITY messages backed up in @@ -216,12 +225,17 @@ impl ClientConnection { }) .abort_handle(); + let sendtx_queue_size_metric = WORKER_METRICS + .client_connection_outgoing_queue_length + .with_label_values(&db, &id.identity, &id.connection_id); + let sender = Arc::new(ClientConnectionSender { id, config, sendtx, abort_handle, cancelled: AtomicBool::new(false), + sendtx_queue_size_metric: Some(sendtx_queue_size_metric), }); let this = Self { sender, diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 4fbc805266c..4b67066003a 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -262,6 +262,11 @@ metrics_group!( #[help = "The number of client -> server WebSocket messages waiting in a client connection's incoming queue"] #[labels(db: Identity, client_identity: Identity, connection_id: ConnectionId)] pub client_connection_incoming_queue_length: IntGaugeVec, + + #[name = spacetime_client_connection_outgoing_queue_length] + #[help = "The number of server -> client WebSocket messages waiting in a client connection's outgoing queue"] + #[labels(db: Identity, client_identity: Identity, connection_id: ConnectionId)] + pub client_connection_outgoing_queue_length: IntGaugeVec, } ); From c00797660a24913f8323e2cd34aa27f8f7acb88d Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Wed, 21 May 2025 11:44:25 -0400 Subject: [PATCH 3/3] Less granular message queue length metrics This commit alters the message queue length metrics introduced by https://github.com/clockworklabs/SpacetimeDB/pull/2754 to be per-database, rather than per-client. This should limit the cardinality of these metrics, and better lines up with the labels of our other metrics. Because the metrics are now per-database rather than per-client, it's no longer correct to just drop the label when the client disconnects. Instead, care must be taken to decrement the metric by the number of messages which were waiting in the queue at the time of the disconnection. I've added comments to call attention to this complexity. --- crates/client-api/src/routes/subscribe.rs | 95 ++++++++++----------- crates/core/src/client/client_connection.rs | 34 +++++--- crates/core/src/worker_metrics/mod.rs | 18 ++-- 3 files changed, 77 insertions(+), 70 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 54a02806dec..64ffc4e3e3d 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -228,30 +228,23 @@ async fn ws_client_actor_inner( let addr = client.module.info().database_identity; - let client_identity = client.sender().id.identity; - let connection_id = client.sender().id.connection_id; - - scopeguard::defer!( - if let Err(e) = WORKER_METRICS - .client_connection_incoming_queue_length - .remove_label_values(&addr, &client_identity, &connection_id) { - log::error!("Failed to `remove_label_values` for `client_connection_incoming_queue_length`: {e:?}"); - }; - - if let Err(e) = WORKER_METRICS - .client_connection_outgoing_queue_length - .remove_label_values(&addr, &client_identity, &connection_id) { - log::error!("Failed to `remove_label_values` for `client_connection_outgoing_queue_length`: {e:?}"); - } - ); - - let incoming_queue_length_metric = WORKER_METRICS - .client_connection_incoming_queue_length - .with_label_values(&addr, &client_identity, &connection_id); - - let outgoing_queue_length_metric = WORKER_METRICS - .client_connection_outgoing_queue_length - .with_label_values(&addr, &client_identity, &connection_id); + // Grab handles on the total incoming and outgoing queue length metrics, + // which we'll increment and decrement as we push into and pull out of those queues. + // Note that `total_outgoing_queue_length` is incremented separately, + // by `ClientConnectionSender::send` in core/src/client/client_connection.rs; + // we're only responsible for decrementing that one. + // Also note that much care must be taken to clean up these metrics when the connection closes! + // Any path which exits this function must decrement each of these metrics + // by the number of messages still waiting in this client's queue, + // or else they will grow without bound as clients disconnect, and be useless. + let incoming_queue_length_metric = WORKER_METRICS.total_incoming_queue_length.with_label_values(&addr); + let outgoing_queue_length_metric = WORKER_METRICS.total_outgoing_queue_length.with_label_values(&addr); + + let clean_up_metrics = |message_queue: &VecDeque<(DataMessage, Instant)>, + sendrx: &mpsc::Receiver| { + incoming_queue_length_metric.sub(message_queue.len() as _); + outgoing_queue_length_metric.sub(sendrx.len() as _); + }; loop { rx_buf.clear(); @@ -289,7 +282,10 @@ async fn ws_client_actor_inner( continue; } // the client sent us a close frame - None => break, + None => { + clean_up_metrics(&message_queue, &sendrx); + break + }, }, // If we have an outgoing message to send, send it off. @@ -302,31 +298,31 @@ async fn ws_client_actor_inner( // even though the websocket RFC allows it. should we fork tungstenite? log::info!("dropping messages due to ws already being closed: {:?}", &rx_buf[..n]); } else { - let send_all = async { - for msg in rx_buf.drain(..n) { - let workload = msg.workload(); - let num_rows = msg.num_rows(); - - let msg = datamsg_to_wsmsg(serialize(msg, client.config)); - - // These metrics should be updated together, - // or not at all. - if let (Some(workload), Some(num_rows)) = (workload, num_rows) { - WORKER_METRICS - .websocket_sent_num_rows - .with_label_values(&addr, &workload) - .observe(num_rows as f64); - WORKER_METRICS - .websocket_sent_msg_size - .with_label_values(&addr, &workload) - .observe(msg.len() as f64); - } - // feed() buffers the message, but does not necessarily send it - ws.feed(msg).await?; + let send_all = async { + for msg in rx_buf.drain(..n) { + let workload = msg.workload(); + let num_rows = msg.num_rows(); + + let msg = datamsg_to_wsmsg(serialize(msg, client.config)); + + // These metrics should be updated together, + // or not at all. + if let (Some(workload), Some(num_rows)) = (workload, num_rows) { + WORKER_METRICS + .websocket_sent_num_rows + .with_label_values(&addr, &workload) + .observe(num_rows as f64); + WORKER_METRICS + .websocket_sent_msg_size + .with_label_values(&addr, &workload) + .observe(msg.len() as f64); } - // now we flush all the messages to the socket - ws.flush().await - }; + // feed() buffers the message, but does not necessarily send it + ws.feed(msg).await?; + } + // now we flush all the messages to the socket + ws.flush().await + }; // Flush the websocket while continuing to poll the `handle_queue`, // to avoid deadlocks or delays due to enqueued futures holding resources. let send_all = also_poll(send_all, make_progress(&mut current_message)); @@ -375,6 +371,7 @@ async fn ws_client_actor_inner( } else { // the client never responded to our ping; drop them without trying to send them a Close log::warn!("client {} timed out", client.id); + clean_up_metrics(&message_queue, &sendrx); break; } } diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 5ac9be99bd4..27c0b380d10 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -70,6 +70,13 @@ pub struct ClientConnectionSender { sendtx: mpsc::Sender, abort_handle: AbortHandle, cancelled: AtomicBool, + /// The `total_outgoing_queue_length` metric labeled with this database's `Identity`, + /// which we'll increment whenever sending a message. + /// + /// This metric will be decremented, and cleaned up, + /// by `ws_client_actor_inner` in client-api/src/routes/subscribe.rs. + /// Care must be taken not to increment it after the client has disconnected + /// and performed its clean-up. sendtx_queue_size_metric: Option, } @@ -116,21 +123,26 @@ impl ClientConnectionSender { return Err(ClientSendError::Cancelled); } - if let Some(metric) = &self.sendtx_queue_size_metric { - metric.inc(); - } - - self.sendtx.try_send(message).map_err(|e| match e { - mpsc::error::TrySendError::Full(_) => { + match self.sendtx.try_send(message) { + Err(mpsc::error::TrySendError::Full(_)) => { // we've hit CLIENT_CHANNEL_CAPACITY messages backed up in // the channel, so forcibly kick the client tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded"); self.abort_handle.abort(); self.cancelled.store(true, Relaxed); - ClientSendError::Cancelled + return Err(ClientSendError::Cancelled); } - mpsc::error::TrySendError::Closed(_) => ClientSendError::Disconnected, - })?; + Err(mpsc::error::TrySendError::Closed(_)) => return Err(ClientSendError::Disconnected), + Ok(()) => { + // If we successfully pushed a message into the queue, increment the queue size metric. + // Don't do this before pushing because, if the client has disconnected, + // it will already have performed its clean-up, + // and so would never perform the corresponding `dec` to this `inc`. + if let Some(metric) = &self.sendtx_queue_size_metric { + metric.inc(); + } + } + } Ok(()) } @@ -225,9 +237,7 @@ impl ClientConnection { }) .abort_handle(); - let sendtx_queue_size_metric = WORKER_METRICS - .client_connection_outgoing_queue_length - .with_label_values(&db, &id.identity, &id.connection_id); + let sendtx_queue_size_metric = WORKER_METRICS.total_outgoing_queue_length.with_label_values(&db); let sender = Arc::new(ClientConnectionSender { id, diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 4b67066003a..e1518124a75 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -258,15 +258,15 @@ metrics_group!( #[labels(txn_type: WorkloadType, db: Identity)] pub bytes_sent_to_clients: IntCounterVec, - #[name = spacetime_client_connection_incoming_queue_length] - #[help = "The number of client -> server WebSocket messages waiting in a client connection's incoming queue"] - #[labels(db: Identity, client_identity: Identity, connection_id: ConnectionId)] - pub client_connection_incoming_queue_length: IntGaugeVec, - - #[name = spacetime_client_connection_outgoing_queue_length] - #[help = "The number of server -> client WebSocket messages waiting in a client connection's outgoing queue"] - #[labels(db: Identity, client_identity: Identity, connection_id: ConnectionId)] - pub client_connection_outgoing_queue_length: IntGaugeVec, + #[name = spacetime_total_incoming_queue_length] + #[help = "The number of client -> server WebSocket messages waiting any client's incoming queue"] + #[labels(db: Identity)] + pub total_incoming_queue_length: IntGaugeVec, + + #[name = spacetime_total_outgoing_queue_length] + #[help = "The number of server -> client WebSocket messages waiting in any client's outgoing queue"] + #[labels(db: Identity)] + pub total_outgoing_queue_length: IntGaugeVec, } );