Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions binance/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ class BinanceWebsocketClosed(Exception):
"""Raised when websocket connection is closed."""
pass

class ReadLoopClosed(Exception):
"""Raised when trying to read from read loop but already closed"""
pass

class NotImplementedException(Exception):
def __init__(self, value):
message = f"Not implemented: {value}"
Expand Down
7 changes: 7 additions & 0 deletions binance/ws/reconnecting_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
BinanceWebsocketClosed,
BinanceWebsocketUnableToConnect,
BinanceWebsocketQueueOverflow,
ReadLoopClosed,
)
from binance.helpers import get_loop
from binance.ws.constants import WSListenerState
Expand Down Expand Up @@ -247,6 +248,8 @@ async def _read_loop(self):
"m": f"{e}",
})
break
except Exception as e:
self._log.error(f"Unknown exception: {e.__class__.__name__} ({e})")
finally:
self._handle_read_loop = None # Signal the coro is stopped
self._reconnects = 0
Expand All @@ -272,6 +275,10 @@ async def _run_reconnect(self):
async def recv(self):
res = None
while not res:
if not self._handle_read_loop:
raise ReadLoopClosed(
"Read loop has been closed, please reset the websocket connection and listen to the message error."
)
try:
res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT)
except asyncio.TimeoutError:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_async_client_ws_futures_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async def test_ws_futures_create_get_edit_cancel_order_with_orjson(futuresClient
type="LIMIT",
timeInForce="GTC",
quantity=0.1,
price=str(float(ticker["bidPrice"]) + 2),
price=str(float(ticker["bidPrice"]) + 5),
)
assert_contract_order(futuresClientAsync, order)
order = await futuresClientAsync.ws_futures_edit_order(
Expand Down Expand Up @@ -101,7 +101,7 @@ async def test_ws_futures_create_get_edit_cancel_order_without_orjson(futuresCli
type="LIMIT",
timeInForce="GTC",
quantity=0.1,
price=str(float(ticker["bidPrice"]) + 2),
price=str(float(ticker["bidPrice"]) + 5),
)
assert_contract_order(futuresClientAsync, order)
order = await futuresClientAsync.ws_futures_edit_order(
Expand Down
22 changes: 20 additions & 2 deletions tests/test_reconnecting_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import pytest
import gzip
import json
from unittest.mock import patch, create_autospec
from unittest.mock import patch, create_autospec, Mock
from binance.ws.reconnecting_websocket import ReconnectingWebsocket
from binance.ws.constants import WSListenerState
from binance.exceptions import BinanceWebsocketUnableToConnect
from binance.exceptions import BinanceWebsocketUnableToConnect, ReadLoopClosed
from websockets import WebSocketClientProtocol # type: ignore
from websockets.protocol import State
import asyncio
Expand Down Expand Up @@ -77,6 +77,8 @@ async def test_handle_message_invalid_json():
async def test_recv_message():
ws = ReconnectingWebsocket(url="wss://test.url")
await ws._queue.put({"test": "data"})
# Simulate the read loop being active
ws._handle_read_loop = Mock()
result = await ws.recv()
assert result == {"test": "data"}

Expand Down Expand Up @@ -206,3 +208,19 @@ async def test_connect_fails_to_connect_after_disconnect():
async def delayed_return():
await asyncio.sleep(0.1) # 100 ms delay
return '{"e": "value"}'


@pytest.mark.skipif(sys.version_info < (3, 8), reason="Requires Python 3.8+")
@pytest.mark.asyncio
async def test_recv_read_loop_closed():
"""Test that recv() raises ReadLoopClosed when read loop is closed."""
ws = ReconnectingWebsocket(url="wss://test.url")

# Simulate read loop being closed by setting _handle_read_loop to None
ws._handle_read_loop = None

with pytest.raises(ReadLoopClosed) as exc_info:
await ws.recv()

assert "Read loop has been closed" in str(exc_info.value)
assert "please reset the websocket connection" in str(exc_info.value)
Loading