diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 51d51120ec0..64ffc4e3e3d 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -228,6 +228,24 @@ async fn ws_client_actor_inner( let addr = client.module.info().database_identity; + // Grab handles on the total incoming and outgoing queue length metrics, + // which we'll increment and decrement as we push into and pull out of those queues. + // Note that `total_outgoing_queue_length` is incremented separately, + // by `ClientConnectionSender::send` in core/src/client/client_connection.rs; + // we're only responsible for decrementing that one. + // Also note that much care must be taken to clean up these metrics when the connection closes! + // Any path which exits this function must decrement each of these metrics + // by the number of messages still waiting in this client's queue, + // or else they will grow without bound as clients disconnect, and be useless. + let incoming_queue_length_metric = WORKER_METRICS.total_incoming_queue_length.with_label_values(&addr); + let outgoing_queue_length_metric = WORKER_METRICS.total_outgoing_queue_length.with_label_values(&addr); + + let clean_up_metrics = |message_queue: &VecDeque<(DataMessage, Instant)>, + sendrx: &mpsc::Receiver| { + incoming_queue_length_metric.sub(message_queue.len() as _); + outgoing_queue_length_metric.sub(sendrx.len() as _); + }; + loop { rx_buf.clear(); enum Item { @@ -236,6 +254,7 @@ async fn ws_client_actor_inner( } if let MaybeDone::Gone = *current_message { if let Some((message, timer)) = message_queue.pop_front() { + incoming_queue_length_metric.dec(); let client = client.clone(); let fut = async move { client.handle_message(message, timer).await }; current_message.set(MaybeDone::Future(fut)); @@ -263,43 +282,47 @@ async fn ws_client_actor_inner( continue; } // the client sent us a close frame - None => break, + None => { + clean_up_metrics(&message_queue, &sendrx); + break + }, }, // If we have an outgoing message to send, send it off. // No incoming `message` to handle, so `continue`. Some(n) = sendrx.recv_many(&mut rx_buf, 32).map(|n| (n != 0).then_some(n)) => { + outgoing_queue_length_metric.sub(n as _); if closed { // TODO: this isn't great. when we receive a close request from the peer, // tungstenite doesn't let us send any new messages on the socket, // even though the websocket RFC allows it. should we fork tungstenite? log::info!("dropping messages due to ws already being closed: {:?}", &rx_buf[..n]); } else { - let send_all = async { - for msg in rx_buf.drain(..n) { - let workload = msg.workload(); - let num_rows = msg.num_rows(); - - let msg = datamsg_to_wsmsg(serialize(msg, client.config)); - - // These metrics should be updated together, - // or not at all. - if let (Some(workload), Some(num_rows)) = (workload, num_rows) { - WORKER_METRICS - .websocket_sent_num_rows - .with_label_values(&addr, &workload) - .observe(num_rows as f64); - WORKER_METRICS - .websocket_sent_msg_size - .with_label_values(&addr, &workload) - .observe(msg.len() as f64); - } - // feed() buffers the message, but does not necessarily send it - ws.feed(msg).await?; + let send_all = async { + for msg in rx_buf.drain(..n) { + let workload = msg.workload(); + let num_rows = msg.num_rows(); + + let msg = datamsg_to_wsmsg(serialize(msg, client.config)); + + // These metrics should be updated together, + // or not at all. + if let (Some(workload), Some(num_rows)) = (workload, num_rows) { + WORKER_METRICS + .websocket_sent_num_rows + .with_label_values(&addr, &workload) + .observe(num_rows as f64); + WORKER_METRICS + .websocket_sent_msg_size + .with_label_values(&addr, &workload) + .observe(msg.len() as f64); } - // now we flush all the messages to the socket - ws.flush().await - }; + // feed() buffers the message, but does not necessarily send it + ws.feed(msg).await?; + } + // now we flush all the messages to the socket + ws.flush().await + }; // Flush the websocket while continuing to poll the `handle_queue`, // to avoid deadlocks or delays due to enqueued futures holding resources. let send_all = also_poll(send_all, make_progress(&mut current_message)); @@ -348,6 +371,7 @@ async fn ws_client_actor_inner( } else { // the client never responded to our ping; drop them without trying to send them a Close log::warn!("client {} timed out", client.id); + clean_up_metrics(&message_queue, &sendrx); break; } } @@ -362,6 +386,7 @@ async fn ws_client_actor_inner( match message { Item::Message(ClientMessage::Message(message)) => { let timer = Instant::now(); + incoming_queue_length_metric.inc(); message_queue.push_back((message, timer)) } Item::HandleResult(res) => { diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 24156019fc9..a1a537ae5c5 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -16,7 +16,7 @@ use bytes::Bytes; use bytestring::ByteString; use derive_more::From; use futures::prelude::*; -use prometheus::{Histogram, IntCounter}; +use prometheus::{Histogram, IntCounter, IntGauge}; use spacetimedb_client_api_messages::websocket::{ BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe, UnsubscribeMulti, WebsocketFormat, @@ -91,6 +91,15 @@ pub struct ClientConnectionSender { pub struct ClientConnectionMetrics { pub websocket_request_msg_size: Histogram, pub websocket_requests: IntCounter, + + /// The `total_outgoing_queue_length` metric labeled with this database's `Identity`, + /// which we'll increment whenever sending a message. + /// + /// This metric will be decremented, and cleaned up, + /// by `ws_client_actor_inner` in client-api/src/routes/subscribe.rs. + /// Care must be taken not to increment it after the client has disconnected + /// and performed its clean-up. + pub sendtx_queue_size: IntGauge, } impl ClientConnectionMetrics { @@ -102,10 +111,14 @@ impl ClientConnectionMetrics { let websocket_requests = WORKER_METRICS .websocket_requests .with_label_values(&database_identity, message_kind); + let sendtx_queue_size = WORKER_METRICS + .total_outgoing_queue_length + .with_label_values(&database_identity); Self { websocket_request_msg_size, websocket_requests, + sendtx_queue_size, } } } @@ -126,6 +139,7 @@ impl ClientConnectionSender { Ok(h) => h.spawn(async {}).abort_handle(), Err(_) => tokio::runtime::Runtime::new().unwrap().spawn(async {}).abort_handle(), }; + let cancelled = AtomicBool::new(false); let sender = Self { id, @@ -150,17 +164,27 @@ impl ClientConnectionSender { if self.cancelled.load(Relaxed) { return Err(ClientSendError::Cancelled); } - self.sendtx.try_send(message).map_err(|e| match e { - mpsc::error::TrySendError::Full(_) => { + + match self.sendtx.try_send(message) { + Err(mpsc::error::TrySendError::Full(_)) => { // we've hit CLIENT_CHANNEL_CAPACITY messages backed up in // the channel, so forcibly kick the client tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded"); self.abort_handle.abort(); self.cancelled.store(true, Relaxed); - ClientSendError::Cancelled + return Err(ClientSendError::Cancelled); } - mpsc::error::TrySendError::Closed(_) => ClientSendError::Disconnected, - })?; + Err(mpsc::error::TrySendError::Closed(_)) => return Err(ClientSendError::Disconnected), + Ok(()) => { + // If we successfully pushed a message into the queue, increment the queue size metric. + // Don't do this before pushing because, if the client has disconnected, + // it will already have performed its clean-up, + // and so would never perform the corresponding `dec` to this `inc`. + if let Some(metrics) = &self.metrics { + metrics.sendtx_queue_size.inc(); + } + } + } Ok(()) } diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 377d08dfc91..13923494709 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -257,6 +257,16 @@ metrics_group!( #[help = "The cumulative number of bytes sent to clients"] #[labels(txn_type: WorkloadType, db: Identity)] pub bytes_sent_to_clients: IntCounterVec, + + #[name = spacetime_total_incoming_queue_length] + #[help = "The number of client -> server WebSocket messages waiting any client's incoming queue"] + #[labels(db: Identity)] + pub total_incoming_queue_length: IntGaugeVec, + + #[name = spacetime_total_outgoing_queue_length] + #[help = "The number of server -> client WebSocket messages waiting in any client's outgoing queue"] + #[labels(db: Identity)] + pub total_outgoing_queue_length: IntGaugeVec, } );