-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcircle_plus.py
More file actions
145 lines (130 loc) · 5.15 KB
/
Copy pathcircle_plus.py
File metadata and controls
145 lines (130 loc) · 5.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""Plugwise Circle+ node."""
from __future__ import annotations
from datetime import UTC, datetime
import logging
from ..api import NodeEvent, NodeFeature
from ..constants import MAX_TIME_DRIFT
from ..messages.requests import (
CirclePlusAllowJoiningRequest,
CirclePlusRealTimeClockGetRequest,
CirclePlusRealTimeClockSetRequest,
)
from ..messages.responses import NodeResponseType
from .circle import PlugwiseCircle
from .helpers import raise_not_loaded
from .helpers.firmware import CIRCLE_PLUS_FIRMWARE_SUPPORT
_LOGGER = logging.getLogger(__name__)
class PlugwiseCirclePlus(PlugwiseCircle):
"""Plugwise Circle+ node."""
async def load(self) -> bool:
"""Load and activate Circle+ node features."""
if self._loaded:
return True
if self._cache_enabled:
_LOGGER.debug("Loading Circle node %s from cache", self._node_info.mac)
if await self._load_from_cache():
self._loaded = True
self._setup_protocol(
CIRCLE_PLUS_FIRMWARE_SUPPORT,
(
NodeFeature.RELAY,
NodeFeature.RELAY_INIT,
NodeFeature.RELAY_LOCK,
NodeFeature.ENERGY,
NodeFeature.POWER,
NodeFeature.CIRCLE,
NodeFeature.CIRCLEPLUS,
),
)
if await self.initialize():
await self._loaded_callback(NodeEvent.LOADED, self.mac)
return True
_LOGGER.info(
"Loading Circle+ node %s from cache failed",
self._node_info.mac,
)
else:
_LOGGER.debug("Loading Circle+ node %s", self._node_info.mac)
# Check if node is online
if not self._available and not await self.is_online():
_LOGGER.warning(
"Failed to load Circle+ node %s because it is not online",
self._node_info.mac,
)
return False
# Get node info
if await self.node_info_update() is None:
_LOGGER.warning(
"Failed to load Circle+ node %s because it is not responding to information request",
self._node_info.mac,
)
return False
self._loaded = True
self._setup_protocol(
CIRCLE_PLUS_FIRMWARE_SUPPORT,
(
NodeFeature.RELAY,
NodeFeature.RELAY_INIT,
NodeFeature.RELAY_LOCK,
NodeFeature.ENERGY,
NodeFeature.POWER,
NodeFeature.CIRCLE,
NodeFeature.CIRCLEPLUS,
),
)
if not await self.initialize():
return False
await self._loaded_callback(NodeEvent.LOADED, self.mac)
return True
async def clock_synchronize(self) -> bool:
"""Synchronize realtime clock. Returns true if successful."""
clock_request = CirclePlusRealTimeClockGetRequest(
self._send, self._mac_in_bytes
)
if (clock_response := await clock_request.send()) is None:
_LOGGER.debug(
"No response for async_realtime_clock_synchronize() for %s", self.mac
)
await self._available_update_state(False)
return False
await self._available_update_state(True, clock_response.timestamp)
_dt_of_circle: datetime = datetime.now(tz=UTC).replace(
hour=clock_response.time.value.hour,
minute=clock_response.time.value.minute,
second=clock_response.time.value.second,
microsecond=0,
tzinfo=UTC,
)
clock_offset = clock_response.timestamp.replace(microsecond=0) - _dt_of_circle
if (clock_offset.seconds < MAX_TIME_DRIFT) or (
clock_offset.seconds > -(MAX_TIME_DRIFT)
):
return True
_LOGGER.info(
"Reset realtime clock of node %s because time has drifted %s seconds while max drift is set to %s seconds)",
self._node_info.mac,
str(clock_offset.seconds),
str(MAX_TIME_DRIFT),
)
clock_set_request = CirclePlusRealTimeClockSetRequest(
self._send, self._mac_in_bytes, datetime.now(tz=UTC)
)
if (node_response := await clock_set_request.send()) is not None:
return node_response.ack_id == NodeResponseType.CLOCK_ACCEPTED
_LOGGER.warning(
"Failed to (re)set the internal realtime clock of %s",
self.name,
)
return False
@raise_not_loaded
async def enable_auto_join(self) -> bool:
"""Enable auto-join on the Circle+.
Returns:
bool: True if the request was acknowledged, False otherwise.
"""
_LOGGER.info("Enabling auto-join for CirclePlus")
request = CirclePlusAllowJoiningRequest(self._send, True)
if (response := await request.send()) is None:
return False
# JOIN_ACCEPTED is the ACK for enable=True
return NodeResponseType(response.ack_id) == NodeResponseType.JOIN_ACCEPTED