Skip to content

Commit 64cc42e

Browse files
authored
fix(fullstack): prevent AlreadyBorrowed panic in WebSocket recv() (#5300)
1 parent d75d964 commit 64cc42e

4 files changed

Lines changed: 82 additions & 53 deletions

File tree

examples/07-fullstack/websocket.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,28 +24,38 @@ fn main() {
2424
}
2525

2626
fn app() -> Element {
27-
// Track the messages we've received from the server.
2827
let mut messages = use_signal(std::vec::Vec::new);
2928

30-
// The `use_websocket` wraps the `WebSocket` connection and provides a reactive handle to easily
31-
// send and receive messages and track the connection state.
32-
//
33-
// We can customize the websocket connection with the `WebSocketOptions` struct, allowing us to
34-
// set things like custom headers, protocols, reconnection strategies, etc.
35-
let mut socket = use_websocket(|| uppercase_ws("John Doe".into(), 30, WebSocketOptions::new()));
29+
// This signal is read inside the use_websocket closure, making it a reactive dependency.
30+
// Whenever it changes, the websocket will automatically re-connect.
31+
let mut name = use_signal(|| "John Doe".to_string());
32+
33+
let mut socket =
34+
use_websocket(move || uppercase_ws(name.cloned(), 30, WebSocketOptions::new()));
3635

37-
// Calling `.recv()` automatically waits for the connection to be established and deserializes
38-
// messages as they arrive.
3936
use_future(move || async move {
40-
while let Ok(msg) = socket.recv().await {
41-
messages.push(msg);
37+
loop {
38+
// Wait for the socket to connect
39+
_ = socket.connect().await;
40+
41+
// Loop poll with recv. Throws an error when the connection closes, making it possible
42+
// to run code before the socket re-connects when the name input changes
43+
while let Ok(msg) = socket.recv().await {
44+
messages.push(msg);
45+
}
4246
}
4347
});
4448

4549
rsx! {
4650
h1 { "WebSocket Example" }
4751
p { "Type a message and see it echoed back in uppercase!" }
4852
p { "Connection status: {socket.status():?}" }
53+
p { "Change your name to trigger a websocket re-connect" }
54+
input {
55+
placeholder: "Your name",
56+
value: "{name}",
57+
oninput: move |e| name.set(e.value()),
58+
}
4959
input {
5060
placeholder: "Type a message",
5161
oninput: move |e| async move { _ = socket.send(ClientEvent::TextInput(e.value())).await; },

packages/cli/src/serve/proxy_ws.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ pub(crate) async fn proxy_websocket(
2929
match handle_ws_connection(client_ws, proxied_request).await {
3030
Ok(()) => tracing::trace!(dx_src = ?TraceSrc::Dev, "Websocket connection closed"),
3131
Err(e) => {
32-
tracing::error!(dx_src = ?TraceSrc::Dev, "Error proxying websocket connection: {e}")
32+
// Connection resets during shutdown are expected and not worth logging as errors
33+
tracing::debug!(dx_src = ?TraceSrc::Dev, "Error proxying websocket connection: {e}")
3334
}
3435
}
3536
}))

packages/fullstack/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ content_disposition = "0.4.0"
8484
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
8585
async-tungstenite = { version = "0.31.0", default-features = false, features = ["futures-03-sink"], optional = true }
8686
tungstenite = { version = "0.27", default-features = false, features = ["handshake"], optional = true }
87-
tokio-util = { workspace = true, features = ["codec", "compat"] }
87+
tokio-util = { workspace = true, features = ["codec", "compat", "rt"] }
8888

8989
[dev-dependencies]
9090
dioxus = { workspace = true, features = ["fullstack", "router"] }

packages/fullstack/src/payloads/websocket.rs

Lines changed: 58 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use std::{
4242
marker::PhantomData,
4343
pin::Pin,
4444
prelude::rust_2024::Future,
45+
rc::Rc,
4546
task::{ready, Context, Poll},
4647
};
4748

@@ -89,7 +90,9 @@ pub fn use_websocket<
8990
// Wake up the `.recv()` calls waiting for the connection to be established
9091
waker.wake(());
9192

92-
connection
93+
// Wrap in Rc so we can clone it out of the Resource without holding
94+
// a borrow guard across await points
95+
connection.map(Rc::new)
9396
}
9497
});
9598

