-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplugins.py
More file actions
377 lines (299 loc) · 12.5 KB
/
Copy pathplugins.py
File metadata and controls
377 lines (299 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
"""Domain plugin registry and accessor base.
Adding a new domain to HaClient is done by creating an `Entity` subclass
and registering a `DomainSpec`. The core never imports specific domains;
it iterates the spec registry instead.
A `DomainAccessor` is the object returned by ``ha.<domain>`` (e.g.
``ha.light`` or ``ha.scene``). It provides:
* ``__call__(name)`` and ``__getitem__(name)`` for entity lookup.
* Domain-level operations registered by the spec via ``operations`` (legacy
third-party path) **or** via typed subclass methods (preferred path).
Domains with collection-level operations should subclass `DomainAccessor`
and register the subclass via ``DomainSpec.accessor_cls``. This keeps the
public API statically typed without requiring ``# type: ignore`` workarounds
or private ``_factory`` access from outside the accessor.
Third-party plugins can ship additional domains by exposing an entry
point under the ``haclient.domains`` group; see
`DomainRegistry.load_entry_points`.
"""
from __future__ import annotations
import logging
from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass, field
from importlib import metadata
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
from haclient.exceptions import HAClientError
if TYPE_CHECKING:
from haclient.entity.base import Entity
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Type variable for the entity class a `DomainSpec` / `DomainAccessor` is
# parameterized with. ``Entity`` is imported only under ``TYPE_CHECKING``,
# so the bound is given as a string forward reference.
E = TypeVar("E", bound="Entity")
# Signature of a per-domain event callback: (entity, event_type, event_data).
DomainEventHandler = Callable[["Entity", str, dict[str, Any]], None]
"""Per-domain handler invoked for non-``state_changed`` events.
Receives the entity instance, the event_type string, and the raw event
data dictionary. Used by domains that need direct event routing (e.g.
the ``timer`` domain consuming ``timer.finished`` / ``timer.cancelled``).
"""
@dataclass(frozen=True)
class DomainSpec(Generic[E]):
    """Declarative description of a Home Assistant domain.

    Built-in domains live in `haclient.domains.*` and register a spec at
    import time. Third-party packages can register additional domains via
    the ``haclient.domains`` entry-point group.

    Instances are immutable and hashable. ``operations`` is a ``dict`` and
    therefore takes part in equality but is excluded from the generated
    hash, so a spec can be used as a dictionary key or set member.

    Attributes
    ----------
    name : str
        The HA domain name (e.g. ``"light"``).
    entity_cls : type[Entity]
        The entity class instantiated for this domain.
    accessor : str
        Attribute name on the `HAClient` facade. Defaults to *name*.
    event_subscriptions : tuple of str
        Additional HA event types this domain wants delivered, beyond
        the always-on ``state_changed`` subscription.
    on_event : callable or None
        Per-domain event handler (see `DomainEventHandler`).
    operations : dict
        Legacy dynamic operation dict kept for third-party plugin
        compatibility. Built-in domains with collection-level operations
        should prefer ``accessor_cls`` instead.
    accessor_cls : type[DomainAccessor] or None
        Optional typed `DomainAccessor` subclass to instantiate for this
        domain. When provided, the ``HAClient`` uses this class rather than
        the base `DomainAccessor`, exposing properly typed collection-level
        methods (e.g. ``SceneAccessor.create``).
    """

    name: str
    entity_cls: type[E]
    accessor: str = ""
    event_subscriptions: tuple[str, ...] = ()
    on_event: DomainEventHandler | None = None
    # ``hash=False``: a dict is unhashable, so including it in the generated
    # ``__hash__`` would make ``hash(spec)`` raise TypeError for every spec.
    # The field still participates in ``__eq__``; equal specs keep equal
    # hashes because the hash is built from a subset of the compared fields.
    operations: dict[str, Callable[..., Any]] = field(
        default_factory=dict, hash=False
    )
    accessor_cls: type[DomainAccessor[Any]] | None = None

    def accessor_name(self) -> str:
        """Return the accessor attribute name (defaults to ``name``)."""
        return self.accessor or self.name
class DomainAccessor(Generic[E]):
    """Runtime facade for one domain.

    Returned by ``HAClient.<accessor>``. Exposes:

    * Lookup by short name: ``ha.light("kitchen")`` or ``ha.light["kitchen"]``.
    * Domain-level operations either via typed subclass methods (preferred) or
      via legacy dynamic binding of ``spec.operations`` entries.

    Parameters
    ----------
    spec : DomainSpec
        The spec describing this domain.
    factory : EntityFactoryProtocol
        Factory used to create entity instances on demand.
    """

    def __init__(self, spec: DomainSpec[E], factory: EntityFactoryProtocol) -> None:
        self._spec = spec
        self._factory = factory
        for op_name, op in spec.operations.items():
            # Legacy path kept for backward-compatible third-party plugins:
            # each operation becomes an instance attribute that receives
            # this accessor as its first argument.
            setattr(self, op_name, self._bind(op))

    @property
    def spec(self) -> DomainSpec[E]:
        """Return the underlying `DomainSpec`."""
        return self._spec

    @property
    def factory(self) -> EntityFactoryProtocol:
        """Return the `EntityFactoryProtocol` used to create entities.

        Subclasses use this instead of reaching into the private
        ``_factory`` attribute.
        """
        return self._factory

    def _bind(self, op: Callable[..., Any]) -> Callable[..., Any]:
        """Bind a domain operation to this accessor.

        Each operation is invoked with the accessor as the first argument,
        analogous to a method receiving ``self``. Async operations remain
        coroutine functions so introspection (e.g. by `_SyncProxy`) keeps
        working.

        Parameters
        ----------
        op : callable
            The operation taken from ``spec.operations``.

        Returns
        -------
        callable
            A wrapper carrying ``op``'s ``__name__`` (or ``"operation"``
            when it has none) and ``__doc__``.
        """
        # ``asyncio.iscoroutinefunction`` is deprecated since Python 3.14;
        # ``inspect.iscoroutinefunction`` is the supported replacement.
        import inspect

        if inspect.iscoroutinefunction(op):

            async def async_bound(*args: Any, **kwargs: Any) -> Any:
                return await op(self, *args, **kwargs)

            async_bound.__name__ = getattr(op, "__name__", "operation")
            async_bound.__doc__ = op.__doc__
            return async_bound

        # The sync wrapper returns whatever ``op`` returns unchanged, so an
        # awaitable produced by ``op`` is still handed back to the caller.
        def bound(*args: Any, **kwargs: Any) -> Any:
            return op(self, *args, **kwargs)

        bound.__name__ = getattr(op, "__name__", "operation")
        bound.__doc__ = op.__doc__
        return bound

    def __call__(self, name: str) -> E:
        """Return the entity with short *name*, creating it if needed."""
        return cast("E", self._factory.get_or_create(self._spec, name))

    def __getitem__(self, name: str) -> E:
        """Return the entity with short *name*, creating it if needed."""
        return cast("E", self._factory.get_or_create(self._spec, name))

    def all(self) -> list[E]:
        """Return every entity currently registered for this domain.

        Returns
        -------
        list of Entity
            Whatever ``factory.in_domain(spec)`` reports for this
            accessor's spec.
        """
        return cast("list[E]", self._factory.in_domain(self._spec))
class EntityFactoryProtocol:
    """Interface a `DomainAccessor` expects from its entity factory.

    This is an ordinary class rather than a ``typing.Protocol`` to keep
    imports simple; both methods raise ``NotImplementedError`` here. The
    concrete `EntityFactory` lives in `haclient.core.factory`.
    """

    def get_or_create(self, spec: DomainSpec[Any], name: str) -> Any:  # pragma: no cover
        """Look up the entity for *spec*/*name*, creating it on first use.

        Parameters
        ----------
        spec : DomainSpec
            Spec of the domain the entity belongs to.
        name : str
            Short entity name or full ``<domain>.<name>`` entity id.

        Returns
        -------
        Entity
            The existing or freshly created entity instance.
        """
        raise NotImplementedError

    def in_domain(self, spec: DomainSpec[Any]) -> list[Any]:  # pragma: no cover
        """List every registered entity that belongs to *spec*'s domain.

        Parameters
        ----------
        spec : DomainSpec
            Spec of the domain to enumerate.

        Returns
        -------
        list of Entity
            Entities currently in the registry whose id starts with
            ``"<spec.name>."``.
        """
        raise NotImplementedError
class DomainRegistry:
"""Mutable registry of `DomainSpec` keyed by domain name.
Built-in domains register on import (see `haclient.domains.__init__`).
Third-party domains can be discovered via entry points using
`load_entry_points`.
"""
_instance: DomainRegistry | None = None
def __init__(self) -> None:
self._specs: dict[str, DomainSpec[Any]] = {}
@classmethod
def shared(cls) -> DomainRegistry:
"""Return the process-wide shared registry instance.
Built-in domain modules use this when they register at import
time. `HAClient` reads from the same instance unless a custom
registry is passed explicitly.
"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def register(self, spec: DomainSpec[Any]) -> None:
"""Register *spec*, raising on duplicate domain names.
Parameters
----------
spec : DomainSpec
The spec to register.
Raises
------
HAClientError
If a spec with the same ``name`` is already registered for a
different entity class. Re-registering the same class is a
no-op (this happens when a module is imported twice).
"""
existing = self._specs.get(spec.name)
if existing is not None:
if existing.entity_cls is spec.entity_cls:
self._specs[spec.name] = spec
return
raise HAClientError(
f"Domain {spec.name!r} already registered with "
f"{existing.entity_cls.__name__}; cannot replace with "
f"{spec.entity_cls.__name__}"
)
self._specs[spec.name] = spec
def unregister(self, name: str) -> None:
"""Remove the spec registered under *name*, if any."""
self._specs.pop(name, None)
def get(self, name: str) -> DomainSpec[Any]:
"""Return the spec registered for *name* or raise.
Parameters
----------
name : str
The HA domain name to look up.
Returns
-------
DomainSpec
The registered spec.
Raises
------
HAClientError
If no domain *name* is registered.
"""
spec = self._specs.get(name)
if spec is None:
raise HAClientError(
f"Unknown domain {name!r}; ensure the corresponding plugin is loaded"
)
return spec
def __contains__(self, name: object) -> bool:
return isinstance(name, str) and name in self._specs
def __iter__(self) -> Iterator[DomainSpec[Any]]:
return iter(self._specs.values())
def names(self) -> list[str]:
"""Return all registered domain names."""
return list(self._specs.keys())
def filter(self, names: Iterable[str]) -> list[DomainSpec[Any]]:
"""Return only the specs whose names are in *names*.
Parameters
----------
names : iterable of str
Allowed domain names. Unknown names are silently ignored.
Returns
-------
list of DomainSpec
Registered specs filtered to the requested subset, in
registration order.
"""
wanted = set(names)
return [s for s in self._specs.values() if s.name in wanted]
def load_entry_points(self, group: str = "haclient.domains") -> list[str]:
"""Discover and load third-party domain plugins.
Each entry point is loaded inside a ``try/except`` so a single
broken plugin cannot prevent the rest from loading. The names
of the entry points that loaded successfully are returned.
Parameters
----------
group : str, optional
The entry-point group name. Defaults to ``"haclient.domains"``.
Returns
-------
list of str
Names of the entry points that loaded without raising.
"""
loaded: list[str] = []
try:
entry_points = metadata.entry_points(group=group)
except Exception: # pragma: no cover - defensive
_LOGGER.exception("Failed to enumerate entry points for %s", group)
return loaded
for ep in entry_points:
try:
ep.load()
loaded.append(ep.name)
except Exception: # noqa: BLE001
_LOGGER.exception("Failed to load haclient domain plugin %r", ep.name)
return loaded
def register_domain(spec: DomainSpec[Any]) -> DomainSpec[Any]:
    """Add *spec* to the shared `DomainRegistry` and hand it back.

    This is the canonical entry point for both built-in and third-party
    domain modules.

    Parameters
    ----------
    spec : DomainSpec
        The spec to register.

    Returns
    -------
    DomainSpec
        The same spec object, returned for convenience.
    """
    registry = DomainRegistry.shared()
    registry.register(spec)
    return spec