Skip to content

Commit 2944b50

Browse files
committed
cli: Close the websocket connection gracefully
The `subscribe` command would drop the connection without sending a close frame. Doing so creates a warning on the server, which can be distracting when debugging other connections issues.
1 parent c3cfadb commit 2944b50

1 file changed

Lines changed: 15 additions & 15 deletions

File tree

crates/cli/src/subcommands/subscribe.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use spacetimedb_lib::ser::serde::SerializeWrapper;
1212
use std::time::Duration;
1313
use tokio::io::AsyncWriteExt;
1414
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
15-
use tokio_tungstenite::tungstenite::Message as WsMessage;
15+
use tokio_tungstenite::tungstenite::{Error as WsError, Message as WsMessage};
1616

1717
use crate::api::ClientApi;
1818
use crate::common_args;
@@ -158,29 +158,33 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
158158
if let Some(auth_header) = api.con.auth_header.to_header() {
159159
req.headers_mut().insert(header::AUTHORIZATION, auth_header);
160160
}
161-
let (mut ws, _) = tokio_tungstenite::connect_async(req).await?;
161+
let mut ws = tokio_tungstenite::connect_async(req).await.map(|(ws, _)| ws)?;
162162

163163
let task = async {
164164
subscribe(&mut ws, queries.cloned().map(Into::into).collect()).await?;
165165
await_initial_update(&mut ws, print_initial_update.then_some(&module_def)).await?;
166166
consume_transaction_updates(&mut ws, num, &module_def).await
167167
};
168168

169-
let needs_shutdown = if let Some(timeout) = timeout {
169+
let res = if let Some(timeout) = timeout {
170170
let timeout = Duration::from_secs(timeout.into());
171171
match tokio::time::timeout(timeout, task).await {
172-
Ok(res) => res?,
173-
Err(_elapsed) => true,
172+
Ok(res) => res,
173+
Err(_elapsed) => {
174+
eprintln!("timed out after {}s", timeout.as_secs());
175+
Ok(())
176+
}
174177
}
175178
} else {
176-
task.await?
179+
task.await
177180
};
178181

179-
if needs_shutdown {
182+
// Close the connection gracefully, unless it's a websocket error.
183+
if !res.as_ref().is_err_and(|e| e.downcast_ref::<WsError>().is_some()) {
180184
ws.close(None).await?;
181185
}
182186

183-
Ok(())
187+
res
184188
}
185189

186190
/// Send the subscribe message.
@@ -234,11 +238,7 @@ where
234238

235239
/// Print `num` [`ServerMessage::TransactionUpdate`] messages as JSON.
236240
/// If `num` is `None`, keep going indefinitely.
237-
async fn consume_transaction_updates<S>(
238-
ws: &mut S,
239-
num: Option<u32>,
240-
module_def: &RawModuleDefV9,
241-
) -> anyhow::Result<bool>
241+
async fn consume_transaction_updates<S>(ws: &mut S, num: Option<u32>, module_def: &RawModuleDefV9) -> anyhow::Result<()>
242242
where
243243
S: TryStream<Ok = WsMessage> + Unpin,
244244
S::Error: std::error::Error + Send + Sync + 'static,
@@ -247,11 +247,11 @@ where
247247
let mut num_received = 0;
248248
loop {
249249
if num.is_some_and(|n| num_received >= n) {
250-
break Ok(true);
250+
break Ok(());
251251
}
252252
let Some(msg) = ws.try_next().await? else {
253253
eprintln!("disconnected by server");
254-
break Ok(false);
254+
break Ok(());
255255
};
256256

257257
let Some(msg) = parse_msg_json(&msg) else { continue };

0 commit comments

Comments
 (0)