Skip to content

Commit e4ca2e1

Browse files
acul71 and cursoragent authored
fix(websocket): Raise IOException on closed connection instead of returning b"" (#1213)
* fix(websocket): Raise IOException on closed connection instead of returning b""

  Fixes #1212

  - WebSocket read() now raises IOException when connection is closed, allowing read_exactly() to immediately detect closure instead of retrying 100 times on empty bytes
  - Yamux send_window_update() gracefully handles connection closure errors during window updates (data was already read successfully)
  - Improved _extract_close_info() fallback for non-standard exceptions
  - Added exception chaining (from e) in write() for better tracebacks
  - Comprehensive test suite for closure handling scenarios

  Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(exceptions): Replace fragile string matching with typed ConnectionClosedError

  Add ConnectionClosedError(IOException) with structured close_code, close_reason, and transport attributes. The WebSocket transport now raises this typed exception instead of a plain IOException with the close info buried in the message string.

  Yamux catches it by type (`except ConnectionClosedError`) instead of parsing `str(e).lower()` for closure keywords — eliminating a fragile pattern that would silently break if anyone changed an error message. A string-matching fallback is retained for transports that don't yet raise ConnectionClosedError (e.g. TCP RawConnError).

  Co-authored-by: Cursor <cursoragent@cursor.com>

---------

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 28046fe commit e4ca2e1

6 files changed

Lines changed: 571 additions & 18 deletions

File tree

libp2p/io/exceptions.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,28 @@ class IOException(BaseLibp2pError):
77
pass
88

99

10+
class ConnectionClosedError(IOException):
11+
"""
12+
Raised when a connection is closed by the peer.
13+
14+
Carries structured close information so that upstream code can make
15+
decisions based on exception type (``except ConnectionClosedError``)
16+
rather than fragile string matching on the message.
17+
"""
18+
19+
def __init__(
20+
self,
21+
message: str,
22+
close_code: int | None = None,
23+
close_reason: str = "",
24+
transport: str = "",
25+
) -> None:
26+
super().__init__(message)
27+
self.close_code = close_code
28+
self.close_reason = close_reason
29+
self.transport = transport
30+
31+
1032
class IncompleteReadError(IOException):
1133
"""Fewer bytes were read than requested."""
1234

libp2p/stream_muxer/yamux/yamux.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
ISecureConn,
3232
)
3333
from libp2p.io.exceptions import (
34+
ConnectionClosedError,
3435
IncompleteReadError,
36+
IOException,
3537
)
3638
from libp2p.io.utils import (
3739
read_exactly,
@@ -161,6 +163,11 @@ async def send_window_update(self, increment: int, skip_lock: bool = False) -> N
161163
param:skip_lock (bool): If True, skips acquiring window_lock.
162164
This should only be used when calling from a context
163165
that already holds the lock.
166+
167+
Note: This method gracefully handles connection closure errors.
168+
If the connection is closed (e.g., peer closed WebSocket immediately
169+
after sending data), the window update will fail silently, allowing
170+
the read operation to complete successfully.
164171
"""
165172
if increment <= 0:
166173
# If increment is zero or negative, skip sending update
@@ -182,7 +189,42 @@ async def _do_window_update() -> None:
182189
self.stream_id,
183190
increment,
184191
)
185-
await self.conn.secured_conn.write(header)
192+
try:
193+
await self.conn.secured_conn.write(header)
194+
except ConnectionClosedError as e:
195+
# Typed exception from transports (e.g., WebSocket) that
196+
# properly signal connection closure — handle gracefully.
197+
# Connection may be closed by peer (e.g., WebSocket closed
198+
# immediately after sending data, as seen with Nim).
199+
# This is acceptable — the data was already read successfully.
200+
logger.debug(
201+
f"Stream {self.stream_id}: Window update failed due to "
202+
f"connection closure (data was already read): {e}"
203+
)
204+
return
205+
except (RawConnError, IOException) as e:
206+
# Fallback for transports that don't yet raise
207+
# ConnectionClosedError (e.g., TCP RawConnError).
208+
error_str = str(e).lower()
209+
if any(
210+
keyword in error_str
211+
for keyword in [
212+
"connection closed",
213+
"closed by peer",
214+
"connection is closed",
215+
]
216+
):
217+
logger.debug(
218+
f"Stream {self.stream_id}: Window update failed due to "
219+
f"connection closure (data was already read): {e}"
220+
)
221+
return
222+
# Re-raise if it's a different error we don't expect
223+
logger.warning(
224+
f"Stream {self.stream_id}: Unexpected error during "
225+
f"window update: {e}"
226+
)
227+
raise
186228

187229
if skip_lock:
188230
await _do_window_update()

libp2p/transport/websocket/connection.py

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from trio_websocket import WebSocketConnection
1111

1212
from libp2p.io.abc import ReadWriteCloser
13-
from libp2p.io.exceptions import IOException
13+
from libp2p.io.exceptions import ConnectionClosedError, IOException
1414

1515
logger = logging.getLogger(__name__)
1616

@@ -194,31 +194,40 @@ def _extract_close_info(self, e: Exception) -> tuple[int | None, str]:
194194
Tuple of (close_code, close_reason)
195195
196196
"""
197-
# ConnectionClosed has a 'reason' attribute which is a CloseReason object
197+
# ConnectionClosed has a 'reason' attribute which is a CloseReason object.
198+
# Some exceptions (like mocks in tests) may have code/reason directly.
198199
close_reason_obj = getattr(e, "reason", None)
199-
if close_reason_obj is not None:
200+
201+
# Check if reason is a CloseReason object (has 'code' attribute)
202+
if close_reason_obj is not None and hasattr(close_reason_obj, "code"):
200203
close_code = getattr(close_reason_obj, "code", None)
201204
close_reason = (
202205
getattr(close_reason_obj, "reason", None) or "Connection closed by peer"
203206
)
204207
else:
205-
# Fallback if reason is not available
206-
close_code = None
207-
close_reason = "Connection closed by peer"
208+
# Fallback: check if code and reason are directly on the exception
209+
# (for mock exceptions in tests or other exception types)
210+
close_code = getattr(e, "code", None)
211+
close_reason = getattr(e, "reason", None) or "Connection closed by peer"
208212
return close_code, close_reason
209213

210214
def _handle_connection_closed_exception(
211215
self, e: Exception, operation: str = "read"
212-
) -> IOException:
216+
) -> ConnectionClosedError:
213217
"""
214-
Handle a connection closure exception by creating an appropriate IOException.
218+
Handle a connection closure exception by creating a ConnectionClosedError.
219+
220+
Returns a ``ConnectionClosedError`` (subclass of ``IOException``) that
221+
carries the close code and reason as structured attributes. This lets
222+
upstream code (e.g. yamux) catch connection closures by *type* instead
223+
of doing fragile string matching on the error message.
215224
216225
Args:
217226
e: The exception that indicates connection closure
218227
operation: The operation that was being performed (read/write)
219228
220229
Returns:
221-
IOException with detailed information about the closure
230+
ConnectionClosedError with close_code and close_reason attributes
222231
223232
"""
224233
self._closed = True
@@ -227,13 +236,17 @@ def _handle_connection_closed_exception(
227236
f"WebSocket connection closed during {operation}: "
228237
f"code={close_code}, reason={close_reason}"
229238
)
230-
# Return IOException to be raised by caller
231-
# This allows read_exactly() to immediately detect connection closure
232-
# instead of returning empty bytes which would cause retries
233-
return IOException(
239+
# Return ConnectionClosedError (subclass of IOException) to be raised
240+
# by caller. This allows read_exactly() to immediately detect
241+
# connection closure instead of returning empty bytes which would cause
242+
# retries, and lets yamux catch the typed exception directly.
243+
return ConnectionClosedError(
234244
f"WebSocket connection closed by peer during "
235245
f"{operation} operation: code={close_code}, "
236-
f"reason={close_reason}."
246+
f"reason={close_reason}.",
247+
close_code=close_code,
248+
close_reason=close_reason,
249+
transport="websocket",
237250
)
238251

239252
async def _start_keepalive(self) -> None:
@@ -284,8 +297,10 @@ async def read(self, n: int | None = None) -> bytes:
284297
285298
"""
286299
if self._closed:
287-
# Return empty bytes to signal EOF (like TCP does)
288-
return b""
300+
# Raise IOException immediately when connection is closed.
301+
# This allows read_exactly() to detect closure immediately
302+
# instead of retrying up to 100 times on empty bytes.
303+
raise IOException("Connection is closed")
289304

290305
async with self._read_lock:
291306
try:
@@ -427,6 +442,9 @@ async def read(self, n: int | None = None) -> bytes:
427442
# Re-raise IOException as-is (already has proper context)
428443
raise
429444
except Exception as e:
445+
# Handle connection closure missed by inner handlers
446+
if self._is_connection_closed_exception(e):
447+
raise self._handle_connection_closed_exception(e, "read")
430448
logger.error(f"WebSocket read failed: {e}")
431449
raise IOException(f"Read failed: {str(e)}")
432450

@@ -464,7 +482,7 @@ async def write(self, data: bytes) -> None:
464482
raise self._handle_connection_closed_exception(e, "write")
465483
logger.error(f"WebSocket write failed: {e}")
466484
self._closed = True
467-
raise IOException(f"Write failed: {str(e)}")
485+
raise IOException(f"Write failed: {str(e)}") from e
468486

469487
async def close(self) -> None:
470488
"""

newsfragments/1212.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed WebSocket transport to immediately raise ``IOException`` on connection closure instead of returning empty bytes, preventing retry loops in ``read_exactly()``. Also added graceful error handling in yamux ``send_window_update()`` for connections closed by peers during window updates.

0 commit comments

Comments
 (0)