-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathswitch.py
More file actions
163 lines (136 loc) · 5.23 KB
/
Copy pathswitch.py
File metadata and controls
163 lines (136 loc) · 5.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""Plugwise switch node object."""
from __future__ import annotations
from asyncio import gather
from collections.abc import Awaitable, Callable
from dataclasses import replace
from datetime import UTC, datetime
import logging
from typing import Any, Final
from ..api import NodeEvent, NodeFeature, NodeType, SwitchGroup
from ..connection import StickController
from ..exceptions import MessageError, NodeError
from ..messages.responses import (
NODE_SWITCH_GROUP_ID,
NodeSwitchGroupResponse,
PlugwiseResponse,
)
from ..nodes.sed import NodeSED
from .helpers import raise_not_loaded
from .helpers.firmware import SWITCH_FIRMWARE_SUPPORT
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Features offered by a Switch node; handed to `_setup_protocol` when the node is loaded.
SWITCH_FEATURES: Final = (NodeFeature.SWITCH,)
# Fallback firmware timestamp (timezone-aware, UTC) assigned in `_load_defaults`
# when the node info carries no firmware value.
DEFAULT_FIRMWARE: Final = datetime(2009, 9, 8, 14, 7, 4, tzinfo=UTC)
class PlugwiseSwitch(NodeSED):
"""Plugwise Switch node."""
def __init__(
self,
mac: str,
node_type: NodeType,
controller: StickController,
loaded_callback: Callable[[NodeEvent, str], Awaitable[None]],
):
"""Initialize Scan Device."""
super().__init__(mac, node_type, controller, loaded_callback)
self._switch_subscription: Callable[[], None] | None = None
self._switch = SwitchGroup()
async def load(self) -> None:
"""Load and activate Switch node features."""
if self._loaded:
return
_LOGGER.debug("Loading Switch node %s", self._node_info.mac)
await super().load()
self._setup_protocol(SWITCH_FIRMWARE_SUPPORT, SWITCH_FEATURES)
await self.initialize()
await self._loaded_callback(NodeEvent.LOADED, self.mac)
@raise_not_loaded
async def initialize(self) -> None:
"""Initialize Switch node."""
if self._initialized:
return
self._switch_subscription = await self._message_subscribe(
self._switch_response,
self._mac_in_bytes,
(NODE_SWITCH_GROUP_ID,),
)
await super().initialize()
async def unload(self) -> None:
"""Unload node."""
if self._switch_subscription is not None:
self._switch_subscription()
await super().unload()
# region Caching
async def _load_defaults(self) -> None:
"""Load default configuration settings."""
await super()._load_defaults()
if self._node_info.model is None:
self._node_info.model = "Switch"
self._sed_node_info_update_task_scheduled = True
if self._node_info.name is None:
self._node_info.name = f"Switch {self._node_info.mac[-5:]}"
self._sed_node_info_update_task_scheduled = True
if self._node_info.firmware is None:
self._node_info.firmware = DEFAULT_FIRMWARE
self._sed_node_info_update_task_scheduled = True
# endregion
# region Properties
@property
@raise_not_loaded
def switch(self) -> bool:
"""Current state of switch."""
return bool(self._switch.state)
# endregion
async def _switch_response(self, response: PlugwiseResponse) -> bool:
"""Switch group request from Switch."""
if not isinstance(response, NodeSwitchGroupResponse):
raise MessageError(
f"Invalid response message type ({response.__class__.__name__}) received, expected NodeSwitchGroupResponse"
)
await gather(
self._available_update_state(True, response.timestamp),
self._switch_state_update(
response.switch_state, response.switch_group, response.timestamp
),
)
return True
async def _switch_state_update(
self, switch_state: bool, switch_group: int, timestamp: datetime
) -> None:
"""Process switch state update."""
_LOGGER.debug(
"_switch_state_update for %s: %s",
self.name,
switch_state,
)
self._switch = replace(
self._switch, state=switch_state, group=switch_group, timestamp=timestamp
)
await self.publish_feature_update_to_subscribers(
NodeFeature.SWITCH, self._switch
)
@raise_not_loaded
async def get_state(self, features: tuple[NodeFeature]) -> dict[NodeFeature, Any]:
"""Update latest state for given feature."""
states: dict[NodeFeature, Any] = {}
for feature in features:
_LOGGER.debug(
"Updating node %s - feature '%s'",
self._node_info.mac,
feature,
)
if feature not in self._features:
raise NodeError(
f"Update of feature '{feature.name}' is "
+ f"not supported for {self.mac}"
)
match feature:
case NodeFeature.SWITCH:
states[NodeFeature.SWITCH] = self._switch
case _:
state_result = await super().get_state((feature,))
if feature in state_result:
states[feature] = state_result[feature]
if NodeFeature.AVAILABLE not in states:
states[NodeFeature.AVAILABLE] = self.available_state
return states