@@ -9,7 +9,7 @@ use axum_extra::TypedHeader;
99use bytes:: Bytes ;
1010use bytestring:: ByteString ;
1111use futures:: future:: MaybeDone ;
12- use futures:: { Future , FutureExt , SinkExt , StreamExt } ;
12+ use futures:: { Future , SinkExt , StreamExt } ;
1313use http:: { HeaderValue , StatusCode } ;
1414use scopeguard:: ScopeGuard ;
1515use serde:: Deserialize ;
@@ -182,6 +182,7 @@ where
182182}
183183
184184const LIVELINESS_TIMEOUT : Duration = Duration :: from_secs ( 60 ) ; // 60 s; presumably the period of the ping/pong liveness check -- its use is not visible in this excerpt, TODO confirm.
185+ const SEND_TIMEOUT : Duration = Duration :: from_secs ( 5 ) ; // 5 s cap passed to `tokio::time::timeout` around the batched send+flush and around each ping send; on expiry the actor loop breaks without sending a Close.
185186
186187async fn ws_client_actor ( client : ClientConnection , ws : WebSocketStream , sendrx : MeteredReceiver < SerializableMessage > ) {
187188 // ensure that even if this task gets cancelled, we always cleanup the connection
@@ -276,54 +277,72 @@ async fn ws_client_actor_inner(
276277
277278 // If we have an outgoing message to send, send it off.
278279 // No incoming `message` to handle, so `continue`.
279- Some ( n) = sendrx. recv_many( & mut rx_buf, 32 ) . map( |n| ( n != 0 ) . then_some( n) ) => {
280+ n = sendrx. recv_many( & mut rx_buf, 32 ) => {
281+ // The receiver has been closed and there are no pending messages.
282+ // This is due to the client sending a close frame, so break from the loop.
283+ if n == 0 {
284+ break ;
285+ }
280286 if closed {
281287 // TODO: this isn't great. when we receive a close request from the peer,
282288 // tungstenite doesn't let us send any new messages on the socket,
283289 // even though the websocket RFC allows it. should we fork tungstenite?
284290 log:: info!( "dropping {n} messages due to ws already being closed" ) ;
285291 log:: debug!( "dropped messages: {:?}" , & rx_buf[ ..n] ) ;
286- } else {
287- let send_all = async {
288- for msg in rx_buf . drain ( ..n ) {
289- let workload = msg . workload ( ) ;
290- let num_rows = msg. num_rows ( ) ;
291-
292- // Serialize the message, report metrics,
293- // and keep a handle to the buffer.
294- let ( msg_alloc , msg_data ) = serialize ( msg_buffer , msg , client . config ) ;
295- report_ws_sent_metrics ( & addr , workload , num_rows , & msg_data ) ;
296-
297- // Buffer the message without necessarily sending it.
298- let res = ws . feed ( datamsg_to_wsmsg ( msg_data ) ) . await ;
299-
300- // At this point,
301- // the underlying allocation of `msg_data` should have a single referent
302- // and this should be `msg_alloc`.
303- // We can put this back into our pool .
304- msg_buffer = msg_alloc . try_reclaim ( )
305- . expect ( "should have a unique referent to `msg_alloc`" ) ;
306-
307- if res . is_err ( ) {
308- return ( res, msg_buffer ) ;
309- }
292+ break ;
293+ }
294+ let send_all = async {
295+ for msg in rx_buf . drain ( ..n ) {
296+ let workload = msg. workload ( ) ;
297+ let num_rows = msg . num_rows ( ) ;
298+
299+ // Serialize the message, report metrics,
300+ // and keep a handle to the buffer.
301+ let ( msg_alloc , msg_data ) = serialize ( msg_buffer , msg , client . config ) ;
302+ report_ws_sent_metrics ( & addr , workload , num_rows , & msg_data ) ;
303+
304+ // Buffer the message without necessarily sending it.
305+ let res = ws . feed ( datamsg_to_wsmsg ( msg_data ) ) . await ;
306+
307+ // At this point,
308+ // the underlying allocation of `msg_data` should have a single referent
309+ // and this should be `msg_alloc` .
310+ // We can put this back into our pool.
311+ msg_buffer = msg_alloc . try_reclaim ( )
312+ . expect ( "should have a unique referent to `msg_alloc`" ) ;
313+
314+ if res. is_err ( ) {
315+ return ( res , msg_buffer ) ;
310316 }
311- // now we flush all the messages to the socket
312- ( ws. flush( ) . await , msg_buffer)
313- } ;
314- // Flush the websocket while continuing to poll the `handle_queue`,
315- // to avoid deadlocks or delays due to enqueued futures holding resources.
316- let send_all = also_poll( send_all, make_progress( & mut current_message) ) ;
317- let t1 = Instant :: now( ) ;
318- let ( send_all_result, buf) = send_all. await ;
319- msg_buffer = buf;
320- if let Err ( error) = send_all_result {
321- log:: warn!( "Websocket send error: {error}" )
322317 }
323- let time = t1. elapsed( ) ;
324- if time > Duration :: from_millis( 50 ) {
325- tracing:: warn!( ?time, "send_all took a very long time" ) ;
318+ // now we flush all the messages to the socket
319+ ( ws. flush( ) . await , msg_buffer)
320+ } ;
321+ // build a future that both times out and drives the send
322+ let send_all = async {
323+ tokio:: time:: timeout( SEND_TIMEOUT , send_all) . await
324+ } ;
325+ // Flush the websocket while continuing to poll the `handle_queue`,
326+ // to avoid deadlocks or delays due to enqueued futures holding resources.
327+ let send_all = also_poll( send_all, make_progress( & mut current_message) ) ;
328+ let t1 = Instant :: now( ) ;
329+ let ( send_all_result, buf) = match send_all. await {
330+ Ok ( ( send_all_result, buf) ) => {
331+ ( send_all_result, buf)
326332 }
333+ Err ( e) => {
334+ // Our send timed out; drop client without trying to send them a Close
335+ log:: warn!( "send_all timed out after: {e}" ) ;
336+ break ;
337+ }
338+ } ;
339+ msg_buffer = buf;
340+ if let Err ( error) = send_all_result {
341+ log:: warn!( "Websocket send error: {error}" )
342+ }
343+ let time = t1. elapsed( ) ;
344+ if time > Duration :: from_millis( 50 ) {
345+ tracing:: warn!( ?time, "send_all took a very long time" ) ;
327346 }
328347 continue ;
329348 }
@@ -349,13 +368,25 @@ async fn ws_client_actor_inner(
349368 }
350369
351370 // If it's time to send a ping...
352- _ = liveness_check_interval. tick( ) => {
371+ _ = liveness_check_interval. tick( ) , if got_pong => {
353372 // If we received a pong at some point, send a fresh ping.
354373 if mem:: take( & mut got_pong) {
374+ // build a future that both times out and drives the send
375+ let ping_with_timeout = async {
376+ tokio:: time:: timeout( SEND_TIMEOUT , ws. send( WsMessage :: Ping ( Bytes :: new( ) ) ) ) . await
377+ } ;
355378 // Send a ping message while continuing to poll the `handle_queue`,
356379 // to avoid deadlocks or delays due to enqueued futures holding resources.
357- if let Err ( e) = also_poll( ws. send( WsMessage :: Ping ( Bytes :: new( ) ) ) , make_progress( & mut current_message) ) . await {
358- log:: warn!( "error sending ping: {e:#}" ) ;
380+ match also_poll( ping_with_timeout, make_progress( & mut current_message) ) . await {
381+ Ok ( Err ( e) ) => {
382+ log:: warn!( "error sending ping: {e:#}" ) ;
383+ }
384+ Err ( e) => {
385+ // Our ping timed out; drop them without trying to send them a Close
386+ log:: warn!( "ping timed out after: {e}" ) ;
387+ break ;
388+ }
389+ _ => { }
359390 }
360391 continue ;
361392 } else {
@@ -380,7 +411,7 @@ async fn ws_client_actor_inner(
380411 Item :: HandleResult ( res) => {
381412 if let Err ( e) = res {
382413 if let MessageHandleError :: Execution ( err) = e {
383- log:: error!( "{err:#}" ) ;
414+ log:: error!( "reducer execution error: {err:#}" ) ;
384415 // Serialize the message and keep a handle to the buffer.
385416 let ( msg_alloc, msg_data) = serialize ( msg_buffer, err, client. config ) ;
386417
@@ -399,7 +430,7 @@ async fn ws_client_actor_inner(
399430
400431 continue ;
401432 }
402- log:: debug !( "Client caused error on text message: {}" , e) ;
433+ log:: warn !( "Client caused error on text message: {}" , e) ;
403434 if let Err ( e) = ws
404435 . close ( Some ( CloseFrame {
405436 code : CloseCode :: Error ,
@@ -439,7 +470,7 @@ async fn ws_client_actor_inner(
439470 . with_label_values ( & addr)
440471 . inc ( ) ;
441472 }
442- closed = true ;
473+ break ;
443474 }
444475 }
445476 }
0 commit comments