Skip to content

Commit dca90ef

Browse files
authored
Merge pull request #82 from graphras-com/fix/74-observable-subscription-failures
Make EventBus subscription failures observable (#74)
2 parents 499d209 + c9d0686 commit dca90ef

2 files changed

Lines changed: 404 additions & 20 deletions

File tree

src/haclient/core/events.py

Lines changed: 210 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
from __future__ import annotations
2020

21+
import asyncio
22+
import contextlib
2123
import logging
2224
from collections import defaultdict, deque
2325
from collections.abc import Awaitable
@@ -35,6 +37,19 @@ class EventBus:
3537
----------
3638
ws : WebSocketPort
3739
The transport used to subscribe to Home Assistant events.
40+
41+
Notes
42+
-----
43+
Subscriptions can fail at the transport layer. The bus exposes two
44+
APIs to handle this:
45+
46+
* `subscribe` / `unsubscribe` — fire-and-forget; failures are logged
47+
and recorded on the bus so callers can inspect them via
48+
`subscription_failure` and `pending_subscription`.
49+
* `subscribe_async` / `unsubscribe_async` — awaitable; transport
50+
errors are raised to the caller. Prefer these whenever the caller
51+
needs confirmation that Home Assistant actually accepted the
52+
subscription.
3853
"""
3954

4055
def __init__(self, ws: WebSocketPort) -> None:
@@ -43,13 +58,22 @@ def __init__(self, ws: WebSocketPort) -> None:
4358
self._subscription_ids: dict[str, int] = {}
4459
self._buffers: dict[str, deque[dict[str, Any]]] = {}
4560
self._started = False
61+
# Per-event-type background subscription task scheduled by the
62+
# fire-and-forget `subscribe()` path. Replaced (and the previous
63+
# task discarded) on each new attempt.
64+
self._pending_subscriptions: dict[str, asyncio.Task[None]] = {}
65+
# Last subscription failure observed for a given event type via
66+
# the fire-and-forget path. Cleared on a successful retry.
67+
self._subscription_failures: dict[str, BaseException] = {}
4668

4769
def subscribe(self, event_type: str, handler: EventHandler) -> EventHandler:
4870
"""Register *handler* for the given *event_type*.
4971
5072
Subscriptions registered before `start` are batched; those added
5173
afterwards trigger an immediate WebSocket subscribe if it is the
52-
first handler for the type.
74+
first handler for the type. The scheduled task is tracked so
75+
callers can await it (`pending_subscription`) or inspect its
76+
outcome (`subscription_failure`).
5377
5478
Parameters
5579
----------
@@ -62,21 +86,75 @@ def subscribe(self, event_type: str, handler: EventHandler) -> EventHandler:
6286
-------
6387
callable
6488
The same *handler*, for use as a decorator.
89+
90+
Notes
91+
-----
92+
This method does not raise transport errors. Use
93+
`subscribe_async` when the caller needs to know whether Home
94+
Assistant accepted the subscription.
6595
"""
6696
first_for_type = event_type not in self._handlers
6797
self._handlers[event_type].append(handler)
6898
if self._started and first_for_type:
6999
# Subscribe lazily; the WS adapter handles re-subscription on reconnect.
70-
import asyncio
100+
task = asyncio.ensure_future(self._ensure_subscription(event_type))
101+
self._pending_subscriptions[event_type] = task
102+
103+
def _done(t: asyncio.Task[None], et: str = event_type) -> None:
104+
self._on_subscription_task_done(et, t)
105+
106+
task.add_done_callback(_done)
107+
return handler
108+
109+
async def subscribe_async(self, event_type: str, handler: EventHandler) -> EventHandler:
110+
"""Register *handler* and await the underlying WebSocket subscribe.
111+
112+
Like `subscribe`, but transport failures propagate to the caller
113+
and the handler is rolled back if the first subscribe for an
114+
event type fails — so callers can rely on the returned handler
115+
being live.
116+
117+
Parameters
118+
----------
119+
event_type : str
120+
The Home Assistant event type.
121+
handler : callable
122+
Sync or async callable receiving the event dict.
123+
124+
Returns
125+
-------
126+
callable
127+
The registered handler.
71128
72-
asyncio.ensure_future(self._ensure_subscription(event_type))
129+
Raises
130+
------
131+
Exception
132+
Any exception raised by the underlying `WebSocketPort` when
133+
the first handler for *event_type* triggers a subscribe.
134+
"""
135+
first_for_type = event_type not in self._handlers
136+
self._handlers[event_type].append(handler)
137+
if not (self._started and first_for_type):
138+
return handler
139+
try:
140+
await self._subscribe_now(event_type)
141+
except BaseException:
142+
# Roll back the handler so the caller's view is consistent
143+
# with the transport state.
144+
handlers = self._handlers.get(event_type)
145+
if handlers is not None:
146+
with contextlib.suppress(ValueError): # pragma: no cover - defensive
147+
handlers.remove(handler)
148+
if not handlers:
149+
self._handlers.pop(event_type, None)
150+
raise
73151
return handler
74152

75153
def unsubscribe(self, event_type: str, handler: EventHandler) -> None:
76154
"""Remove a previously registered handler.
77155
78156
If the last handler for *event_type* is removed the WebSocket
79-
subscription is also cancelled.
157+
subscription is also cancelled in the background.
80158
81159
Parameters
82160
----------
@@ -86,20 +164,105 @@ def unsubscribe(self, event_type: str, handler: EventHandler) -> None:
86164
The exact handler previously passed to `subscribe`. Removing
87165
an unknown handler is a no-op.
88166
"""
167+
sub_id = self._drop_handler(event_type, handler)
168+
if sub_id is not None and self._ws.connected:
169+
asyncio.ensure_future(self._safe_unsubscribe(sub_id))
170+
171+
async def unsubscribe_async(self, event_type: str, handler: EventHandler) -> None:
172+
"""Remove a handler and await any resulting WebSocket unsubscribe.
173+
174+
Unlike `unsubscribe`, transport errors raised while telling Home
175+
Assistant to stop sending events propagate to the caller.
176+
177+
Parameters
178+
----------
179+
event_type : str
180+
The Home Assistant event type to unsubscribe from.
181+
handler : callable
182+
The exact handler previously passed to `subscribe` or
183+
`subscribe_async`. Removing an unknown handler is a no-op.
184+
185+
Raises
186+
------
187+
Exception
188+
Any exception raised by `WebSocketPort.unsubscribe`.
189+
"""
190+
sub_id = self._drop_handler(event_type, handler)
191+
if sub_id is not None and self._ws.connected:
192+
await self._ws.unsubscribe(sub_id)
193+
194+
def _drop_handler(self, event_type: str, handler: EventHandler) -> int | None:
195+
"""Remove *handler* and return the WS subscription id to release.
196+
197+
Returns ``None`` if the handler was unknown or other handlers
198+
remain for *event_type*.
199+
"""
89200
handlers = self._handlers.get(event_type)
90201
if not handlers:
91-
return
202+
return None
92203
try:
93204
handlers.remove(handler)
94205
except ValueError:
95-
return
96-
if not handlers:
97-
self._handlers.pop(event_type, None)
98-
sub_id = self._subscription_ids.pop(event_type, None)
99-
if sub_id is not None and self._ws.connected:
100-
import asyncio
206+
return None
207+
if handlers:
208+
return None
209+
self._handlers.pop(event_type, None)
210+
return self._subscription_ids.pop(event_type, None)
101211

102-
asyncio.ensure_future(self._safe_unsubscribe(sub_id))
212+
def subscription_failure(self, event_type: str) -> BaseException | None:
213+
"""Return the last fire-and-forget subscribe failure, if any.
214+
215+
Parameters
216+
----------
217+
event_type : str
218+
The event type to inspect.
219+
220+
Returns
221+
-------
222+
BaseException or None
223+
The exception raised by the most recent fire-and-forget
224+
subscribe attempt, or ``None`` if the current subscription
225+
is healthy (or no attempt has been made).
226+
"""
227+
return self._subscription_failures.get(event_type)
228+
229+
def pending_subscription(self, event_type: str) -> asyncio.Task[None] | None:
230+
"""Return the in-flight subscribe task for *event_type*, if any.
231+
232+
Awaiting the returned task lets callers convert a fire-and-forget
233+
`subscribe` into a confirmed registration without changing the
234+
original call site.
235+
236+
Parameters
237+
----------
238+
event_type : str
239+
The event type whose pending subscribe task to return.
240+
241+
Returns
242+
-------
243+
asyncio.Task or None
244+
The scheduled task, or ``None`` if no subscribe is in flight
245+
for *event_type*.
246+
"""
247+
task = self._pending_subscriptions.get(event_type)
248+
if task is None or task.done():
249+
return None
250+
return task
251+
252+
def _on_subscription_task_done(self, event_type: str, task: asyncio.Task[None]) -> None:
253+
"""Record the outcome of a fire-and-forget subscribe task."""
254+
# Only forget the task if it is still the registered one — a
255+
# later attempt may have replaced it.
256+
if self._pending_subscriptions.get(event_type) is task:
257+
self._pending_subscriptions.pop(event_type, None)
258+
if task.cancelled():
259+
return
260+
exc = task.exception()
261+
if exc is None:
262+
# Success: clear any stale failure.
263+
self._subscription_failures.pop(event_type, None)
264+
else:
265+
self._subscription_failures[event_type] = exc
103266

104267
async def _safe_unsubscribe(self, sub_id: int) -> None:
105268
"""Unsubscribe, swallowing transport errors."""
@@ -112,22 +275,51 @@ async def start(self) -> None:
112275
"""Subscribe to every registered event type and arm reconnect.
113276
114277
Safe to call multiple times.
278+
279+
Notes
280+
-----
281+
Transport failures during the initial batch are recorded on the
282+
bus (see `subscription_failure`) but not raised, so a single
283+
flaky event type does not abort startup. Use `subscribe_async`
284+
afterwards if you need confirmation that a specific subscription
285+
is live.
115286
"""
116287
if self._started:
117288
return
118289
for event_type in list(self._handlers.keys()):
119-
await self._ensure_subscription(event_type)
290+
try:
291+
await self._ensure_subscription(event_type)
292+
except Exception: # noqa: BLE001 - logged & recorded by _ensure_subscription
293+
continue
120294
self._started = True
121295

122296
async def _ensure_subscription(self, event_type: str) -> None:
123-
"""Subscribe on the WS if not already subscribed."""
124-
if event_type in self._subscription_ids:
125-
return
297+
"""Subscribe on the WS, logging transport errors and recording them.
298+
299+
Used by `start` and by the fire-and-forget `subscribe` path. The
300+
exception is re-raised so the post-start path's task done
301+
callback can store it on `_subscription_failures`; the `start`
302+
path catches it again to preserve the historic "start never
303+
raises on subscribe failure" behaviour, but still records the
304+
failure for observability.
305+
"""
126306
try:
127-
sub_id = await self._ws.subscribe_events(self._make_dispatcher(event_type), event_type)
128-
except Exception:
307+
await self._subscribe_now(event_type)
308+
except Exception as exc:
129309
_LOGGER.exception("EventBus failed to subscribe to %s", event_type)
310+
self._subscription_failures[event_type] = exc
311+
raise
312+
313+
async def _subscribe_now(self, event_type: str) -> None:
314+
"""Subscribe on the WS, propagating transport errors.
315+
316+
Used by `subscribe_async` and (indirectly) by
317+
`_ensure_subscription`. Idempotent — returns immediately if a
318+
subscription id is already recorded for *event_type*.
319+
"""
320+
if event_type in self._subscription_ids:
130321
return
322+
sub_id = await self._ws.subscribe_events(self._make_dispatcher(event_type), event_type)
131323
self._subscription_ids[event_type] = sub_id
132324

133325
def _make_dispatcher(self, event_type: str) -> EventHandler:

0 commit comments

Comments
 (0)