Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Ongoing

- Code optimizations via PR [#837](https://github.com/plugwise/python-plugwise/pull/837), [#838](https://github.com/plugwise/python-plugwise/pull/838)
- Code optimizations via PR [#837](https://github.com/plugwise/python-plugwise/pull/837), [#838](https://github.com/plugwise/python-plugwise/pull/838), [#839](https://github.com/plugwise/python-plugwise/pull/839)

## v1.11.0

Expand Down
17 changes: 7 additions & 10 deletions plugwise/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,29 +188,28 @@ def _entity_switching_group(self, entity: GwEntityData) -> None:
entity["switches"]["relay"] = counter != 0
self._count += 1

def _get_groups(self) -> dict[str, GwEntityData]:
def _get_groups(self) -> None:
"""Helper-function for smile.py: get_all_gateway_entities().

Collect switching-, pumping- or report-group info.
"""
groups: dict[str, GwEntityData] = {}
# P1 and Anna don't have groups
if self.smile.type == "power" or self.check_name(ANNA):
return groups
return

for group in self._domain_objects.findall("./group"):
members: list[str] = []
group_id = group.attrib["id"]
group_id = group.get("id")
group_name = group.find("name").text
group_type = group.find("type").text
group_appliances = group.findall("appliances/appliance")
for item in group_appliances:
# Check if members are not orphaned - stretch
if item.attrib["id"] in self.gw_entities:
members.append(item.attrib["id"])
if item.get("id") in self.gw_entities:
members.append(item.get("id"))

if group_type in GROUP_TYPES and members:
groups[group_id] = {
self.gw_entities[group_id] = {
"dev_class": group_type,
"model": "Group",
"name": group_name,
Expand All @@ -219,8 +218,6 @@ def _get_groups(self) -> dict[str, GwEntityData]:
}
self._count += 5

return groups

def _get_lock_state(
self, xml: etree.Element, data: GwEntityData, stretch_v2: bool = False
) -> None:
Expand Down Expand Up @@ -266,7 +263,7 @@ def _get_module_data(
if key is not None and key not in link_tag:
continue

link_id = appl_search.attrib["id"]
link_id = appl_search.get("id")
loc = f".//services/{link_tag}[@id='{link_id}']...."
# Not possible to walrus for some reason...
# xml_2: self._modules for legacy, self._domain_objects for actual
Expand Down
28 changes: 14 additions & 14 deletions plugwise/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def _get_appliances(self) -> None:
for appliance in self._domain_objects.findall("./appliance"):
appl = Munch()
appl.available = None
appl.entity_id = appliance.attrib["id"]
appl.entity_id = appliance.get("id")
appl.location = None
appl.name = appliance.find("name").text
appl.model = None
Expand All @@ -139,7 +139,7 @@ def _get_appliances(self) -> None:
continue

if (appl_loc := appliance.find("location")) is not None:
appl.location = appl_loc.attrib["id"]
appl.location = appl_loc.get("id")
# Set location to the _home_loc_id when the appliance-location is not found,
# except for thermostat-devices without a location, they are not active
elif appl.pwclass not in THERMOSTAT_CLASSES:
Expand Down Expand Up @@ -206,7 +206,7 @@ def _get_locations(self) -> None:
loc = Munch()
locations = self._domain_objects.findall("./location")
for location in locations:
loc.loc_id = location.attrib["id"]
loc.loc_id = location.get("id")
loc.name = location.find("name").text
loc._type = location.find("type").text
self._loc_data[loc.loc_id] = {
Expand Down Expand Up @@ -269,7 +269,7 @@ def _appliance_info_finder(self, appl: Munch, appliance: etree.Element) -> Munch

def _appl_gateway_info(self, appl: Munch, appliance: etree.Element) -> Munch:
"""Helper-function for _appliance_info_finder()."""
self._gateway_id = appliance.attrib["id"]
self._gateway_id = appliance.get("id")
appl.firmware = str(self.smile.version)
appl.hardware = self.smile.hw_version
appl.mac = self.smile.mac_address
Expand Down Expand Up @@ -319,7 +319,7 @@ def _get_appliances_with_offset_functionality(self) -> list[str]:
'.//actuator_functionalities/offset_functionality[type="temperature_offset"]/offset/../../..'
)
for item in offset_appls:
therm_list.append(item.attrib["id"])
therm_list.append(item.get("id"))

return therm_list

Expand Down Expand Up @@ -393,12 +393,12 @@ def _get_measurement_data(self, entity_id: str, entity: GwEntityData) -> None:
def _collect_group_sensors(
self,
data: GwEntityData,
entity_id: str,
group_id: str,
measurements: dict[str, UOM],
) -> None:
"""Collect group sensors."""
if (
group := self._domain_objects.find(f'./group[@id="{entity_id}"]')
group := self._domain_objects.find(f'./group[@id="{group_id}"]')
) is not None:
for measurement, attrs in measurements.items():
locator = f'.//logs/point_log[type="{measurement}"]/period/measurement'
Expand Down Expand Up @@ -502,7 +502,7 @@ def _get_plugwise_notifications(self) -> None:
self._notifications = {}
for notification in self._domain_objects.findall("./notification"):
try:
msg_id = notification.attrib["id"]
msg_id = notification.get("id")
msg_type = notification.find("type").text
msg = notification.find("message").text
self._notifications[msg_id] = {msg_type: msg}
Expand Down Expand Up @@ -903,7 +903,7 @@ def _presets(self, loc_id: str) -> dict[str, list[float]]:
directives = self._domain_objects.find(f'rule[@id="{rule_id}"]/directives')
for directive in directives:
preset = directive.find("then").attrib
presets[directive.attrib["preset"]] = [
presets[directive.get("preset")] = [
float(preset["heating_setpoint"]),
float(preset["cooling_setpoint"]),
]
Expand All @@ -920,13 +920,13 @@ def _rule_ids_by_name(self, name: str, loc_id: str) -> dict[str, dict[str, str]]
for rule in self._domain_objects.findall(f'./rule[name="{name}"]'):
active = rule.find("active").text
if rule.find(locator) is not None:
schedule_ids[rule.attrib["id"]] = {
schedule_ids[rule.get("id")] = {
"location": loc_id,
"name": name,
"active": active,
}
else:
schedule_ids[rule.attrib["id"]] = {
schedule_ids[rule.get("id")] = {
"location": NONE,
"name": name,
"active": active,
Expand All @@ -947,13 +947,13 @@ def _rule_ids_by_tag(self, tag: str, loc_id: str) -> dict[str, dict[str, str]]:
name = rule.find("name").text
active = rule.find("active").text
if rule.find(locator2) is not None:
schedule_ids[rule.attrib["id"]] = {
schedule_ids[rule.get("id")] = {
"location": loc_id,
"name": name,
"active": active,
}
else:
schedule_ids[rule.attrib["id"]] = {
schedule_ids[rule.get("id")] = {
"location": NONE,
"name": name,
"active": active,
Expand Down Expand Up @@ -1002,6 +1002,6 @@ def _thermostat_uri(self, loc_id: str) -> str:
Determine the location-set_temperature uri - from LOCATIONS.
"""
locator = f'./location[@id="{loc_id}"]/actuator_functionalities/thermostat_functionality'
thermostat_functionality_id = self._domain_objects.find(locator).attrib["id"]
thermostat_functionality_id = self._domain_objects.find(locator).get("id")

return f"{LOCATIONS};id={loc_id}/thermostat;id={thermostat_functionality_id}"
18 changes: 11 additions & 7 deletions plugwise/legacy/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def _all_appliances(self) -> None:
continue # pragma: no cover

appl.location = self._home_loc_id
appl.entity_id = appliance.attrib["id"]
appl.entity_id = appliance.get("id")
appl.name = appliance.find("name").text
# Extend device_class name when a Circle/Stealth is type heater_central -- Pw-Beta Issue #739
if (
Expand Down Expand Up @@ -148,7 +148,7 @@ def _all_locations(self) -> None:
return

for location in locations:
loc.loc_id = location.attrib["id"]
loc.loc_id = location.get("id")
loc.name = location.find("name").text
loc._type = location.find("type").text
# Filter the valid single location for P1 legacy: services not empty
Expand Down Expand Up @@ -393,10 +393,14 @@ def _presets(self) -> dict[str, list[float]]:
"""Helper-function for presets() - collect Presets for a legacy Anna."""
presets: dict[str, list[float]] = {}
for directive in self._domain_objects.findall("rule/directives/when/then"):
if directive is not None and directive.get("icon") is not None:
if (
directive is not None
and directive.get("icon") is not None
and directive.get("temperature") is not None
):
# Ensure list of heating_setpoint, cooling_setpoint
presets[directive.attrib["icon"]] = [
float(directive.attrib["temperature"]),
presets[directive.get("icon")] = [
float(directive.get("temperature")),
0,
]
Comment thread
coderabbitai[bot] marked this conversation as resolved.

Expand All @@ -411,7 +415,7 @@ def _schedules(self) -> tuple[list[str], str]:
search = self._domain_objects
if (result := search.find("./rule[name='Thermostat schedule']")) is not None:
name = "Thermostat schedule"
rule_id = result.attrib["id"]
rule_id = result.get("id")

log_type = "schedule_state"
locator = f"./appliance[type='thermostat']/logs/point_log[type='{log_type}']/period/measurement"
Expand All @@ -432,5 +436,5 @@ def _schedules(self) -> tuple[list[str], str]:
def _thermostat_uri(self) -> str:
"""Determine the location-set_temperature uri - from APPLIANCES."""
locator = "./appliance[type='thermostat']"
appliance_id = self._appliances.find(locator).attrib["id"]
appliance_id = self._appliances.find(locator).get("id")
return f"{APPLIANCES};id={appliance_id}/thermostat"
16 changes: 9 additions & 7 deletions plugwise/legacy/smile.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ def get_all_gateway_entities(self) -> None:
Finally, collect the data and states for each entity.
"""
self._all_appliances()
if group_data := self._get_groups():
self.gw_entities.update(group_data)

self._get_groups()
self._all_entity_data()

async def async_update(self) -> dict[str, GwEntityData]:
Expand Down Expand Up @@ -154,13 +152,17 @@ async def set_offset(self, dev_id: str, offset: float) -> None:

async def set_preset(self, _: str, preset: str) -> None:
"""Set the given Preset on the relevant Thermostat - from DOMAIN_OBJECTS."""
if (presets := self._presets()) is None:
if not (presets := self._presets()):
raise PlugwiseError("Plugwise: no presets available.") # pragma: no cover
if preset not in list(presets):
raise PlugwiseError("Plugwise: invalid preset.")

locator = f'rule/directives/when/then[@icon="{preset}"].../.../...'
rule_id = self._domain_objects.find(locator).attrib["id"]
if (rule := self._domain_objects.find(locator)) is None:
raise PlugwiseError("Plugwise: no preset rule found.") # pragma: no cover
if (rule_id := rule.get("id")) is None:
raise PlugwiseError("Plugwise: no preset id found.") # pragma: no cover

data = f"<rules><rule id='{rule_id}'><active>true</active></rule></rules>"
await self.call_request(RULES, method="put", data=data)

Expand Down Expand Up @@ -192,7 +194,7 @@ async def set_schedule_state(
schedule_rule_id: str | None = None
for rule in self._domain_objects.findall("rule"):
if rule.find("name").text == name:
schedule_rule_id = rule.attrib["id"]
schedule_rule_id = rule.get("id")
break

if schedule_rule_id is None:
Expand All @@ -205,7 +207,7 @@ async def set_schedule_state(
new_state = "true"

locator = f'.//*[@id="{schedule_rule_id}"]/template'
template_id = self._domain_objects.find(locator).attrib["id"]
template_id = self._domain_objects.find(locator).get("id")
Comment thread
coderabbitai[bot] marked this conversation as resolved.

data = (
"<rules>"
Expand Down
14 changes: 6 additions & 8 deletions plugwise/smile.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ def get_all_gateway_entities(self) -> None:
)
self._scan_thermostats()

if group_data := self._get_groups():
self.gw_entities.update(group_data)

self._get_groups()
self._all_entity_data()

async def async_update(self) -> dict[str, GwEntityData]:
Expand Down Expand Up @@ -178,7 +176,7 @@ async def set_number(
if th_func_list := self._domain_objects.findall(locator):
for th_func in th_func_list:
if th_func.find("type").text == key:
thermostat_id = th_func.attrib["id"]
thermostat_id = th_func.get("id")

if thermostat_id is None:
raise PlugwiseError(f"Plugwise: cannot change setpoint, {key} not found.")
Expand Down Expand Up @@ -358,7 +356,7 @@ async def set_schedule_state(
)
if self.check_name(ANNA):
locator = f'.//*[@id="{schedule_rule_id}"]/template'
template_id = self._domain_objects.find(locator).attrib["id"]
template_id = self._domain_objects.find(locator).get("id")
template = f'<template id="{template_id}" />'

contexts = self.determine_contexts(loc_id, new_state, schedule_rule_id)
Expand Down Expand Up @@ -427,10 +425,10 @@ async def set_switch_state(
# multiple types of e.g. toggle_functionality present
if (sw_type := item.find("type")) is not None:
if sw_type.text == switch.act_type:
switch_id = item.attrib["id"]
switch_id = item.get("id")
break
else: # actuators with a single item like relay_functionality
switch_id = item.attrib["id"]
switch_id = item.get("id")

uri = f"{APPLIANCES};id={appl_id}/{switch.device};id={switch_id}"
if model == "relay":
Expand All @@ -456,7 +454,7 @@ async def _set_groupswitch_member_state(
switched = 0
for member in members:
locator = f'appliance[@id="{member}"]/{switch.actuator}/{switch.func_type}'
switch_id = self._domain_objects.find(locator).attrib["id"]
switch_id = self._domain_objects.find(locator).get("id")
uri = f"{APPLIANCES};id={member}/{switch.device};id={switch_id}"
lock_blocked = self.gw_entities[member]["switches"].get("lock")
# Assume Plugs under Plugwise control are not part of a group
Expand Down
40 changes: 23 additions & 17 deletions plugwise/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,36 @@ def check_heater_central(xml: etree.Element) -> str:
for a system that has two heater_central appliances.
"""
locator = "./appliance[type='heater_central']"
hc_count = 0
hc_list: list[dict[str, bool]] = []
for heater_central in xml.findall(locator):
hc_count += 1
hc_id: str = heater_central.attrib["id"]
has_actuators: bool = (
heater_central.find("actuator_functionalities/") is not None
)
heater_central_count = 0
heater_central_list: list[dict[str, bool]] = []
if not (result := xml.findall(locator)):
return NONE # pragma: no cover

for heater_central in result:
if (heater_central_id := heater_central.get("id")) is None:
continue # pragma: no cover

if (heater_central_name := heater_central.find("name")) is None:
continue # pragma: no cover

has_actuators = heater_central.find("actuator_functionalities/") is not None
# Filter for Plug/Circle/Stealth heater_central -- Pw-Beta Issue #739
if heater_central.find("name").text == "Central heating boiler":
hc_list.append({hc_id: has_actuators})
if heater_central_name.text == "Central heating boiler":
heater_central_list.append({heater_central_id: has_actuators})
heater_central_count += 1

if not hc_list:
if not heater_central_list:
return NONE # pragma: no cover

heater_central_id = list(hc_list[0].keys())[0]
if hc_count > 1:
for item in hc_list:
hc_id, has_actuators = next(iter(item.items()))
heater_id = list(heater_central_list[0].keys())[0]
if heater_central_count > 1:
for item in heater_central_list:
heater_central_id, has_actuators = next(iter(item.items()))
if has_actuators:
heater_central_id = hc_id
heater_id = heater_central_id
break

return heater_central_id
return heater_id


def check_model(name: str | None, vendor_name: str | None) -> str | None:
Expand Down