diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 3137c4f10b2..85bd8acb42d 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -182,6 +182,7 @@ where } const LIVELINESS_TIMEOUT: Duration = Duration::from_secs(60); +const SEND_TIMEOUT: Duration = Duration::from_secs(5); async fn ws_client_actor(client: ClientConnection, ws: WebSocketStream, sendrx: MeteredReceiver) { // ensure that even if this task gets cancelled, we always cleanup the connection @@ -266,12 +267,12 @@ async fn ws_client_actor_inner( Some(Ok(m)) => Item::Message(ClientMessage::from_message(m)), Some(Err(error)) => { log::warn!("Websocket receive error: {}", error); - continue; + break; } // the client sent us a close frame None => { - break - }, + break; + } }, // If we have an outgoing message to send, send it off. @@ -311,11 +312,29 @@ async fn ws_client_actor_inner( // now we flush all the messages to the socket (ws.flush().await, msg_buffer) }; + // Build a future that both times out and drives the send. + // + // Note that if flushing cannot immediately complete for whatever reason, + // it will wait without polling the other futures in the `select!` arms. + // Among other things, this means our liveness tick will not be polled. + // + // To avoid waiting indefinitely, we wrap the send in a timeout. + // A timeout is treated as an unresponsive client and we drop the connection. + let send_all = tokio::time::timeout(SEND_TIMEOUT, send_all); // Flush the websocket while continuing to poll the `handle_queue`, // to avoid deadlocks or delays due to enqueued futures holding resources. let send_all = also_poll(send_all, make_progress(&mut current_message)); let t1 = Instant::now(); - let (send_all_result, buf) = send_all.await; + let (send_all_result, buf) = match send_all.await { + Ok((send_all_result, buf)) => { + (send_all_result, buf) + } + Err(e) => { + // Our send timed out; drop client without trying to send them a Close + log::warn!("send_all timed out: {e}"); + break; + } + }; msg_buffer = buf; if let Err(error) = send_all_result { log::warn!("Websocket send error: {error}") @@ -335,13 +354,33 @@ async fn ws_client_actor_inner( Err(NoSuchModule) => { // Send a close frame while continuing to poll the `handle_queue`, // to avoid deadlocks or delays due to enqueued futures holding resources. - let close = also_poll( - ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() })), - make_progress(&mut current_message), - ); - if let Err(e) = close.await { - log::warn!("error closing: {e:#}") - } + let close = ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() })); + // Wrap the close in a timeout + let close = tokio::time::timeout(SEND_TIMEOUT, close); + match also_poll(close, make_progress(&mut current_message)).await { + Ok(Err(e)) => { + log::warn!("error closing websocket: {e:#}") + } + Err(e) => { + // Our send timed out; drop client without trying to send them a Close. + // + // Is it correct to break if a reducer is still in progress? + // Answer: Yes it is. + // + // If a reducer is currently being executed, + // we are waiting for the `current_message` future to complete. + // When we break, the task completes and this future is dropped. + // + // Notably though the reducer itself will run to completion, + // however when it tries to notify this task that it is done, + // it will encounter a closed sender in `JobThread::run`, + // dropping the value that it's trying to send. + // In particular it will not throw an error or panic. + log::warn!("websocket close timed out: {e}"); + break; + } + _ => {} + }; closed = true; } } @@ -352,10 +391,29 @@ async fn ws_client_actor_inner( _ = liveness_check_interval.tick() => { // If we received a pong at some point, send a fresh ping. if mem::take(&mut got_pong) { + // Build a future that both times out and drives the send. + // + // Note that if the send cannot immediately complete for whatever reason, + // it will wait without polling the other futures in the `select!` arms. + // Among other things, this means we won't poll the websocket for a Close frame. + // + // To avoid waiting indefinitely, we wrap the ping in a timeout. + // A timeout is treated as an unresponsive client and we drop the connection. + let ping = ws.send(WsMessage::Ping(Bytes::new())); + let ping_with_timeout = tokio::time::timeout(SEND_TIMEOUT, ping); + // Send a ping message while continuing to poll the `handle_queue`, // to avoid deadlocks or delays due to enqueued futures holding resources. - if let Err(e) = also_poll(ws.send(WsMessage::Ping(Bytes::new())), make_progress(&mut current_message)).await { - log::warn!("error sending ping: {e:#}"); + match also_poll(ping_with_timeout, make_progress(&mut current_message)).await { + Ok(Err(e)) => { + log::warn!("error sending ping: {e:#}"); + } + Err(e) => { + // Our ping timed out; drop them without trying to send them a Close + log::warn!("ping timed out after: {e}"); + break; + } + _ => {} } continue; } else { @@ -380,13 +438,22 @@ async fn ws_client_actor_inner( Item::HandleResult(res) => { if let Err(e) = res { if let MessageHandleError::Execution(err) = e { - log::error!("{err:#}"); + log::error!("reducer execution error: {err:#}"); // Serialize the message and keep a handle to the buffer. let (msg_alloc, msg_data) = serialize(msg_buffer, err, client.config); - // Buffer the message without necessarily sending it. - if let Err(error) = ws.send(datamsg_to_wsmsg(msg_data)).await { - log::warn!("Websocket send error: {error}") + let send = async { ws.send(datamsg_to_wsmsg(msg_data)).await }; + let send = tokio::time::timeout(SEND_TIMEOUT, send); + + match send.await { + Ok(Err(error)) => { + log::warn!("Websocket send error: {error}") + } + Err(error) => { + log::warn!("send timed out after: {error}"); + break; + } + _ => {} } // At this point, @@ -399,16 +466,23 @@ async fn ws_client_actor_inner( continue; } - log::debug!("Client caused error on text message: {}", e); - if let Err(e) = ws - .close(Some(CloseFrame { - code: CloseCode::Error, - reason: format!("{e:#}").into(), - })) - .await - { - log::warn!("error closing websocket: {e:#}") - }; + log::warn!("Client caused error on text message: {}", e); + let close = ws.close(Some(CloseFrame { + code: CloseCode::Error, + reason: format!("{e:#}").into(), + })); + + // Wrap the close in a timeout + match tokio::time::timeout(SEND_TIMEOUT, close).await { + Ok(Err(e)) => { + log::warn!("error closing websocket: {e:#}") + } + Err(e) => { + log::warn!("send timed out after: {e}"); + break; + } + _ => {} + } } } Item::Message(ClientMessage::Ping(_message)) => { @@ -439,6 +513,10 @@ async fn ws_client_actor_inner( .with_label_values(&addr) .inc(); } + + // Can't we just break out of the loop here? + // Not, if we want tungstenite to send a close frame back to the client. + // That will only happen once `ws.next()` returns `None`. closed = true; } }