diff --git a/deebot_client/command.py b/deebot_client/command.py
index dc0e0cf38..207d6da0e 100644
--- a/deebot_client/command.py
+++ b/deebot_client/command.py
@@ -307,7 +307,9 @@ def _create_from_mqtt(cls, data: dict[str, Any]) -> CommandMqttP2P:
data.pop(name, None)
else:
try:
- values[param.name or name] = _pop_or_raise(name, param.type_, data)
+ values[param.name or name] = cls._pop_or_raise(
+ name, param.type_, data
+ )
except KeyError as err:
if not param.optional:
msg = f'"{name}" is missing in {data}'
@@ -318,14 +320,18 @@ def _create_from_mqtt(cls, data: dict[str, Any]) -> CommandMqttP2P:
return cls(**values)
+ @classmethod
+ def _pop_or_raise(cls, name: str, type_: type, data: dict[str, Any]) -> Any:
+ value = data.pop(name)
+ try:
+ return cls._decode(type_, value)
+ except ValueError as err:
+ msg = f'Could not convert "{value}" of {name} into {type_}'
+ raise DeebotError(msg) from err
-def _pop_or_raise(name: str, type_: type, data: dict[str, Any]) -> Any:
- value = data.pop(name)
- try:
+ @classmethod
+ def _decode(cls, type_: type, value: Any) -> Any:
return type_(value)
- except ValueError as err:
- msg = f'Could not convert "{value}" of {name} into {type_}'
- raise DeebotError(msg) from err
class GetCommand(CommandWithMessageHandling, ABC):
diff --git a/deebot_client/commands/__init__.py b/deebot_client/commands/__init__.py
index 5fb527c47..db326038e 100644
--- a/deebot_client/commands/__init__.py
+++ b/deebot_client/commands/__init__.py
@@ -11,14 +11,22 @@
COMMANDS as JSON_COMMANDS,
COMMANDS_WITH_MQTT_P2P_HANDLING as JSON_COMMANDS_WITH_MQTT_P2P_HANDLING,
)
+from .xml import (
+ COMMANDS as XML_COMMANDS,
+ COMMANDS_WITH_MQTT_P2P_HANDLING as XML_COMMANDS_WITH_MQTT_P2P_HANDLING,
+)
if TYPE_CHECKING:
from deebot_client.command import Command, CommandMqttP2P
-COMMANDS: dict[DataType, dict[str, type[Command]]] = {DataType.JSON: JSON_COMMANDS}
+COMMANDS: dict[DataType, dict[str, type[Command]]] = {
+ DataType.JSON: JSON_COMMANDS,
+ DataType.XML: XML_COMMANDS,
+}
COMMANDS_WITH_MQTT_P2P_HANDLING: dict[DataType, dict[str, type[CommandMqttP2P]]] = {
- DataType.JSON: JSON_COMMANDS_WITH_MQTT_P2P_HANDLING
+ DataType.JSON: JSON_COMMANDS_WITH_MQTT_P2P_HANDLING,
+ DataType.XML: XML_COMMANDS_WITH_MQTT_P2P_HANDLING,
}
diff --git a/deebot_client/commands/xml/__init__.py b/deebot_client/commands/xml/__init__.py
index a2e3c9e07..ec47efb66 100644
--- a/deebot_client/commands/xml/__init__.py
+++ b/deebot_client/commands/xml/__init__.py
@@ -13,7 +13,7 @@
from .clean_logs import GetCleanLogs
from .clean_speed import GetCleanSpeed, SetCleanSpeed
from .error import GetError
-from .life_span import GetLifeSpan
+from .life_span import GetLifeSpan, ResetLifeSpan
from .map import GetMapM, GetMapSet, GetMapSt, GetTrM, PullM, PullMP
from .play_sound import PlaySound
from .pos import GetChargerPos, GetPos
@@ -43,6 +43,7 @@
"PlaySound",
"PullM",
"PullMP",
+ "ResetLifeSpan",
"SetCleanSpeed",
]
@@ -67,6 +68,7 @@
GetError,
GetLifeSpan,
+ ResetLifeSpan,
GetMapM,
GetMapSet,
diff --git a/deebot_client/commands/xml/common.py b/deebot_client/commands/xml/common.py
index 8044c09b7..940a5daf3 100644
--- a/deebot_client/commands/xml/common.py
+++ b/deebot_client/commands/xml/common.py
@@ -3,17 +3,24 @@
from __future__ import annotations
from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING, cast, final, override
from xml.etree.ElementTree import Element, SubElement
from defusedxml import ElementTree # type: ignore[import-untyped]
-from deebot_client.command import Command, CommandWithMessageHandling, SetCommand
+from deebot_client.command import (
+ Command,
+ CommandMqttP2P,
+ CommandWithMessageHandling,
+ SetCommand,
+)
from deebot_client.const import DataType
from deebot_client.logging_filter import get_logger
from deebot_client.message import HandlingResult, HandlingState, MessageStr
if TYPE_CHECKING:
+ from typing import Any
+
from deebot_client.event_bus import EventBus
_LOGGER = get_logger(__name__)
@@ -76,11 +83,53 @@ def _handle_xml(cls, _: EventBus, xml: Element) -> HandlingResult:
return HandlingResult.success()
_LOGGER.warning(
- 'Command "%s" was not successful. XML response: %s', cls.NAME, xml
+ 'Command "%s" was not successful. XML response: %s',
+ cls.NAME,
+ ElementTree.tostring(xml, "unicode"),
)
return HandlingResult(HandlingState.FAILED)
+class XmlCommandMqttP2P(XmlCommand, CommandMqttP2P, ABC):
+ """Json base command for mqtt p2p channel."""
+
+ @classmethod
+ def create_from_mqtt(cls, payload: str | bytes | bytearray) -> CommandMqttP2P:
+ """Create a command from the mqtt data."""
+ xml = ElementTree.fromstring(payload)
+ return cls._create_from_mqtt(xml.attrib)
+
+ @final
+ @override
+ def handle_mqtt_p2p(
+ self, event_bus: EventBus, response_payload: str | bytes | bytearray
+ ) -> None:
+ """Handle response received over the mqtt channel "p2p"."""
+ if isinstance(response_payload, bytearray):
+ data = bytes(response_payload).decode()
+ elif isinstance(response_payload, bytes):
+ data = response_payload.decode()
+ elif isinstance(response_payload, str):
+ data = response_payload
+ else:
+ msg = "Unsupported message data type {message_type}" # type: ignore[unreachable]
+ raise TypeError(msg.format(message_type=type(response_payload)))
+
+ self._handle_mqtt_p2p(event_bus, data)
+
+ @classmethod
+ def _decode(cls, type_: type, value: Any) -> Any:
+ if hasattr(type_, "from_xml"):
+ return type_.from_xml(value)
+ return type_(value)
+
+ @abstractmethod
+ def _handle_mqtt_p2p(
+ self, event_bus: EventBus, response: dict[str, Any] | str
+ ) -> None:
+ """Handle response received over the mqtt channel "p2p"."""
+
+
class XmlSetCommand(ExecuteCommand, SetCommand, ABC):
"""Xml base set command.
diff --git a/deebot_client/commands/xml/life_span.py b/deebot_client/commands/xml/life_span.py
index 52cf0bf7a..97493f9ce 100644
--- a/deebot_client/commands/xml/life_span.py
+++ b/deebot_client/commands/xml/life_span.py
@@ -2,12 +2,14 @@
from __future__ import annotations
-from typing import TYPE_CHECKING
+from types import MappingProxyType
+from typing import TYPE_CHECKING, Any
+from deebot_client.command import InitParam
from deebot_client.events import LifeSpan, LifeSpanEvent
-from deebot_client.message import HandlingResult
+from deebot_client.message import HandlingResult, HandlingState
-from .common import XmlCommandWithMessageHandling
+from .common import ExecuteCommand, XmlCommandMqttP2P, XmlCommandWithMessageHandling
if TYPE_CHECKING:
from xml.etree.ElementTree import Element
@@ -20,8 +22,11 @@ class GetLifeSpan(XmlCommandWithMessageHandling):
NAME = "GetLifeSpan"
- def __init__(self, life_span: LifeSpan) -> None:
- super().__init__({"type": life_span.xml_value})
+ def __init__(self, life_span: LifeSpan | str) -> None:
+ xml_value = (
+ life_span.xml_value if isinstance(life_span, LifeSpan) else life_span
+ )
+ super().__init__({"type": xml_value})
@classmethod
def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
@@ -47,3 +52,24 @@ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
LifeSpanEvent(LifeSpan.from_xml(component_type), percent, left)
)
return HandlingResult.success()
+
+
+class ResetLifeSpan(ExecuteCommand, XmlCommandMqttP2P):
+ """ResetLifeSpan command."""
+
+ NAME = "ResetLifeSpan"
+ _mqtt_params = MappingProxyType({"type": InitParam(LifeSpan, "life_span")})
+
+ def __init__(self, life_span: LifeSpan | str) -> None:
+ xml_value = (
+ life_span.xml_value if isinstance(life_span, LifeSpan) else life_span
+ )
+ super().__init__({"type": xml_value})
+
+ def _handle_mqtt_p2p(
+ self, event_bus: EventBus, response: dict[str, Any] | str
+ ) -> None:
+ """Handle response received over the mqtt channel "p2p"."""
+ result = self.handle(event_bus, response)
+ if result.state == HandlingState.SUCCESS:
+ event_bus.request_refresh(LifeSpanEvent)
diff --git a/tests/commands/xml/__init__.py b/tests/commands/xml/__init__.py
index fadc827a0..2fc9ecda4 100644
--- a/tests/commands/xml/__init__.py
+++ b/tests/commands/xml/__init__.py
@@ -1,6 +1,65 @@
from __future__ import annotations
-from typing import Any
+from typing import TYPE_CHECKING, Any, cast
+from xml.etree.ElementTree import Element, SubElement
+
+from defusedxml import ElementTree # type: ignore[import-untyped]
+from testfixtures import LogCapture
+
+from deebot_client.command import CommandResult
+from deebot_client.message import HandlingState
+from tests.commands import assert_command
+
+if TYPE_CHECKING:
+ from deebot_client.commands.xml.common import ExecuteCommand
+
+
+def get_success_body(
+ extra_attrs: dict[str, Any] | None = None, sub_element_name: str | None = None
+) -> str:
+ element = ctl_element = Element("ctl")
+ element.set("ret", "ok")
+
+ if extra_attrs is not None and len(extra_attrs) > 0:
+ if sub_element_name is not None:
+ element = SubElement(element, sub_element_name.lower())
+
+ if isinstance(extra_attrs, dict):
+ for key, value in extra_attrs.items():
+ element.set(key, value)
+
+ return cast("str", ElementTree.tostring(ctl_element, "unicode"))
+
+
+def get_failure_body() -> str:
+ return ''
+
+
+async def assert_execute_command(
+ command: ExecuteCommand, args: dict[str, Any] | list[Any] | None
+) -> None:
+ assert command.NAME != "invalid"
+ assert command._args == args
+
+ # success
+ xml = get_request_xml(get_success_body())
+ await assert_command(command, xml, None)
+
+ # failed
+ with LogCapture() as log:
+ body = get_failure_body()
+ xml = get_request_xml(body)
+ await assert_command(
+ command, xml, None, command_result=CommandResult(HandlingState.FAILED)
+ )
+
+ log.check_present(
+ (
+ "deebot_client.commands.xml.common",
+ "WARNING",
+ f'Command "{command.NAME}" was not successful. XML response: {body}',
+ )
+ )
def get_request_xml(data: str | None) -> dict[str, Any]:
diff --git a/tests/commands/xml/test_common.py b/tests/commands/xml/test_common.py
new file mode 100644
index 000000000..c048dc6d3
--- /dev/null
+++ b/tests/commands/xml/test_common.py
@@ -0,0 +1,81 @@
+from __future__ import annotations
+
+from types import MappingProxyType
+from typing import Any
+from unittest.mock import Mock, patch
+
+import pytest
+
+from deebot_client.command import CommandResult, InitParam
+from deebot_client.commands.xml.common import XmlCommandMqttP2P
+from deebot_client.event_bus import EventBus
+from deebot_client.events import LifeSpan
+
+
+@pytest.mark.parametrize(
+ ("payload", "decoded_payload"),
+ [
+ (bytearray(bytes("test", "utf-8")), "test"),
+ (bytes("test", "utf-8"), "test"),
+ ("test", "test"),
+ ],
+ ids=["bytearray", "bytes", "str"],
+)
+@patch.multiple(XmlCommandMqttP2P, __abstractmethods__=set())
+def test_XmlCommandMqttP2P_decoding(
+ payload: bytearray | bytes | str, decoded_payload: str
+) -> None:
+ command = XmlCommandMqttP2P() # type: ignore[abstract]
+ event_bus = Mock(spec_set=EventBus)
+ with patch.object(command, "_handle_mqtt_p2p", return_value=None) as mqtt_handler:
+ command.handle_mqtt_p2p(event_bus, payload)
+
+ mqtt_handler.assert_called_once_with(event_bus, decoded_payload)
+
+
+@patch.multiple(XmlCommandMqttP2P, __abstractmethods__=set())
+def test_XmlCommandMqttP2P_invalid_decoding() -> None:
+ command = XmlCommandMqttP2P() # type: ignore[abstract]
+ event_bus = Mock(spec_set=EventBus)
+ with (
+ patch.object(command, "_handle_mqtt_p2p", return_value=None),
+ pytest.raises(TypeError),
+ ):
+ command.handle_mqtt_p2p(event_bus, {}) # type: ignore[arg-type]
+
+
+@pytest.mark.parametrize(
+ ("command_payload", "payload_type", "expected_argument"),
+ [
+ ("test", str, "test"),
+ (LifeSpan.BRUSH.xml_value, LifeSpan, LifeSpan.BRUSH),
+ ],
+ ids=["native", "with xml_value"],
+)
+def test_XmlCommandMqttP2P_create_from_mqtt(
+ command_payload: str, payload_type: type, expected_argument: type
+) -> None:
+ class Sut(XmlCommandMqttP2P):
+ NAME = "Sut"
+ _mqtt_params = MappingProxyType(
+ {"payload": InitParam(payload_type), "remove": None}
+ )
+
+ def __init__(self, payload: Any) -> None:
+ super().__init__(args={"payload": payload})
+
+ def _handle_mqtt_p2p(
+ self, _event_bus: EventBus, _response: dict[str, Any] | str
+ ) -> None:
+ pass
+
+ def _handle_response(
+ self, _event_bus: EventBus, _response: dict[str, Any]
+ ) -> CommandResult:
+ return CommandResult.analyse()
+
+ xml_message = f""
+
+ sut = Sut.create_from_mqtt(xml_message)
+
+ assert sut._args == {"payload": expected_argument}
diff --git a/tests/commands/xml/test_life_span.py b/tests/commands/xml/test_life_span.py
index fb039729e..88da76d32 100644
--- a/tests/commands/xml/test_life_span.py
+++ b/tests/commands/xml/test_life_span.py
@@ -1,20 +1,38 @@
from __future__ import annotations
+from typing import TYPE_CHECKING
+from unittest.mock import patch
+
import pytest
from deebot_client.command import CommandResult
-from deebot_client.commands.xml import GetLifeSpan
+from deebot_client.commands.xml import GetLifeSpan, ResetLifeSpan
from deebot_client.events import LifeSpan, LifeSpanEvent
from deebot_client.message import HandlingState
from tests.commands import assert_command
-from . import get_request_xml
+from . import (
+ assert_execute_command,
+ get_failure_body,
+ get_request_xml,
+ get_success_body,
+)
+
+if TYPE_CHECKING:
+ from deebot_client.event_bus import EventBus
@pytest.mark.parametrize(
("component_type", "lifespan_type", "left", "total", "expected_event"),
[
("Brush", LifeSpan.BRUSH, 50, 100, LifeSpanEvent(LifeSpan.BRUSH, 50, 50)),
+ (
+ "Brush",
+ LifeSpan.BRUSH.xml_value,
+ 50,
+ 100,
+ LifeSpanEvent(LifeSpan.BRUSH, 50, 50),
+ ),
(
"DustCaseHeap",
LifeSpan.DUST_CASE_HEAP,
@@ -33,15 +51,15 @@
)
async def test_get_life_span(
component_type: str,
- lifespan_type: LifeSpan,
+ lifespan_type: LifeSpan | str,
left: int,
total: int,
expected_event: LifeSpanEvent,
) -> None:
- json = get_request_xml(
+ xml = get_request_xml(
f""
)
- await assert_command(GetLifeSpan(lifespan_type), json, expected_event)
+ await assert_command(GetLifeSpan(lifespan_type), xml, expected_event)
@pytest.mark.parametrize(
@@ -57,3 +75,42 @@ async def test_get_life_span_error(xml: str) -> None:
None,
command_result=CommandResult(HandlingState.ANALYSE_LOGGED),
)
+
+
+@pytest.mark.parametrize(
+ ("command", "args"),
+ [
+ (ResetLifeSpan(LifeSpan.FILTER), {"type": LifeSpan.FILTER.xml_value}),
+ (ResetLifeSpan(LifeSpan.FILTER.xml_value), {"type": LifeSpan.FILTER.xml_value}),
+ (
+ ResetLifeSpan.create_from_mqtt(b''),
+ {"type": LifeSpan.BRUSH.xml_value},
+ ),
+ ],
+)
+async def test_ResetLifeSpan(command: ResetLifeSpan, args: dict[str, str]) -> None:
+ await assert_execute_command(command, args)
+
+
+def test_ResetLifeSpan_invokes_refresh(event_bus: EventBus) -> None:
+ command = ResetLifeSpan(LifeSpan.FILTER)
+ success_response = get_success_body()
+
+ with patch.object(
+ event_bus, "request_refresh", return_value=None
+ ) as mock_request_refresh:
+ command.handle_mqtt_p2p(event_bus, success_response)
+
+ mock_request_refresh.assert_called_with(LifeSpanEvent)
+
+
+def test_ResetLifeSpan_not_invokes_refresh(event_bus: EventBus) -> None:
+ command = ResetLifeSpan(LifeSpan.FILTER)
+ failure_response = get_failure_body()
+
+ with patch.object(
+ event_bus, "request_refresh", return_value=None
+ ) as mock_request_refresh:
+ command.handle_mqtt_p2p(event_bus, failure_response)
+
+ mock_request_refresh.assert_not_called()