@@ -113,7 +116,8 @@ where
113116
Out: 'static,
114117
Enc: 'static,
115118
{
116-
connection: Resource<Result<Websocket<In, Out, Enc>, CapturedError>>,
119+
#[allow(clippy::type_complexity)]
120+
connection: Resource<Result<Rc<Websocket<In, Out, Enc>>, CapturedError>>,
117121
waker: UseWaker<()>,
118122
status: Signal<WebsocketState>,
119123
status_read: ReadSignal<WebsocketState>,
@@ -161,38 +165,35 @@ impl<In, Out, E> UseWebsocket<In, Out, E> {
161165
/// To send a message with a particular type, see the `.send()` method instead.
162166
pub async fn send_raw(&self, msg: Message) -> Result<(), WebsocketError> {
163167
self.connect().await;
164-
165-
self.connection
166-
.as_ref()
167-
.as_deref()
168-
.ok_or_else(WebsocketError::closed_away)?
169-
.as_ref()
170-
.map_err(|_| WebsocketError::AlreadyClosed)?
171-
.send_raw(msg)
172-
.await
168+
self.get_connection()?.send_raw(msg).await
173169
}
174170

175171
/// Receive a raw message from the WebSocket connection
176172
///
177173
/// To receive a message with a particular type, see the `.recv()` method instead.
178174
pub async fn recv_raw(&mut self) -> Result<Message, WebsocketError> {
179175
self.connect().await;
180-
181-
let result = self
182-
.connection
183-
.as_ref()
184-
.as_deref()
185-
.ok_or_else(WebsocketError::closed_away)?
186-
.as_ref()
187-
.map_err(|_| WebsocketError::AlreadyClosed)?
188-
.recv_raw()
189-
.await;
190-
191-
if let Err(WebsocketError::ConnectionClosed { .. }) = result.as_ref() {
192-
self.received_shutdown();
176+
let ws = self.get_connection()?;
177+
178+
// Race the recv against the waker — if the connection is being recreated
179+
// (e.g. a reactive dependency changed), the waker fires and we return an error
180+
// so the caller's loop can restart and pick up the new connection.
181+
let recv_fut = ws.recv_raw();
182+
let waker_fut = self.waker.wait();
183+
futures::pin_mut!(recv_fut, waker_fut);
184+
185+
match futures::future::select(recv_fut, waker_fut).await {
186+
futures::future::Either::Left((recv_result, _)) => {
187+
if let Err(WebsocketError::ConnectionClosed { .. }) = recv_result.as_ref() {
188+
self.received_shutdown();
189+
}
190+
recv_result
191+
}
192+
futures::future::Either::Right(_) => Err(WebsocketError::ConnectionClosed {
193+
code: CloseCode::Away,
194+
description: "Connection replaced by a new one".to_string(),
195+
}),
193196
}
194-
195-
result
196197
}
197198

198199
pub async fn send(&self, msg: In) -> Result<(), WebsocketError>
@@ -219,22 +220,24 @@ impl<In, Out, E> UseWebsocket<In, Out, E> {
219220
E: Encoding,
220221
{
221222
self.connect().await;
223+
let ws = self.get_connection()?;
222224

223-
let result = self
224-
.connection
225-
.as_ref()
226-
.as_deref()
227-
.ok_or_else(WebsocketError::closed_away)?
228-
.as_ref()
229-
.map_err(|_| WebsocketError::AlreadyClosed)?
230-
.recv()
231-
.await;
225+
let recv_fut = ws.recv();
226+
let waker_fut = self.waker.wait();
227+
futures::pin_mut!(recv_fut, waker_fut);
232228

233-
if let Err(WebsocketError::ConnectionClosed { .. }) = result.as_ref() {
234-
self.received_shutdown();
229+
match futures::future::select(recv_fut, waker_fut).await {
230+
futures::future::Either::Left((recv_result, _)) => {
231+
if let Err(WebsocketError::ConnectionClosed { .. }) = recv_result.as_ref() {
232+
self.received_shutdown();
233+
}
234+
recv_result
235+
}
236+
futures::future::Either::Right(_) => Err(WebsocketError::ConnectionClosed {
237+
code: CloseCode::Away,
238+
description: "Connection replaced by a new one".to_string(),
239+
}),
235240
}
236-
237-
result
238241
}
239242

240243
/// Set the WebSocket connection.
@@ -247,7 +250,8 @@ impl<In, Out, E> UseWebsocket<In, Out, E> {
247250
Err(_) => self.status.set(WebsocketState::FailedToConnect),
248251
}
249252

250-
self.connection.set(Some(socket.map_err(|e| e.into())));
253+
self.connection
254+
.set(Some(socket.map(Rc::new).map_err(|e| e.into())));
251255
self.waker.wake(());
252256
}
253257

@@ -257,6 +261,20 @@ impl<In, Out, E> UseWebsocket<In, Out, E> {
257261
_self.status.set(WebsocketState::Closed);
258262
_self.waker.wake(());
259263
}
264+
265+
/// Clone the `Rc<Websocket>` out of the Resource using peek, so we don't hold a borrow
266+
/// guard across await points. This prevents AlreadyBorrowed panics when the Resource
267+
/// tries to write while recv() is awaiting.
268+
#[allow(clippy::result_large_err)]
269+
fn get_connection(&self) -> Result<Rc<Websocket<In, Out, E>>, WebsocketError> {
270+
self.connection.with_peek(|opt| {
271+
opt.as_ref()
272+
.ok_or_else(WebsocketError::closed_away)?
273+
.as_ref()
274+
.map(Rc::clone)
275+
.map_err(|_| WebsocketError::AlreadyClosed)
276+
})
277+
}
260278
}
261279

262280
impl<In, Out, E> Copy for UseWebsocket<In, Out, E> {}

0 commit comments

Comments
 (0)