Skip to content

Commit c655ec8

Browse files
committed
ws client queue_maxsize hardening
1 parent d947cac commit c655ec8

7 files changed

Lines changed: 48 additions & 34 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ All channel classes accept the following optional keyword arguments:
588588
| `auto_reconnect` | `bool` | `False` | Automatically reconnect on transient failures using exponential backoff. |
589589
| `max_reconnect_attempts` | `int` | `5` | Maximum number of reconnect attempts before giving up. |
590590
| `reconnect_backoff` | `float` | `2.0` | Base backoff delay in seconds. Doubles after each failed attempt (2s, 4s, 8s, ...). Resets on successful reconnection. |
591-
| `queue_maxsize` | `int` | `0` | Maximum messages buffered in the internal queue for `receive()`. `0` means unbounded. When set, the receive thread blocks if the queue is full, providing backpressure for high-frequency streams. |
591+
| `queue_maxsize` | `int` | `0` | Maximum messages buffered in the internal queue for `receive()`. `0` means unbounded. When set, incoming messages are dropped with a warning when the queue is full, preventing memory growth on high-frequency streams. |
592592

593593
```python
594594
ws = mistapi.websockets.sites.DeviceStatsEvents(

src/mistapi/websockets/__ws_client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,10 @@ def _handle_message(self, ws: websocket.WebSocketApp, message: str | bytes) -> N
196196
except Exception:
197197
logger.exception("on_message callback raised")
198198
else:
199-
self._queue.put(data)
199+
try:
200+
self._queue.put_nowait(data)
201+
except queue.Full:
202+
logger.warning("Receive queue full; dropping message")
200203

201204
def _handle_error(self, ws: websocket.WebSocketApp, error: Exception) -> None:
202205
if self._on_error_cb:

src/mistapi/websockets/location.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ class BleAssetsEvents(_MistWebsocket):
4242
queue_maxsize : int, default 0
4343
Maximum number of messages buffered in the internal queue for the
4444
``receive()`` generator. ``0`` means unbounded. When set, the
45-
websocket-client receive thread blocks if the queue is full,
46-
providing backpressure for high-frequency streams.
45+
incoming messages are dropped with a warning when the queue is
46+
full, preventing memory growth on high-frequency streams.
4747
4848
EXAMPLE
4949
-----------
@@ -122,8 +122,8 @@ class ConnectedClientsEvents(_MistWebsocket):
122122
queue_maxsize : int, default 0
123123
Maximum number of messages buffered in the internal queue for the
124124
``receive()`` generator. ``0`` means unbounded. When set, the
125-
websocket-client receive thread blocks if the queue is full,
126-
providing backpressure for high-frequency streams.
125+
incoming messages are dropped with a warning when the queue is
126+
full, preventing memory growth on high-frequency streams.
127127
128128
EXAMPLE
129129
-----------
@@ -202,8 +202,8 @@ class SdkClientsEvents(_MistWebsocket):
202202
queue_maxsize : int, default 0
203203
Maximum number of messages buffered in the internal queue for the
204204
``receive()`` generator. ``0`` means unbounded. When set, the
205-
websocket-client receive thread blocks if the queue is full,
206-
providing backpressure for high-frequency streams.
205+
incoming messages are dropped with a warning when the queue is
206+
full, preventing memory growth on high-frequency streams.
207207
208208
EXAMPLE
209209
-----------
@@ -282,8 +282,8 @@ class UnconnectedClientsEvents(_MistWebsocket):
282282
queue_maxsize : int, default 0
283283
Maximum number of messages buffered in the internal queue for the
284284
``receive()`` generator. ``0`` means unbounded. When set, the
285-
websocket-client receive thread blocks if the queue is full,
286-
providing backpressure for high-frequency streams.
285+
incoming messages are dropped with a warning when the queue is
286+
full, preventing memory growth on high-frequency streams.
287287
288288
EXAMPLE
289289
-----------
@@ -364,8 +364,8 @@ class DiscoveredBleAssetsEvents(_MistWebsocket):
364364
queue_maxsize : int, default 0
365365
Maximum number of messages buffered in the internal queue for the
366366
``receive()`` generator. ``0`` means unbounded. When set, the
367-
websocket-client receive thread blocks if the queue is full,
368-
providing backpressure for high-frequency streams.
367+
incoming messages are dropped with a warning when the queue is
368+
full, preventing memory growth on high-frequency streams.
369369
370370
EXAMPLE
371371
-----------

src/mistapi/websockets/orgs.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ class InsightsEvents(_MistWebsocket):
4040
queue_maxsize : int, default 0
4141
Maximum number of messages buffered in the internal queue for the
4242
``receive()`` generator. ``0`` means unbounded. When set, the
43-
websocket-client receive thread blocks if the queue is full,
44-
providing backpressure for high-frequency streams.
43+
incoming messages are dropped with a warning when the queue is
44+
full, preventing memory growth on high-frequency streams.
4545
4646
EXAMPLE
4747
-----------
@@ -116,8 +116,8 @@ class MxEdgesStatsEvents(_MistWebsocket):
116116
queue_maxsize : int, default 0
117117
Maximum number of messages buffered in the internal queue for the
118118
``receive()`` generator. ``0`` means unbounded. When set, the
119-
websocket-client receive thread blocks if the queue is full,
120-
providing backpressure for high-frequency streams.
119+
incoming messages are dropped with a warning when the queue is
120+
full, preventing memory growth on high-frequency streams.
121121
122122
EXAMPLE
123123
-----------
@@ -192,8 +192,8 @@ class MxEdgesEvents(_MistWebsocket):
192192
queue_maxsize : int, default 0
193193
Maximum number of messages buffered in the internal queue for the
194194
``receive()`` generator. ``0`` means unbounded. When set, the
195-
websocket-client receive thread blocks if the queue is full,
196-
providing backpressure for high-frequency streams.
195+
incoming messages are dropped with a warning when the queue is
196+
full, preventing memory growth on high-frequency streams.
197197
198198
EXAMPLE
199199
-----------

src/mistapi/websockets/session.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ class SessionWithUrl(_MistWebsocket):
4747
queue_maxsize : int, default 0
4848
Maximum number of messages buffered in the internal queue for the
4949
``receive()`` generator. ``0`` means unbounded. When set, the
50-
websocket-client receive thread blocks if the queue is full,
51-
providing backpressure for high-frequency streams.
50+
incoming messages are dropped with a warning when the queue is
51+
full, preventing memory growth on high-frequency streams.
5252
5353
EXAMPLE
5454
-----------

src/mistapi/websockets/sites.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ class ClientsStatsEvents(_MistWebsocket):
4040
queue_maxsize : int, default 0
4141
Maximum number of messages buffered in the internal queue for the
4242
``receive()`` generator. ``0`` means unbounded. When set, the
43-
websocket-client receive thread blocks if the queue is full,
44-
providing backpressure for high-frequency streams.
43+
incoming messages are dropped with a warning when the queue is
44+
full, preventing memory growth on high-frequency streams.
4545
4646
EXAMPLE
4747
-----------
@@ -125,8 +125,8 @@ class DeviceCmdEvents(_MistWebsocket):
125125
queue_maxsize : int, default 0
126126
Maximum number of messages buffered in the internal queue for the
127127
``receive()`` generator. ``0`` means unbounded. When set, the
128-
websocket-client receive thread blocks if the queue is full,
129-
providing backpressure for high-frequency streams.
128+
incoming messages are dropped with a warning when the queue is
129+
full, preventing memory growth on high-frequency streams.
130130
131131
EXAMPLE
132132
-----------
@@ -205,8 +205,8 @@ class DeviceStatsEvents(_MistWebsocket):
205205
queue_maxsize : int, default 0
206206
Maximum number of messages buffered in the internal queue for the
207207
``receive()`` generator. ``0`` means unbounded. When set, the
208-
websocket-client receive thread blocks if the queue is full,
209-
providing backpressure for high-frequency streams.
208+
incoming messages are dropped with a warning when the queue is
209+
full, preventing memory growth on high-frequency streams.
210210
211211
EXAMPLE
212212
-----------
@@ -282,8 +282,8 @@ class DeviceEvents(_MistWebsocket):
282282
queue_maxsize : int, default 0
283283
Maximum number of messages buffered in the internal queue for the
284284
``receive()`` generator. ``0`` means unbounded. When set, the
285-
websocket-client receive thread blocks if the queue is full,
286-
providing backpressure for high-frequency streams.
285+
incoming messages are dropped with a warning when the queue is
286+
full, preventing memory growth on high-frequency streams.
287287
288288
EXAMPLE
289289
-----------
@@ -359,8 +359,8 @@ class MxEdgesStatsEvents(_MistWebsocket):
359359
queue_maxsize : int, default 0
360360
Maximum number of messages buffered in the internal queue for the
361361
``receive()`` generator. ``0`` means unbounded. When set, the
362-
websocket-client receive thread blocks if the queue is full,
363-
providing backpressure for high-frequency streams.
362+
incoming messages are dropped with a warning when the queue is
363+
full, preventing memory growth on high-frequency streams.
364364
365365
EXAMPLE
366366
-----------
@@ -436,8 +436,8 @@ class MxEdgesEvents(_MistWebsocket):
436436
queue_maxsize : int, default 0
437437
Maximum number of messages buffered in the internal queue for the
438438
``receive()`` generator. ``0`` means unbounded. When set, the
439-
websocket-client receive thread blocks if the queue is full,
440-
providing backpressure for high-frequency streams.
439+
incoming messages are dropped with a warning when the queue is
440+
full, preventing memory growth on high-frequency streams.
441441
442442
EXAMPLE
443443
-----------
@@ -513,8 +513,8 @@ class PcapEvents(_MistWebsocket):
513513
queue_maxsize : int, default 0
514514
Maximum number of messages buffered in the internal queue for the
515515
``receive()`` generator. ``0`` means unbounded. When set, the
516-
websocket-client receive thread blocks if the queue is full,
517-
providing backpressure for high-frequency streams.
516+
incoming messages are dropped with a warning when the queue is
517+
full, preventing memory growth on high-frequency streams.
518518
519519
EXAMPLE
520520
-----------

tests/unit/test_websocket_client.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,6 +1096,17 @@ def test_receive_raises_when_message_callback_registered(self, ws_client) -> Non
10961096
with pytest.raises(RuntimeError, match="on_message callback"):
10971097
list(ws_client.receive())
10981098

1099+
def test_queue_full_drops_message(self, mock_session) -> None:
1100+
client = _MistWebsocket(mock_session, channels=["/ch"], queue_maxsize=1)
1101+
# Fill the queue
1102+
client._handle_message(Mock(), '{"event": "first"}')
1103+
assert not client._queue.empty()
1104+
# Second message should be dropped, not block
1105+
client._handle_message(Mock(), '{"event": "dropped"}')
1106+
# Only the first message should be in the queue
1107+
assert client._queue.get_nowait() == {"event": "first"}
1108+
assert client._queue.empty()
1109+
10991110

11001111
# ---------------------------------------------------------------------------
11011112
# disconnect(wait=...)

0 commit comments

Comments
 (0)