Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 105 additions & 27 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ where
}

const LIVELINESS_TIMEOUT: Duration = Duration::from_secs(60);
const SEND_TIMEOUT: Duration = Duration::from_secs(5);

async fn ws_client_actor(client: ClientConnection, ws: WebSocketStream, sendrx: MeteredReceiver<SerializableMessage>) {
// ensure that even if this task gets cancelled, we always cleanup the connection
Expand Down Expand Up @@ -266,12 +267,12 @@ async fn ws_client_actor_inner(
Some(Ok(m)) => Item::Message(ClientMessage::from_message(m)),
Some(Err(error)) => {
log::warn!("Websocket receive error: {}", error);
continue;
break;
}
// the client sent us a close frame
None => {
break
},
break;
}
},

// If we have an outgoing message to send, send it off.
Expand Down Expand Up @@ -311,11 +312,29 @@ async fn ws_client_actor_inner(
// now we flush all the messages to the socket
(ws.flush().await, msg_buffer)
};
// Build a future that both times out and drives the send.
//
// Note that if flushing cannot immediately complete for whatever reason,
// it will wait without polling the other futures in the `select!` arms.
// Among other things, this means our liveness tick will not be polled.
//
// To avoid waiting indefinitely, we wrap the send in a timeout.
// A timeout is treated as an unresponsive client and we drop the connection.
let send_all = tokio::time::timeout(SEND_TIMEOUT, send_all);
// Flush the websocket while continuing to poll the `handle_queue`,
// to avoid deadlocks or delays due to enqueued futures holding resources.
let send_all = also_poll(send_all, make_progress(&mut current_message));
let t1 = Instant::now();
let (send_all_result, buf) = send_all.await;
let (send_all_result, buf) = match send_all.await {
Ok((send_all_result, buf)) => {
(send_all_result, buf)
}
Err(e) => {
// Our send timed out; drop client without trying to send them a Close
log::warn!("send_all timed out: {e}");
break;
}
};
msg_buffer = buf;
if let Err(error) = send_all_result {
log::warn!("Websocket send error: {error}")
Expand All @@ -335,13 +354,33 @@ async fn ws_client_actor_inner(
Err(NoSuchModule) => {
// Send a close frame while continuing to poll the `handle_queue`,
// to avoid deadlocks or delays due to enqueued futures holding resources.
let close = also_poll(
ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() })),
make_progress(&mut current_message),
);
if let Err(e) = close.await {
log::warn!("error closing: {e:#}")
}
let close = ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() }));
// Wrap the close in a timeout
let close = tokio::time::timeout(SEND_TIMEOUT, close);
match also_poll(close, make_progress(&mut current_message)).await {
Ok(Err(e)) => {
log::warn!("error closing websocket: {e:#}")
}
Err(e) => {
// Our send timed out; drop client without trying to send them a Close.
//
// Is it correct to break if a reducer is still in progress?
// Answer: Yes it is.
//
// If a reducer is currently being executed,
// we are waiting for the `current_message` future to complete.
// When we break, the task completes and this future is dropped.
//
// Notably though the reducer itself will run to completion,
// however when it tries to notify this task that it is done,
// it will encounter a closed sender in `JobThread::run`,
// dropping the value that it's trying to send.
// In particular it will not throw an error or panic.
log::warn!("websocket close timed out: {e}");
break;
}
_ => {}
};
closed = true;
}
}
Expand All @@ -352,10 +391,29 @@ async fn ws_client_actor_inner(
_ = liveness_check_interval.tick() => {
// If we received a pong at some point, send a fresh ping.
if mem::take(&mut got_pong) {
// Build a future that both times out and drives the send.
//
// Note that if the send cannot immediately complete for whatever reason,
// it will wait without polling the other futures in the `select!` arms.
// Among other things, this means we won't poll the websocket for a Close frame.
//
// To avoid waiting indefinitely, we wrap the ping in a timeout.
// A timeout is treated as an unresponsive client and we drop the connection.
let ping = ws.send(WsMessage::Ping(Bytes::new()));
let ping_with_timeout = tokio::time::timeout(SEND_TIMEOUT, ping);

// Send a ping message while continuing to poll the `handle_queue`,
// to avoid deadlocks or delays due to enqueued futures holding resources.
if let Err(e) = also_poll(ws.send(WsMessage::Ping(Bytes::new())), make_progress(&mut current_message)).await {
log::warn!("error sending ping: {e:#}");
match also_poll(ping_with_timeout, make_progress(&mut current_message)).await {
Ok(Err(e)) => {
log::warn!("error sending ping: {e:#}");
}
Err(e) => {
// Our ping timed out; drop them without trying to send them a Close
log::warn!("ping timed out after: {e}");
break;
}
_ => {}
}
continue;
} else {
Expand All @@ -380,13 +438,22 @@ async fn ws_client_actor_inner(
Item::HandleResult(res) => {
if let Err(e) = res {
if let MessageHandleError::Execution(err) = e {
log::error!("{err:#}");
log::error!("reducer execution error: {err:#}");
// Serialize the message and keep a handle to the buffer.
let (msg_alloc, msg_data) = serialize(msg_buffer, err, client.config);

// Buffer the message without necessarily sending it.
if let Err(error) = ws.send(datamsg_to_wsmsg(msg_data)).await {
log::warn!("Websocket send error: {error}")
let send = async { ws.send(datamsg_to_wsmsg(msg_data)).await };
let send = tokio::time::timeout(SEND_TIMEOUT, send);

match send.await {
Ok(Err(error)) => {
log::warn!("Websocket send error: {error}")
}
Err(error) => {
log::warn!("send timed out after: {error}");
break;
}
_ => {}
}

// At this point,
Expand All @@ -399,16 +466,23 @@ async fn ws_client_actor_inner(

continue;
}
log::debug!("Client caused error on text message: {}", e);
if let Err(e) = ws
.close(Some(CloseFrame {
code: CloseCode::Error,
reason: format!("{e:#}").into(),
}))
.await
{
log::warn!("error closing websocket: {e:#}")
};
log::warn!("Client caused error on text message: {}", e);
let close = ws.close(Some(CloseFrame {
code: CloseCode::Error,
reason: format!("{e:#}").into(),
}));

// Wrap the close in a timeout
match tokio::time::timeout(SEND_TIMEOUT, close).await {
Ok(Err(e)) => {
log::warn!("error closing websocket: {e:#}")
}
Err(e) => {
log::warn!("send timed out after: {e}");
break;
}
_ => {}
}
}
}
Item::Message(ClientMessage::Ping(_message)) => {
Expand Down Expand Up @@ -439,6 +513,10 @@ async fn ws_client_actor_inner(
.with_label_values(&addr)
.inc();
}

// Can't we just break out of the loop here?
// Not, if we want tungstenite to send a close frame back to the client.
// That will only happen once `ws.next()` returns `None`.
closed = true;
}
}
Expand Down
Loading