Skip to content

Commit f3c0634

Browse files
Use same approach for client incoming message queue
1 parent c2762dd commit f3c0634

3 files changed

Lines changed: 63 additions & 25 deletions

File tree

crates/client-api/src/routes/subscribe.rs

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::collections::VecDeque;
21
use std::mem;
32
use std::pin::{pin, Pin};
43
use std::time::Duration;
@@ -16,7 +15,8 @@ use scopeguard::ScopeGuard;
1615
use serde::Deserialize;
1716
use spacetimedb::client::messages::{serialize, IdentityTokenMessage, SerializableMessage, SerializeBuffer};
1817
use spacetimedb::client::{
19-
ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageHandleError, MeteredReceiver, Protocol,
18+
ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageHandleError, MeteredDeque, MeteredReceiver,
19+
Protocol,
2020
};
2121
use spacetimedb::execution_context::WorkloadType;
2222
use spacetimedb::host::module_host::ClientConnectedError;
@@ -210,6 +210,8 @@ async fn ws_client_actor_inner(
210210
let mut liveness_check_interval = tokio::time::interval(LIVELINESS_TIMEOUT);
211211
let mut got_pong = true;
212212

213+
let addr = client.module.info().database_identity;
214+
213215
// Build a queue of incoming messages to handle, to be processed one at a time,
214216
// in the order they're received.
215217
//
@@ -223,29 +225,14 @@ async fn ws_client_actor_inner(
223225
// `select!` for examples of how to do this.
224226
//
225227
// TODO: do we want this to have a fixed capacity? or should it be unbounded
226-
let mut message_queue = VecDeque::<(DataMessage, Instant)>::new();
228+
let mut message_queue = MeteredDeque::<(DataMessage, Instant)>::new(
229+
WORKER_METRICS.total_incoming_queue_length.with_label_values(&addr),
230+
);
227231
let mut current_message = pin!(MaybeDone::Gone);
228232

229233
let mut closed = false;
230234
let mut rx_buf = Vec::new();
231235

232-
let addr = client.module.info().database_identity;
233-
234-
// Grab handles on the total incoming and outgoing queue length metrics,
235-
// which we'll increment and decrement as we push into and pull out of those queues.
236-
// Note that `total_outgoing_queue_length` is incremented separately,
237-
// by `ClientConnectionSender::send` in core/src/client/client_connection.rs;
238-
// we're only responsible for decrementing that one.
239-
// Also note that much care must be taken to clean up these metrics when the connection closes!
240-
// Any path which exits this function must decrement each of these metrics
241-
// by the number of messages still waiting in this client's queue,
242-
// or else they will grow without bound as clients disconnect, and be useless.
243-
let incoming_queue_length_metric = WORKER_METRICS.total_incoming_queue_length.with_label_values(&addr);
244-
245-
let clean_up_metrics = |message_queue: &VecDeque<(DataMessage, Instant)>| {
246-
incoming_queue_length_metric.sub(message_queue.len() as _);
247-
};
248-
249236
let mut msg_buffer = SerializeBuffer::new(client.config);
250237
loop {
251238
rx_buf.clear();
@@ -255,7 +242,6 @@ async fn ws_client_actor_inner(
255242
}
256243
if let MaybeDone::Gone = *current_message {
257244
if let Some((message, timer)) = message_queue.pop_front() {
258-
incoming_queue_length_metric.dec();
259245
let client = client.clone();
260246
let fut = async move { client.handle_message(message, timer).await };
261247
current_message.set(MaybeDone::Future(fut));
@@ -284,7 +270,6 @@ async fn ws_client_actor_inner(
284270
}
285271
// the client sent us a close frame
286272
None => {
287-
clean_up_metrics(&message_queue);
288273
break
289274
},
290275
},
@@ -376,7 +361,6 @@ async fn ws_client_actor_inner(
376361
} else {
377362
// the client never responded to our ping; drop them without trying to send them a Close
378363
log::warn!("client {} timed out", client.id);
379-
clean_up_metrics(&message_queue);
380364
break;
381365
}
382366
}
@@ -391,7 +375,6 @@ async fn ws_client_actor_inner(
391375
match message {
392376
Item::Message(ClientMessage::Message(message)) => {
393377
let timer = Instant::now();
394-
incoming_queue_length_metric.inc();
395378
message_queue.push_back((message, timer))
396379
}
397380
Item::HandleResult(res) => {

crates/core/src/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ mod message_handlers;
77
pub mod messages;
88

99
pub use client_connection::{
10-
ClientConfig, ClientConnection, ClientConnectionSender, ClientSendError, DataMessage, MeteredReceiver, Protocol,
10+
ClientConfig, ClientConnection, ClientConnectionSender, ClientSendError, DataMessage, MeteredDeque,
11+
MeteredReceiver, Protocol,
1112
};
1213
pub use client_connection_index::ClientActorIndex;
1314
pub use message_handlers::MessageHandleError;

crates/core/src/client/client_connection.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::VecDeque;
12
use std::ops::Deref;
23
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
34
use std::sync::Arc;
@@ -258,6 +259,59 @@ impl DataMessage {
258259
}
259260
}
260261

262+
/// Wraps a [VecDeque] with a gauge for tracking its size.
263+
/// We subtract its size from the gauge on drop to avoid leaking the metric.
264+
pub struct MeteredDeque<T> {
265+
inner: VecDeque<T>,
266+
gauge: IntGauge,
267+
}
268+
269+
impl<T> MeteredDeque<T> {
270+
pub fn new(gauge: IntGauge) -> Self {
271+
Self {
272+
inner: VecDeque::new(),
273+
gauge,
274+
}
275+
}
276+
277+
pub fn pop_front(&mut self) -> Option<T> {
278+
self.inner.pop_front().inspect(|_| {
279+
self.gauge.dec();
280+
})
281+
}
282+
283+
pub fn pop_back(&mut self) -> Option<T> {
284+
self.inner.pop_back().inspect(|_| {
285+
self.gauge.dec();
286+
})
287+
}
288+
289+
pub fn push_front(&mut self, value: T) {
290+
self.gauge.inc();
291+
self.inner.push_front(value);
292+
}
293+
294+
pub fn push_back(&mut self, value: T) {
295+
self.gauge.inc();
296+
self.inner.push_back(value);
297+
}
298+
299+
pub fn len(&self) -> usize {
300+
self.inner.len()
301+
}
302+
303+
pub fn is_empty(&self) -> bool {
304+
self.inner.is_empty()
305+
}
306+
}
307+
308+
impl<T> Drop for MeteredDeque<T> {
309+
fn drop(&mut self) {
310+
// Record the number of elements still in the deque on drop
311+
self.gauge.sub(self.inner.len() as _);
312+
}
313+
}
314+
261315
/// Wraps the receiving end of a channel with a gauge for tracking the size of the channel.
262316
/// We subtract the size of the channel from the gauge on drop to avoid leaking the metric.
263317
pub struct MeteredReceiver<T> {

0 commit comments

Comments
 (0)