diff --git a/binance/exceptions.py b/binance/exceptions.py index dc04fc6f2..ec84d6518 100644 --- a/binance/exceptions.py +++ b/binance/exceptions.py @@ -81,6 +81,10 @@ class BinanceWebsocketClosed(Exception): """Raised when websocket connection is closed.""" pass +class ReadLoopClosed(Exception): + """Raised when trying to read from read loop but already closed""" + pass + class NotImplementedException(Exception): def __init__(self, value): message = f"Not implemented: {value}" diff --git a/binance/ws/reconnecting_websocket.py b/binance/ws/reconnecting_websocket.py index 27a0112f2..b39cb2a4e 100644 --- a/binance/ws/reconnecting_websocket.py +++ b/binance/ws/reconnecting_websocket.py @@ -36,6 +36,7 @@ BinanceWebsocketClosed, BinanceWebsocketUnableToConnect, BinanceWebsocketQueueOverflow, + ReadLoopClosed, ) from binance.helpers import get_loop from binance.ws.constants import WSListenerState @@ -247,6 +248,8 @@ async def _read_loop(self): "m": f"{e}", }) break + except Exception as e: + self._log.error(f"Unknown exception: {e.__class__.__name__} ({e})") finally: self._handle_read_loop = None # Signal the coro is stopped self._reconnects = 0 @@ -272,6 +275,10 @@ async def _run_reconnect(self): async def recv(self): res = None while not res: + if not self._handle_read_loop: + raise ReadLoopClosed( + "Read loop has been closed, please reset the websocket connection and listen to the message error." + ) try: res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT) except asyncio.TimeoutError: diff --git a/tests/test_async_client_ws_futures_requests.py b/tests/test_async_client_ws_futures_requests.py index db976aec3..cde6fcfbe 100644 --- a/tests/test_async_client_ws_futures_requests.py +++ b/tests/test_async_client_ws_futures_requests.py @@ -68,7 +68,7 @@ async def test_ws_futures_create_get_edit_cancel_order_with_orjson(futuresClient type="LIMIT", timeInForce="GTC", quantity=0.1, - price=str(float(ticker["bidPrice"]) + 2), + price=str(float(ticker["bidPrice"]) + 5), ) assert_contract_order(futuresClientAsync, order) order = await futuresClientAsync.ws_futures_edit_order( @@ -101,7 +101,7 @@ async def test_ws_futures_create_get_edit_cancel_order_without_orjson(futuresCli type="LIMIT", timeInForce="GTC", quantity=0.1, - price=str(float(ticker["bidPrice"]) + 2), + price=str(float(ticker["bidPrice"]) + 5), ) assert_contract_order(futuresClientAsync, order) order = await futuresClientAsync.ws_futures_edit_order( diff --git a/tests/test_reconnecting_websocket.py b/tests/test_reconnecting_websocket.py index b294c2212..7433a411e 100644 --- a/tests/test_reconnecting_websocket.py +++ b/tests/test_reconnecting_websocket.py @@ -2,10 +2,10 @@ import pytest import gzip import json -from unittest.mock import patch, create_autospec +from unittest.mock import patch, create_autospec, Mock from binance.ws.reconnecting_websocket import ReconnectingWebsocket from binance.ws.constants import WSListenerState -from binance.exceptions import BinanceWebsocketUnableToConnect +from binance.exceptions import BinanceWebsocketUnableToConnect, ReadLoopClosed from websockets import WebSocketClientProtocol # type: ignore from websockets.protocol import State import asyncio @@ -77,6 +77,8 @@ async def test_handle_message_invalid_json(): async def test_recv_message(): ws = ReconnectingWebsocket(url="wss://test.url") await ws._queue.put({"test": "data"}) + # Simulate the read loop being active + ws._handle_read_loop = Mock() result = await ws.recv() assert result == {"test": "data"} @@ -206,3 +208,19 @@ async def test_connect_fails_to_connect_after_disconnect(): async def delayed_return(): await asyncio.sleep(0.1) # 100 ms delay return '{"e": "value"}' + + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="Requires Python 3.8+") +@pytest.mark.asyncio +async def test_recv_read_loop_closed(): + """Test that recv() raises ReadLoopClosed when read loop is closed.""" + ws = ReconnectingWebsocket(url="wss://test.url") + + # Simulate read loop being closed by setting _handle_read_loop to None + ws._handle_read_loop = None + + with pytest.raises(ReadLoopClosed) as exc_info: + await ws.recv() + + assert "Read loop has been closed" in str(exc_info.value) + assert "please reset the websocket connection" in str(exc_info.value)