@@ -9,7 +9,7 @@ use axum_extra::TypedHeader;
99use bytes:: Bytes ;
1010use bytestring:: ByteString ;
1111use futures:: future:: MaybeDone ;
12- use futures:: { Future , FutureExt , SinkExt , StreamExt } ;
12+ use futures:: { Future , SinkExt , StreamExt } ;
1313use http:: { HeaderValue , StatusCode } ;
1414use scopeguard:: ScopeGuard ;
1515use serde:: Deserialize ;
@@ -182,6 +182,7 @@ where
182182}
183183
184184const LIVELINESS_TIMEOUT : Duration = Duration :: from_secs ( 60 ) ;
185+ const SEND_TIMEOUT : Duration = Duration :: from_secs ( 5 ) ;
185186
186187async fn ws_client_actor ( client : ClientConnection , ws : WebSocketStream , sendrx : MeteredReceiver < SerializableMessage > ) {
187188 // ensure that even if this task gets cancelled, we always cleanup the connection
@@ -276,54 +277,79 @@ async fn ws_client_actor_inner(
276277
277278 // If we have an outgoing message to send, send it off.
278279 // No incoming `message` to handle, so `continue`.
279- Some ( n) = sendrx. recv_many( & mut rx_buf, 32 ) . map( |n| ( n != 0 ) . then_some( n) ) => {
280+ n = sendrx. recv_many( & mut rx_buf, 32 ) => {
281+ // The receiver has been closed and there are no pending messages.
282+ // This is due to the client sending a close frame, so break from the loop.
283+ if n == 0 {
284+ break ;
285+ }
280286 if closed {
281287 // TODO: this isn't great. when we receive a close request from the peer,
282288 // tungstenite doesn't let us send any new messages on the socket,
283289 // even though the websocket RFC allows it. should we fork tungstenite?
284290 log:: info!( "dropping {n} messages due to ws already being closed" ) ;
285291 log:: debug!( "dropped messages: {:?}" , & rx_buf[ ..n] ) ;
286- } else {
287- let send_all = async {
288- for msg in rx_buf . drain ( ..n ) {
289- let workload = msg . workload ( ) ;
290- let num_rows = msg. num_rows ( ) ;
291-
292- // Serialize the message, report metrics,
293- // and keep a handle to the buffer.
294- let ( msg_alloc , msg_data ) = serialize ( msg_buffer , msg , client . config ) ;
295- report_ws_sent_metrics ( & addr , workload , num_rows , & msg_data ) ;
296-
297- // Buffer the message without necessarily sending it.
298- let res = ws . feed ( datamsg_to_wsmsg ( msg_data ) ) . await ;
299-
300- // At this point,
301- // the underlying allocation of `msg_data` should have a single referent
302- // and this should be `msg_alloc`.
303- // We can put this back into our pool .
304- msg_buffer = msg_alloc . try_reclaim ( )
305- . expect ( "should have a unique referent to `msg_alloc`" ) ;
306-
307- if res . is_err ( ) {
308- return ( res, msg_buffer ) ;
309- }
292+ break ;
293+ }
294+ let send_all = async {
295+ for msg in rx_buf . drain ( ..n ) {
296+ let workload = msg. workload ( ) ;
297+ let num_rows = msg . num_rows ( ) ;
298+
299+ // Serialize the message, report metrics,
300+ // and keep a handle to the buffer.
301+ let ( msg_alloc , msg_data ) = serialize ( msg_buffer , msg , client . config ) ;
302+ report_ws_sent_metrics ( & addr , workload , num_rows , & msg_data ) ;
303+
304+ // Buffer the message without necessarily sending it.
305+ let res = ws . feed ( datamsg_to_wsmsg ( msg_data ) ) . await ;
306+
307+ // At this point,
308+ // the underlying allocation of `msg_data` should have a single referent
309+ // and this should be `msg_alloc` .
310+ // We can put this back into our pool.
311+ msg_buffer = msg_alloc . try_reclaim ( )
312+ . expect ( "should have a unique referent to `msg_alloc`" ) ;
313+
314+ if res. is_err ( ) {
315+ return ( res , msg_buffer ) ;
310316 }
311- // now we flush all the messages to the socket
312- ( ws. flush( ) . await , msg_buffer)
313- } ;
314- // Flush the websocket while continuing to poll the `handle_queue`,
315- // to avoid deadlocks or delays due to enqueued futures holding resources.
316- let send_all = also_poll( send_all, make_progress( & mut current_message) ) ;
317- let t1 = Instant :: now( ) ;
318- let ( send_all_result, buf) = send_all. await ;
319- msg_buffer = buf;
320- if let Err ( error) = send_all_result {
321- log:: warn!( "Websocket send error: {error}" )
322317 }
323- let time = t1. elapsed( ) ;
324- if time > Duration :: from_millis( 50 ) {
325- tracing:: warn!( ?time, "send_all took a very long time" ) ;
318+ // now we flush all the messages to the socket
319+ ( ws. flush( ) . await , msg_buffer)
320+ } ;
321+ // Build a future that both times out and drives the send.
322+ //
323+ // Note that if flushing cannot immediately complete for whatever reason,
324+ // it will wait without polling the other futures in the `select!` arms.
325+ // Among other things, this means our liveness tick will not be polled.
326+ //
327+ // To avoid waiting indefinitely, we wrap the send in a timeout.
328+ // A timeout is treated as an unresponsive client and we drop the connection.
329+ let send_all = async {
330+ tokio:: time:: timeout( SEND_TIMEOUT , send_all) . await
331+ } ;
332+ // Flush the websocket while continuing to poll the `handle_queue`,
333+ // to avoid deadlocks or delays due to enqueued futures holding resources.
334+ let send_all = also_poll( send_all, make_progress( & mut current_message) ) ;
335+ let t1 = Instant :: now( ) ;
336+ let ( send_all_result, buf) = match send_all. await {
337+ Ok ( ( send_all_result, buf) ) => {
338+ ( send_all_result, buf)
339+ }
340+ Err ( e) => {
341+ // Our send timed out; drop client without trying to send them a Close
342+ log:: warn!( "send_all timed out after: {e}" ) ;
343+ break ;
326344 }
345+ } ;
346+ msg_buffer = buf;
347+ if let Err ( error) = send_all_result {
348+ log:: warn!( "Websocket send error: {error}" )
349+ }
350+ let time = t1. elapsed( ) ;
351+ if time > Duration :: from_millis( 50 ) {
352+ tracing:: warn!( ?time, "send_all took a very long time" ) ;
327353 }
328354 continue ;
329355 }
@@ -352,10 +378,29 @@ async fn ws_client_actor_inner(
352378 _ = liveness_check_interval. tick( ) => {
353379 // If we received a pong at some point, send a fresh ping.
354380 if mem:: take( & mut got_pong) {
381+ // Build a future that both times out and drives the send.
382+ //
383+ // Note that if the send cannot immediately complete for whatever reason,
384+ // it will wait without polling the other futures in the `select!` arms.
385+ // Among other things, this means we won't poll the websocket for a Close frame.
386+ //
387+ // To avoid waiting indefinitely, we wrap the ping in a timeout.
388+ // A timeout is treated as an unresponsive client and we drop the connection.
389+ let ping_with_timeout = async {
390+ tokio:: time:: timeout( SEND_TIMEOUT , ws. send( WsMessage :: Ping ( Bytes :: new( ) ) ) ) . await
391+ } ;
355392 // Send a ping message while continuing to poll the `handle_queue`,
356393 // to avoid deadlocks or delays due to enqueued futures holding resources.
357- if let Err ( e) = also_poll( ws. send( WsMessage :: Ping ( Bytes :: new( ) ) ) , make_progress( & mut current_message) ) . await {
358- log:: warn!( "error sending ping: {e:#}" ) ;
394+ match also_poll( ping_with_timeout, make_progress( & mut current_message) ) . await {
395+ Ok ( Err ( e) ) => {
396+ log:: warn!( "error sending ping: {e:#}" ) ;
397+ }
398+ Err ( e) => {
399+ // Our ping timed out; drop them without trying to send them a Close
400+ log:: warn!( "ping timed out after: {e}" ) ;
401+ break ;
402+ }
403+ _ => { }
359404 }
360405 continue ;
361406 } else {
@@ -380,7 +425,7 @@ async fn ws_client_actor_inner(
380425 Item :: HandleResult ( res) => {
381426 if let Err ( e) = res {
382427 if let MessageHandleError :: Execution ( err) = e {
383- log:: error!( "{err:#}" ) ;
428+ log:: error!( "reducer execution error: {err:#}" ) ;
384429 // Serialize the message and keep a handle to the buffer.
385430 let ( msg_alloc, msg_data) = serialize ( msg_buffer, err, client. config ) ;
386431
@@ -399,7 +444,7 @@ async fn ws_client_actor_inner(
399444
400445 continue ;
401446 }
402- log:: debug !( "Client caused error on text message: {}" , e) ;
447+ log:: warn !( "Client caused error on text message: {}" , e) ;
403448 if let Err ( e) = ws
404449 . close ( Some ( CloseFrame {
405450 code : CloseCode :: Error ,
@@ -439,7 +484,18 @@ async fn ws_client_actor_inner(
439484 . with_label_values ( & addr)
440485 . inc ( ) ;
441486 }
442- closed = true ;
487+
488+ // Is it correct to break if a reducer is still in progress?
489+ // Answer: Yes it is.
490+ //
491+ // If a reducer is currently being executed,
492+ // we are waiting for the `current_message` future to complete.
493+ // When we break, the task completes and this future is dropped.
494+ //
495+ // Notably though the reducer itself will run to completion,
496+ // however when it tries to notify this task that it is done,
497+ // it will encounter a closed sender in `JobThread::run`.
498+ break ;
443499 }
444500 }
445501 }
0 commit comments