@@ -228,24 +228,6 @@ async fn ws_client_actor_inner(
228228
229229 let addr = client. module . info ( ) . database_identity ;
230230
231- // Grab handles on the total incoming and outgoing queue length metrics,
232- // which we'll increment and decrement as we push into and pull out of those queues.
233- // Note that `total_outgoing_queue_length` is incremented separately,
234- // by `ClientConnectionSender::send` in core/src/client/client_connection.rs;
235- // we're only responsible for decrementing that one.
236- // Also note that much care must be taken to clean up these metrics when the connection closes!
237- // Any path which exits this function must decrement each of these metrics
238- // by the number of messages still waiting in this client's queue,
239- // or else they will grow without bound as clients disconnect, and be useless.
240- let incoming_queue_length_metric = WORKER_METRICS . total_incoming_queue_length . with_label_values ( & addr) ;
241- let outgoing_queue_length_metric = WORKER_METRICS . total_outgoing_queue_length . with_label_values ( & addr) ;
242-
243- let clean_up_metrics = |message_queue : & VecDeque < ( DataMessage , Instant ) > ,
244- sendrx : & mpsc:: Receiver < SerializableMessage > | {
245- incoming_queue_length_metric. sub ( message_queue. len ( ) as _ ) ;
246- outgoing_queue_length_metric. sub ( sendrx. len ( ) as _ ) ;
247- } ;
248-
249231 loop {
250232 rx_buf. clear ( ) ;
251233 enum Item {
@@ -254,7 +236,6 @@ async fn ws_client_actor_inner(
254236 }
255237 if let MaybeDone :: Gone = * current_message {
256238 if let Some ( ( message, timer) ) = message_queue. pop_front ( ) {
257- incoming_queue_length_metric. dec ( ) ;
258239 let client = client. clone ( ) ;
259240 let fut = async move { client. handle_message ( message, timer) . await } ;
260241 current_message. set ( MaybeDone :: Future ( fut) ) ;
@@ -282,47 +263,43 @@ async fn ws_client_actor_inner(
282263 continue ;
283264 }
284265 // the client sent us a close frame
285- None => {
286- clean_up_metrics( & message_queue, & sendrx) ;
287- break
288- } ,
266+ None => break ,
289267 } ,
290268
291269 // If we have an outgoing message to send, send it off.
292270 // No incoming `message` to handle, so `continue`.
293271 Some ( n) = sendrx. recv_many( & mut rx_buf, 32 ) . map( |n| ( n != 0 ) . then_some( n) ) => {
294- outgoing_queue_length_metric. sub( n as _) ;
295272 if closed {
296273 // TODO: this isn't great. when we receive a close request from the peer,
297274 // tungstenite doesn't let us send any new messages on the socket,
298275 // even though the websocket RFC allows it. should we fork tungstenite?
299276 log:: info!( "dropping messages due to ws already being closed: {:?}" , & rx_buf[ ..n] ) ;
300277 } else {
301- let send_all = async {
302- for msg in rx_buf. drain( ..n) {
303- let workload = msg. workload( ) ;
304- let num_rows = msg. num_rows( ) ;
305-
306- let msg = datamsg_to_wsmsg( serialize( msg, client. config) ) ;
307-
308- // These metrics should be updated together,
309- // or not at all.
310- if let ( Some ( workload) , Some ( num_rows) ) = ( workload, num_rows) {
311- WORKER_METRICS
312- . websocket_sent_num_rows
313- . with_label_values( & addr, & workload)
314- . observe( num_rows as f64 ) ;
315- WORKER_METRICS
316- . websocket_sent_msg_size
317- . with_label_values( & addr, & workload)
318- . observe( msg. len( ) as f64 ) ;
278+ let send_all = async {
279+ for msg in rx_buf. drain( ..n) {
280+ let workload = msg. workload( ) ;
281+ let num_rows = msg. num_rows( ) ;
282+
283+ let msg = datamsg_to_wsmsg( serialize( msg, client. config) ) ;
284+
285+ // These metrics should be updated together,
286+ // or not at all.
287+ if let ( Some ( workload) , Some ( num_rows) ) = ( workload, num_rows) {
288+ WORKER_METRICS
289+ . websocket_sent_num_rows
290+ . with_label_values( & addr, & workload)
291+ . observe( num_rows as f64 ) ;
292+ WORKER_METRICS
293+ . websocket_sent_msg_size
294+ . with_label_values( & addr, & workload)
295+ . observe( msg. len( ) as f64 ) ;
296+ }
297+ // feed() buffers the message, but does not necessarily send it
298+ ws. feed( msg) . await ?;
319299 }
320- // feed() buffers the message, but does not necessarily send it
321- ws. feed( msg) . await ?;
322- }
323- // now we flush all the messages to the socket
324- ws. flush( ) . await
325- } ;
300+ // now we flush all the messages to the socket
301+ ws. flush( ) . await
302+ } ;
326303 // Flush the websocket while continuing to poll the `handle_queue`,
327304 // to avoid deadlocks or delays due to enqueued futures holding resources.
328305 let send_all = also_poll( send_all, make_progress( & mut current_message) ) ;
@@ -371,7 +348,6 @@ async fn ws_client_actor_inner(
371348 } else {
372349 // the client never responded to our ping; drop them without trying to send them a Close
373350 log:: warn!( "client {} timed out" , client. id) ;
374- clean_up_metrics( & message_queue, & sendrx) ;
375351 break ;
376352 }
377353 }
@@ -386,7 +362,6 @@ async fn ws_client_actor_inner(
386362 match message {
387363 Item :: Message ( ClientMessage :: Message ( message) ) => {
388364 let timer = Instant :: now ( ) ;
389- incoming_queue_length_metric. inc ( ) ;
390365 message_queue. push_back ( ( message, timer) )
391366 }
392367 Item :: HandleResult ( res) => {
0 commit comments