@@ -9,7 +9,7 @@ use axum_extra::TypedHeader;
99use bytes:: Bytes ;
1010use bytestring:: ByteString ;
1111use futures:: future:: MaybeDone ;
12- use futures:: { Future , SinkExt , StreamExt } ;
12+ use futures:: { Future , FutureExt , SinkExt , StreamExt } ;
1313use http:: { HeaderValue , StatusCode } ;
1414use scopeguard:: ScopeGuard ;
1515use serde:: Deserialize ;
@@ -267,89 +267,82 @@ async fn ws_client_actor_inner(
267267 Some ( Ok ( m) ) => Item :: Message ( ClientMessage :: from_message( m) ) ,
268268 Some ( Err ( error) ) => {
269269 log:: warn!( "Websocket receive error: {}" , error) ;
270- continue ;
270+ break ;
271271 }
272272 // the client sent us a close frame
273273 None => {
274- break
275- } ,
274+ break ;
275+ }
276276 } ,
277277
278278 // If we have an outgoing message to send, send it off.
279279 // No incoming `message` to handle, so `continue`.
280- n = sendrx. recv_many( & mut rx_buf, 32 ) => {
281- // The receiver has been closed and there are no pending messages.
282- // This is due to the client sending a close frame, so break from the loop.
283- if n == 0 {
284- break ;
285- }
280+ Some ( n) = sendrx. recv_many( & mut rx_buf, 32 ) . map( |n| ( n != 0 ) . then_some( n) ) => {
286281 if closed {
287282 // TODO: this isn't great. when we receive a close request from the peer,
288283 // tungstenite doesn't let us send any new messages on the socket,
289284 // even though the websocket RFC allows it. should we fork tungstenite?
290285 log:: info!( "dropping {n} messages due to ws already being closed" ) ;
291286 log:: debug!( "dropped messages: {:?}" , & rx_buf[ ..n] ) ;
292- break ;
293- }
294- let send_all = async {
295- for msg in rx_buf . drain ( ..n ) {
296- let workload = msg. workload ( ) ;
297- let num_rows = msg . num_rows ( ) ;
298-
299- // Serialize the message, report metrics,
300- // and keep a handle to the buffer.
301- let ( msg_alloc , msg_data ) = serialize ( msg_buffer , msg , client . config ) ;
302- report_ws_sent_metrics ( & addr , workload , num_rows , & msg_data ) ;
303-
304- // Buffer the message without necessarily sending it.
305- let res = ws . feed ( datamsg_to_wsmsg ( msg_data ) ) . await ;
306-
307- // At this point,
308- // the underlying allocation of `msg_data` should have a single referent
309- // and this should be `msg_alloc` .
310- // We can put this back into our pool.
311- msg_buffer = msg_alloc . try_reclaim ( )
312- . expect ( "should have a unique referent to `msg_alloc`" ) ;
313-
314- if res . is_err ( ) {
315- return ( res , msg_buffer ) ;
287+ } else {
288+ let send_all = async {
289+ for msg in rx_buf . drain ( ..n ) {
290+ let workload = msg . workload ( ) ;
291+ let num_rows = msg. num_rows ( ) ;
292+
293+ // Serialize the message, report metrics,
294+ // and keep a handle to the buffer.
295+ let ( msg_alloc , msg_data ) = serialize ( msg_buffer , msg , client . config ) ;
296+ report_ws_sent_metrics ( & addr , workload , num_rows , & msg_data ) ;
297+
298+ // Buffer the message without necessarily sending it.
299+ let res = ws . feed ( datamsg_to_wsmsg ( msg_data ) ) . await ;
300+
301+ // At this point,
302+ // the underlying allocation of `msg_data` should have a single referent
303+ // and this should be `msg_alloc`.
304+ // We can put this back into our pool .
305+ msg_buffer = msg_alloc . try_reclaim ( )
306+ . expect ( "should have a unique referent to `msg_alloc`" ) ;
307+
308+ if res . is_err ( ) {
309+ return ( res , msg_buffer ) ;
310+ }
316311 }
312+ // now we flush all the messages to the socket
313+ ( ws. flush( ) . await , msg_buffer)
314+ } ;
315+ // Build a future that both times out and drives the send.
316+ //
317+ // Note that if flushing cannot immediately complete for whatever reason,
318+ // it will wait without polling the other futures in the `select!` arms.
319+ // Among other things, this means our liveness tick will not be polled.
320+ //
321+ // To avoid waiting indefinitely, we wrap the send in a timeout.
322+ // A timeout is treated as an unresponsive client and we drop the connection.
323+ let send_all = tokio:: time:: timeout( SEND_TIMEOUT , send_all) ;
324+ // Flush the websocket while continuing to poll the `handle_queue`,
325+ // to avoid deadlocks or delays due to enqueued futures holding resources.
326+ let send_all = also_poll( send_all, make_progress( & mut current_message) ) ;
327+ let t1 = Instant :: now( ) ;
328+ let ( send_all_result, buf) = match send_all. await {
329+ Ok ( ( send_all_result, buf) ) => {
330+ ( send_all_result, buf)
331+ }
332+ Err ( e) => {
333+ // Our send timed out; drop client without trying to send them a Close
334+ log:: warn!( "send_all timed out: {e}" ) ;
335+ break ;
336+ }
337+ } ;
338+ msg_buffer = buf;
339+ if let Err ( error) = send_all_result {
340+ log:: warn!( "Websocket send error: {error}" )
317341 }
318- // now we flush all the messages to the socket
319- ( ws. flush( ) . await , msg_buffer)
320- } ;
321- // Build a future that both times out and drives the send.
322- //
323- // Note that if flushing cannot immediately complete for whatever reason,
324- // it will wait without polling the other futures in the `select!` arms.
325- // Among other things, this means our liveness tick will not be polled.
326- //
327- // To avoid waiting indefinitely, we wrap the send in a timeout.
328- // A timeout is treated as an unresponsive client and we drop the connection.
329- let send_all = async {
330- tokio:: time:: timeout( SEND_TIMEOUT , send_all) . await
331- } ;
332- // Flush the websocket while continuing to poll the `handle_queue`,
333- // to avoid deadlocks or delays due to enqueued futures holding resources.
334- let send_all = also_poll( send_all, make_progress( & mut current_message) ) ;
335- let t1 = Instant :: now( ) ;
336- let ( send_all_result, buf) = match send_all. await {
337- Ok ( ( send_all_result, buf) ) => {
338- ( send_all_result, buf)
339- }
340- Err ( e) => {
341- // Our send timed out; drop client without trying to send them a Close
342- log:: warn!( "send_all timed out after: {e}" ) ;
343- break ;
342+ let time = t1. elapsed( ) ;
343+ if time > Duration :: from_millis( 50 ) {
344+ tracing:: warn!( ?time, "send_all took a very long time" ) ;
344345 }
345- } ;
346- msg_buffer = buf;
347- if let Err ( error) = send_all_result {
348- log:: warn!( "Websocket send error: {error}" )
349- }
350- let time = t1. elapsed( ) ;
351- if time > Duration :: from_millis( 50 ) {
352- tracing:: warn!( ?time, "send_all took a very long time" ) ;
353346 }
354347 continue ;
355348 }
@@ -361,13 +354,33 @@ async fn ws_client_actor_inner(
361354 Err ( NoSuchModule ) => {
362355 // Send a close frame while continuing to poll the `handle_queue`,
363356 // to avoid deadlocks or delays due to enqueued futures holding resources.
364- let close = also_poll(
365- ws. close( Some ( CloseFrame { code: CloseCode :: Away , reason: "module exited" . into( ) } ) ) ,
366- make_progress( & mut current_message) ,
367- ) ;
368- if let Err ( e) = close. await {
369- log:: warn!( "error closing: {e:#}" )
370- }
357+ let close = ws. close( Some ( CloseFrame { code: CloseCode :: Away , reason: "module exited" . into( ) } ) ) ;
358+ // Wrap the close in a timeout
359+ let close = tokio:: time:: timeout( SEND_TIMEOUT , close) ;
360+ match also_poll( close, make_progress( & mut current_message) ) . await {
361+ Ok ( Err ( e) ) => {
362+ log:: warn!( "error closing websocket: {e:#}" )
363+ }
364+ Err ( e) => {
365+ // Our send timed out; drop client without trying to send them a Close.
366+ //
367+ // Is it correct to break if a reducer is still in progress?
368+ // Answer: Yes it is.
369+ //
370+ // If a reducer is currently being executed,
371+ // we are waiting for the `current_message` future to complete.
372+ // When we break, the task completes and this future is dropped.
373+ //
374+ // Notably though the reducer itself will run to completion,
375+ // however when it tries to notify this task that it is done,
376+ // it will encounter a closed sender in `JobThread::run`,
377+ // dropping the value that it's trying to send.
378+ // In particular it will not throw an error or panic.
379+ log:: warn!( "websocket close timed out: {e}" ) ;
380+ break ;
381+ }
382+ _ => { }
383+ } ;
371384 closed = true ;
372385 }
373386 }
@@ -386,9 +399,9 @@ async fn ws_client_actor_inner(
386399 //
387400 // To avoid waiting indefinitely, we wrap the ping in a timeout.
388401 // A timeout is treated as an unresponsive client and we drop the connection.
389- let ping_with_timeout = async {
390- tokio:: time:: timeout( SEND_TIMEOUT , ws . send ( WsMessage :: Ping ( Bytes :: new ( ) ) ) ) . await
391- } ;
402+ let ping = ws . send ( WsMessage :: Ping ( Bytes :: new ( ) ) ) ;
403+ let ping_with_timeout = tokio:: time:: timeout( SEND_TIMEOUT , ping ) ;
404+
392405 // Send a ping message while continuing to poll the `handle_queue`,
393406 // to avoid deadlocks or delays due to enqueued futures holding resources.
394407 match also_poll( ping_with_timeout, make_progress( & mut current_message) ) . await {
@@ -429,9 +442,18 @@ async fn ws_client_actor_inner(
429442 // Serialize the message and keep a handle to the buffer.
430443 let ( msg_alloc, msg_data) = serialize ( msg_buffer, err, client. config ) ;
431444
432- // Buffer the message without necessarily sending it.
433- if let Err ( error) = ws. send ( datamsg_to_wsmsg ( msg_data) ) . await {
434- log:: warn!( "Websocket send error: {error}" )
445+ let send = async { ws. send ( datamsg_to_wsmsg ( msg_data) ) . await } ;
446+ let send = tokio:: time:: timeout ( SEND_TIMEOUT , send) ;
447+
448+ match send. await {
449+ Ok ( Err ( error) ) => {
450+ log:: warn!( "Websocket send error: {error}" )
451+ }
452+ Err ( error) => {
453+ log:: warn!( "send timed out after: {error}" ) ;
454+ break ;
455+ }
456+ _ => { }
435457 }
436458
437459 // At this point,
@@ -445,15 +467,22 @@ async fn ws_client_actor_inner(
445467 continue ;
446468 }
447469 log:: warn!( "Client caused error on text message: {}" , e) ;
448- if let Err ( e) = ws
449- . close ( Some ( CloseFrame {
450- code : CloseCode :: Error ,
451- reason : format ! ( "{e:#}" ) . into ( ) ,
452- } ) )
453- . await
454- {
455- log:: warn!( "error closing websocket: {e:#}" )
456- } ;
470+ let close = ws. close ( Some ( CloseFrame {
471+ code : CloseCode :: Error ,
472+ reason : format ! ( "{e:#}" ) . into ( ) ,
473+ } ) ) ;
474+
475+ // Wrap the close in a timeout
476+ match tokio:: time:: timeout ( SEND_TIMEOUT , close) . await {
477+ Ok ( Err ( e) ) => {
478+ log:: warn!( "error closing websocket: {e:#}" )
479+ }
480+ Err ( e) => {
481+ log:: warn!( "send timed out after: {e}" ) ;
482+ break ;
483+ }
484+ _ => { }
485+ }
457486 }
458487 }
459488 Item :: Message ( ClientMessage :: Ping ( _message) ) => {
@@ -485,19 +514,10 @@ async fn ws_client_actor_inner(
485514 . inc ( ) ;
486515 }
487516
488- // Is it correct to break if a reducer is still in progress?
489- // Answer: Yes it is.
490- //
491- // If a reducer is currently being executed,
492- // we are waiting for the `current_message` future to complete.
493- // When we break, the task completes and this future is dropped.
494- //
495- // Notably though the reducer itself will run to completion,
496- // however when it tries to notify this task that it is done,
497- // it will encounter a closed sender in `JobThread::run`,
498- // dropping the value that it's trying to send.
499- // In particular it will not throw an error or panic.
500- break ;
517+ // Can't we just break out of the loop here?
518+ // Not if we want tungstenite to send a close frame back to the client.
519+ // That will only happen once `ws.next()` returns `None`.
520+ closed = true ;
501521 }
502522 }
503523 }
0 commit comments