-
-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathbroadcaster.py
More file actions
82 lines (67 loc) · 3.23 KB
/
Copy pathbroadcaster.py
File metadata and controls
82 lines (67 loc) · 3.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""In-process Server-Sent Events fan-out.
A tiny, dependency-free pub/sub used to push live data (node + chat updates)
to connected SSE clients. One bounded ``asyncio.Queue`` per client; producers
call :meth:`publish` from the same event loop. ``publish`` is synchronous and
never awaits, so the MQTT ingest path (the single ``data.update_node``
chokepoint and ``mqtt.handle_text``) can emit without ever blocking on a slow
or stalled client — mirroring the existing ``discord_event_queue`` discipline.
The transport (the SSE route in ``api/api.py``) is intentionally separate: this
hub knows nothing about HTTP, so a future WebSocket endpoint could subscribe to
the same instance without changing producers.
"""
import asyncio
import logging
from typing import Any, Tuple
# Module-level logger, named after this module; used for the periodic
# "events dropped" warning emitted by Broadcaster.publish.
logger = logging.getLogger(__name__)

# Per-subscriber backlog before we start dropping. Generous enough to absorb a
# burst, small enough that a dead client can't grow unbounded memory.
DEFAULT_MAX_QUEUE = 100


class Broadcaster:
    """Fan-out of ``(event_type, payload)`` items to all subscribers.

    Each subscriber owns one bounded ``asyncio.Queue``. :meth:`publish` puts
    the same ``(event_type, payload)`` tuple on every queue without awaiting;
    when a queue is full the oldest queued item is evicted to make room.

    Payloads MUST already be plain, serializable snapshots — callers own
    isolation. ``get_node_cached`` hands back the *live* LRU dict, so the node
    producer publishes ``jsonable_encoder(n)`` rather than the dict itself.
    """

    def __init__(self, max_queue: int = DEFAULT_MAX_QUEUE) -> None:
        """Create an empty hub.

        Args:
            max_queue: Capacity of each subscriber queue. Values below 1 are
                clamped to 1.
        """
        self._subscribers: set[asyncio.Queue] = set()
        # Clamp to >= 1: asyncio.Queue treats maxsize <= 0 as "unbounded",
        # which would defeat the bounded-backlog guarantee.
        self._max_queue = max(1, max_queue)
        # Running count of full-queue events seen by publish().
        self._dropped_total = 0

    def subscribe(self) -> "asyncio.Queue[Tuple[str, Any]]":
        """Register a new client and return its private queue.

        Returns:
            A fresh queue bounded at the configured capacity; it receives
            every item published from now on until it is unsubscribed.
        """
        q: asyncio.Queue = asyncio.Queue(maxsize=self._max_queue)
        self._subscribers.add(q)
        return q

    def unsubscribe(self, q: asyncio.Queue) -> None:
        """Deregister a client. Safe to call more than once."""
        # discard() (not remove()) so an unknown/already-removed queue is a
        # no-op rather than a KeyError.
        self._subscribers.discard(q)

    @property
    def subscriber_count(self) -> int:
        """Number of currently registered subscriber queues."""
        return len(self._subscribers)

    @property
    def dropped_total(self) -> int:
        """Total number of full-queue drops recorded since construction."""
        return self._dropped_total

    def publish(self, event_type: str, payload: Any) -> None:
        """Enqueue an event for every subscriber. Non-blocking; never awaits.

        On a full subscriber queue we drop that subscriber's OLDEST item and
        enqueue the newest — a live UI cares about current state, not history,
        and the client's reconnect/resync (or fallback poll) closes any gap.

        Args:
            event_type: Label stored as the first element of the queued tuple.
            payload: Data stored as the second element; the same object is
                shared by every subscriber queue (it is not copied here).
        """
        if not self._subscribers:
            return

        item: Tuple[str, Any] = (event_type, payload)
        for q in self._subscribers:
            try:
                q.put_nowait(item)
            except asyncio.QueueFull:
                try:
                    q.get_nowait()  # drop oldest
                    q.put_nowait(item)  # make room for newest
                except (asyncio.QueueEmpty, asyncio.QueueFull):
                    # Raced with the consumer; the next publish will catch up.
                    pass
                # One drop is counted per full-queue event, per subscriber.
                self._dropped_total += 1
                # Rate-limit the warning to every 100th drop so a stalled
                # client cannot flood the log.
                if self._dropped_total % 100 == 0:
                    logger.warning(
                        "SSE broadcaster dropped %d event(s) total (slow subscriber)",
                        self._dropped_total,
                    )