Skip to content

Commit f79f21c

Browse files
Revert "Per-database incoming and outgoing queue length metrics (#2773)"
This reverts commit ac18790.
1 parent 8a16a12 commit f79f21c

3 files changed

Lines changed: 31 additions & 90 deletions

File tree

crates/client-api/src/routes/subscribe.rs

Lines changed: 25 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -228,24 +228,6 @@ async fn ws_client_actor_inner(
228228

229229
let addr = client.module.info().database_identity;
230230

231-
// Grab handles on the total incoming and outgoing queue length metrics,
232-
// which we'll increment and decrement as we push into and pull out of those queues.
233-
// Note that `total_outgoing_queue_length` is incremented separately,
234-
// by `ClientConnectionSender::send` in core/src/client/client_connection.rs;
235-
// we're only responsible for decrementing that one.
236-
// Also note that much care must be taken to clean up these metrics when the connection closes!
237-
// Any path which exits this function must decrement each of these metrics
238-
// by the number of messages still waiting in this client's queue,
239-
// or else they will grow without bound as clients disconnect, and be useless.
240-
let incoming_queue_length_metric = WORKER_METRICS.total_incoming_queue_length.with_label_values(&addr);
241-
let outgoing_queue_length_metric = WORKER_METRICS.total_outgoing_queue_length.with_label_values(&addr);
242-
243-
let clean_up_metrics = |message_queue: &VecDeque<(DataMessage, Instant)>,
244-
sendrx: &mpsc::Receiver<SerializableMessage>| {
245-
incoming_queue_length_metric.sub(message_queue.len() as _);
246-
outgoing_queue_length_metric.sub(sendrx.len() as _);
247-
};
248-
249231
loop {
250232
rx_buf.clear();
251233
enum Item {
@@ -254,7 +236,6 @@ async fn ws_client_actor_inner(
254236
}
255237
if let MaybeDone::Gone = *current_message {
256238
if let Some((message, timer)) = message_queue.pop_front() {
257-
incoming_queue_length_metric.dec();
258239
let client = client.clone();
259240
let fut = async move { client.handle_message(message, timer).await };
260241
current_message.set(MaybeDone::Future(fut));
@@ -282,47 +263,43 @@ async fn ws_client_actor_inner(
282263
continue;
283264
}
284265
// the client sent us a close frame
285-
None => {
286-
clean_up_metrics(&message_queue, &sendrx);
287-
break
288-
},
266+
None => break,
289267
},
290268

291269
// If we have an outgoing message to send, send it off.
292270
// No incoming `message` to handle, so `continue`.
293271
Some(n) = sendrx.recv_many(&mut rx_buf, 32).map(|n| (n != 0).then_some(n)) => {
294-
outgoing_queue_length_metric.sub(n as _);
295272
if closed {
296273
// TODO: this isn't great. when we receive a close request from the peer,
297274
// tungstenite doesn't let us send any new messages on the socket,
298275
// even though the websocket RFC allows it. should we fork tungstenite?
299276
log::info!("dropping messages due to ws already being closed: {:?}", &rx_buf[..n]);
300277
} else {
301-
let send_all = async {
302-
for msg in rx_buf.drain(..n) {
303-
let workload = msg.workload();
304-
let num_rows = msg.num_rows();
305-
306-
let msg = datamsg_to_wsmsg(serialize(msg, client.config));
307-
308-
// These metrics should be updated together,
309-
// or not at all.
310-
if let (Some(workload), Some(num_rows)) = (workload, num_rows) {
311-
WORKER_METRICS
312-
.websocket_sent_num_rows
313-
.with_label_values(&addr, &workload)
314-
.observe(num_rows as f64);
315-
WORKER_METRICS
316-
.websocket_sent_msg_size
317-
.with_label_values(&addr, &workload)
318-
.observe(msg.len() as f64);
278+
let send_all = async {
279+
for msg in rx_buf.drain(..n) {
280+
let workload = msg.workload();
281+
let num_rows = msg.num_rows();
282+
283+
let msg = datamsg_to_wsmsg(serialize(msg, client.config));
284+
285+
// These metrics should be updated together,
286+
// or not at all.
287+
if let (Some(workload), Some(num_rows)) = (workload, num_rows) {
288+
WORKER_METRICS
289+
.websocket_sent_num_rows
290+
.with_label_values(&addr, &workload)
291+
.observe(num_rows as f64);
292+
WORKER_METRICS
293+
.websocket_sent_msg_size
294+
.with_label_values(&addr, &workload)
295+
.observe(msg.len() as f64);
296+
}
297+
// feed() buffers the message, but does not necessarily send it
298+
ws.feed(msg).await?;
319299
}
320-
// feed() buffers the message, but does not necessarily send it
321-
ws.feed(msg).await?;
322-
}
323-
// now we flush all the messages to the socket
324-
ws.flush().await
325-
};
300+
// now we flush all the messages to the socket
301+
ws.flush().await
302+
};
326303
// Flush the websocket while continuing to poll the `handle_queue`,
327304
// to avoid deadlocks or delays due to enqueued futures holding resources.
328305
let send_all = also_poll(send_all, make_progress(&mut current_message));
@@ -371,7 +348,6 @@ async fn ws_client_actor_inner(
371348
} else {
372349
// the client never responded to our ping; drop them without trying to send them a Close
373350
log::warn!("client {} timed out", client.id);
374-
clean_up_metrics(&message_queue, &sendrx);
375351
break;
376352
}
377353
}
@@ -386,7 +362,6 @@ async fn ws_client_actor_inner(
386362
match message {
387363
Item::Message(ClientMessage::Message(message)) => {
388364
let timer = Instant::now();
389-
incoming_queue_length_metric.inc();
390365
message_queue.push_back((message, timer))
391366
}
392367
Item::HandleResult(res) => {

crates/core/src/client/client_connection.rs

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use bytes::Bytes;
1616
use bytestring::ByteString;
1717
use derive_more::From;
1818
use futures::prelude::*;
19-
use prometheus::{Histogram, IntCounter, IntGauge};
19+
use prometheus::{Histogram, IntCounter};
2020
use spacetimedb_client_api_messages::websocket::{
2121
BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe,
2222
UnsubscribeMulti, WebsocketFormat,
@@ -91,15 +91,6 @@ pub struct ClientConnectionSender {
9191
pub struct ClientConnectionMetrics {
9292
pub websocket_request_msg_size: Histogram,
9393
pub websocket_requests: IntCounter,
94-
95-
/// The `total_outgoing_queue_length` metric labeled with this database's `Identity`,
96-
/// which we'll increment whenever sending a message.
97-
///
98-
/// This metric will be decremented, and cleaned up,
99-
/// by `ws_client_actor_inner` in client-api/src/routes/subscribe.rs.
100-
/// Care must be taken not to increment it after the client has disconnected
101-
/// and performed its clean-up.
102-
pub sendtx_queue_size: IntGauge,
10394
}
10495

10596
impl ClientConnectionMetrics {
@@ -111,14 +102,10 @@ impl ClientConnectionMetrics {
111102
let websocket_requests = WORKER_METRICS
112103
.websocket_requests
113104
.with_label_values(&database_identity, message_kind);
114-
let sendtx_queue_size = WORKER_METRICS
115-
.total_outgoing_queue_length
116-
.with_label_values(&database_identity);
117105

118106
Self {
119107
websocket_request_msg_size,
120108
websocket_requests,
121-
sendtx_queue_size,
122109
}
123110
}
124111
}
@@ -139,7 +126,6 @@ impl ClientConnectionSender {
139126
Ok(h) => h.spawn(async {}).abort_handle(),
140127
Err(_) => tokio::runtime::Runtime::new().unwrap().spawn(async {}).abort_handle(),
141128
};
142-
143129
let cancelled = AtomicBool::new(false);
144130
let sender = Self {
145131
id,
@@ -164,27 +150,17 @@ impl ClientConnectionSender {
164150
if self.cancelled.load(Relaxed) {
165151
return Err(ClientSendError::Cancelled);
166152
}
167-
168-
match self.sendtx.try_send(message) {
169-
Err(mpsc::error::TrySendError::Full(_)) => {
153+
self.sendtx.try_send(message).map_err(|e| match e {
154+
mpsc::error::TrySendError::Full(_) => {
170155
// we've hit CLIENT_CHANNEL_CAPACITY messages backed up in
171156
// the channel, so forcibly kick the client
172157
tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded");
173158
self.abort_handle.abort();
174159
self.cancelled.store(true, Relaxed);
175-
return Err(ClientSendError::Cancelled);
160+
ClientSendError::Cancelled
176161
}
177-
Err(mpsc::error::TrySendError::Closed(_)) => return Err(ClientSendError::Disconnected),
178-
Ok(()) => {
179-
// If we successfully pushed a message into the queue, increment the queue size metric.
180-
// Don't do this before pushing because, if the client has disconnected,
181-
// it will already have performed its clean-up,
182-
// and so would never perform the corresponding `dec` to this `inc`.
183-
if let Some(metrics) = &self.metrics {
184-
metrics.sendtx_queue_size.inc();
185-
}
186-
}
187-
}
162+
mpsc::error::TrySendError::Closed(_) => ClientSendError::Disconnected,
163+
})?;
188164

189165
Ok(())
190166
}

crates/core/src/worker_metrics/mod.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -262,16 +262,6 @@ metrics_group!(
262262
#[help = "The number of `ComputedQueries` waiting in the queue to be aggregated and broadcast by the `send_worker`"]
263263
#[labels(database_identity: Identity)]
264264
pub subscription_send_queue_length: IntGaugeVec,
265-
266-
#[name = spacetime_total_incoming_queue_length]
267-
#[help = "The number of client -> server WebSocket messages waiting any client's incoming queue"]
268-
#[labels(db: Identity)]
269-
pub total_incoming_queue_length: IntGaugeVec,
270-
271-
#[name = spacetime_total_outgoing_queue_length]
272-
#[help = "The number of server -> client WebSocket messages waiting in any client's outgoing queue"]
273-
#[labels(db: Identity)]
274-
pub total_outgoing_queue_length: IntGaugeVec,
275265
}
276266
);
277267

0 commit comments

Comments
 (0)