@@ -228,6 +228,24 @@ async fn ws_client_actor_inner(
228228
229229 let addr = client. module . info ( ) . database_identity ;
230230
231+ // Grab handles on the total incoming and outgoing queue length metrics,
232+ // which we'll increment and decrement as we push into and pull out of those queues.
233+ // Note that `total_outgoing_queue_length` is incremented separately,
234+ // by `ClientConnectionSender::send` in core/src/client/client_connection.rs;
235+ // we're only responsible for decrementing that one.
236+ // Also note that much care must be taken to clean up these metrics when the connection closes!
237+ // Any path which exits this function must decrement each of these metrics
238+ // by the number of messages still waiting in this client's queue,
239+ // or else they will grow without bound as clients disconnect, and be useless.
240+ let incoming_queue_length_metric = WORKER_METRICS . total_incoming_queue_length . with_label_values ( & addr) ;
241+ let outgoing_queue_length_metric = WORKER_METRICS . total_outgoing_queue_length . with_label_values ( & addr) ;
242+
// Connection-teardown helper for the two queue-length metrics.
// Subtracts the number of messages still held in `message_queue` from the
// incoming metric, and the number still buffered in `sendrx` from the
// outgoing metric. Both queues are passed by shared reference, so the closure
// only reads their current lengths. In the code visible here it is called
// immediately before each `break` out of the actor loop.
243+ let clean_up_metrics = |message_queue : & VecDeque < ( DataMessage , Instant ) > ,
244+ sendrx : & mpsc:: Receiver < SerializableMessage > | {
// `as _` leaves the target numeric type of the `usize` length to inference
// from the signature of `sub`.
245+ incoming_queue_length_metric. sub ( message_queue. len ( ) as _ ) ;
246+ outgoing_queue_length_metric. sub ( sendrx. len ( ) as _ ) ;
247+ } ;
248+
231249 loop {
232250 rx_buf. clear ( ) ;
233251 enum Item {
@@ -236,6 +254,7 @@ async fn ws_client_actor_inner(
236254 }
237255 if let MaybeDone :: Gone = * current_message {
238256 if let Some ( ( message, timer) ) = message_queue. pop_front ( ) {
257+ incoming_queue_length_metric. dec ( ) ;
239258 let client = client. clone ( ) ;
240259 let fut = async move { client. handle_message ( message, timer) . await } ;
241260 current_message. set ( MaybeDone :: Future ( fut) ) ;
@@ -263,43 +282,47 @@ async fn ws_client_actor_inner(
263282 continue ;
264283 }
265284 // the client sent us a close frame
266- None => break ,
285+ None => {
286+ clean_up_metrics( & message_queue, & sendrx) ;
287+ break
288+ } ,
267289 } ,
268290
269291 // If we have an outgoing message to send, send it off.
270292 // No incoming `message` to handle, so `continue`.
271293 Some ( n) = sendrx. recv_many( & mut rx_buf, 32 ) . map( |n| ( n != 0 ) . then_some( n) ) => {
294+ outgoing_queue_length_metric. sub( n as _) ;
272295 if closed {
273296 // TODO: this isn't great. when we receive a close request from the peer,
274297 // tungstenite doesn't let us send any new messages on the socket,
275298 // even though the websocket RFC allows it. should we fork tungstenite?
276299 log:: info!( "dropping messages due to ws already being closed: {:?}" , & rx_buf[ ..n] ) ;
277300 } else {
278- let send_all = async {
279- for msg in rx_buf. drain( ..n) {
280- let workload = msg. workload( ) ;
281- let num_rows = msg. num_rows( ) ;
282-
283- let msg = datamsg_to_wsmsg( serialize( msg, client. config) ) ;
284-
285- // These metrics should be updated together,
286- // or not at all.
287- if let ( Some ( workload) , Some ( num_rows) ) = ( workload, num_rows) {
288- WORKER_METRICS
289- . websocket_sent_num_rows
290- . with_label_values( & addr, & workload)
291- . observe( num_rows as f64 ) ;
292- WORKER_METRICS
293- . websocket_sent_msg_size
294- . with_label_values( & addr, & workload)
295- . observe( msg. len( ) as f64 ) ;
296- }
297- // feed() buffers the message, but does not necessarily send it
298- ws. feed( msg) . await ?;
301+ let send_all = async {
302+ for msg in rx_buf. drain( ..n) {
303+ let workload = msg. workload( ) ;
304+ let num_rows = msg. num_rows( ) ;
305+
306+ let msg = datamsg_to_wsmsg( serialize( msg, client. config) ) ;
307+
308+ // These metrics should be updated together,
309+ // or not at all.
310+ if let ( Some ( workload) , Some ( num_rows) ) = ( workload, num_rows) {
311+ WORKER_METRICS
312+ . websocket_sent_num_rows
313+ . with_label_values( & addr, & workload)
314+ . observe( num_rows as f64 ) ;
315+ WORKER_METRICS
316+ . websocket_sent_msg_size
317+ . with_label_values( & addr, & workload)
318+ . observe( msg. len( ) as f64 ) ;
299319 }
300- // now we flush all the messages to the socket
301- ws. flush( ) . await
302- } ;
320+ // feed() buffers the message, but does not necessarily send it
321+ ws. feed( msg) . await ?;
322+ }
323+ // now we flush all the messages to the socket
324+ ws. flush( ) . await
325+ } ;
303326 // Flush the websocket while continuing to poll the `handle_queue`,
304327 // to avoid deadlocks or delays due to enqueued futures holding resources.
305328 let send_all = also_poll( send_all, make_progress( & mut current_message) ) ;
@@ -348,6 +371,7 @@ async fn ws_client_actor_inner(
348371 } else {
349372 // the client never responded to our ping; drop them without trying to send them a Close
350373 log:: warn!( "client {} timed out" , client. id) ;
374+ clean_up_metrics( & message_queue, & sendrx) ;
351375 break ;
352376 }
353377 }
@@ -362,6 +386,7 @@ async fn ws_client_actor_inner(
362386 match message {
363387 Item :: Message ( ClientMessage :: Message ( message) ) => {
364388 let timer = Instant :: now ( ) ;
389+ incoming_queue_length_metric. inc ( ) ;
365390 message_queue. push_back ( ( message, timer) )
366391 }
367392 Item :: HandleResult ( res) => {
0 commit comments