diff --git a/deebot_client/command.py b/deebot_client/command.py index dc0e0cf38..207d6da0e 100644 --- a/deebot_client/command.py +++ b/deebot_client/command.py @@ -307,7 +307,9 @@ def _create_from_mqtt(cls, data: dict[str, Any]) -> CommandMqttP2P: data.pop(name, None) else: try: - values[param.name or name] = _pop_or_raise(name, param.type_, data) + values[param.name or name] = cls._pop_or_raise( + name, param.type_, data + ) except KeyError as err: if not param.optional: msg = f'"{name}" is missing in {data}' @@ -318,14 +320,18 @@ def _create_from_mqtt(cls, data: dict[str, Any]) -> CommandMqttP2P: return cls(**values) + @classmethod + def _pop_or_raise(cls, name: str, type_: type, data: dict[str, Any]) -> Any: + value = data.pop(name) + try: + return cls._decode(type_, value) + except ValueError as err: + msg = f'Could not convert "{value}" of {name} into {type_}' + raise DeebotError(msg) from err -def _pop_or_raise(name: str, type_: type, data: dict[str, Any]) -> Any: - value = data.pop(name) - try: + @classmethod + def _decode(cls, type_: type, value: Any) -> Any: return type_(value) - except ValueError as err: - msg = f'Could not convert "{value}" of {name} into {type_}' - raise DeebotError(msg) from err class GetCommand(CommandWithMessageHandling, ABC): diff --git a/deebot_client/commands/__init__.py b/deebot_client/commands/__init__.py index 5fb527c47..db326038e 100644 --- a/deebot_client/commands/__init__.py +++ b/deebot_client/commands/__init__.py @@ -11,14 +11,22 @@ COMMANDS as JSON_COMMANDS, COMMANDS_WITH_MQTT_P2P_HANDLING as JSON_COMMANDS_WITH_MQTT_P2P_HANDLING, ) +from .xml import ( + COMMANDS as XML_COMMANDS, + COMMANDS_WITH_MQTT_P2P_HANDLING as XML_COMMANDS_WITH_MQTT_P2P_HANDLING, +) if TYPE_CHECKING: from deebot_client.command import Command, CommandMqttP2P -COMMANDS: dict[DataType, dict[str, type[Command]]] = {DataType.JSON: JSON_COMMANDS} +COMMANDS: dict[DataType, dict[str, type[Command]]] = { + DataType.JSON: JSON_COMMANDS, + DataType.XML: XML_COMMANDS, +} COMMANDS_WITH_MQTT_P2P_HANDLING: dict[DataType, dict[str, type[CommandMqttP2P]]] = { - DataType.JSON: JSON_COMMANDS_WITH_MQTT_P2P_HANDLING + DataType.JSON: JSON_COMMANDS_WITH_MQTT_P2P_HANDLING, + DataType.XML: XML_COMMANDS_WITH_MQTT_P2P_HANDLING, } diff --git a/deebot_client/commands/xml/__init__.py b/deebot_client/commands/xml/__init__.py index a2e3c9e07..ec47efb66 100644 --- a/deebot_client/commands/xml/__init__.py +++ b/deebot_client/commands/xml/__init__.py @@ -13,7 +13,7 @@ from .clean_logs import GetCleanLogs from .clean_speed import GetCleanSpeed, SetCleanSpeed from .error import GetError -from .life_span import GetLifeSpan +from .life_span import GetLifeSpan, ResetLifeSpan from .map import GetMapM, GetMapSet, GetMapSt, GetTrM, PullM, PullMP from .play_sound import PlaySound from .pos import GetChargerPos, GetPos @@ -43,6 +43,7 @@ "PlaySound", "PullM", "PullMP", + "ResetLifeSpan", "SetCleanSpeed", ] @@ -67,6 +68,7 @@ GetError, GetLifeSpan, + ResetLifeSpan, GetMapM, GetMapSet, diff --git a/deebot_client/commands/xml/common.py b/deebot_client/commands/xml/common.py index 8044c09b7..940a5daf3 100644 --- a/deebot_client/commands/xml/common.py +++ b/deebot_client/commands/xml/common.py @@ -3,17 +3,24 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, cast, final, override from xml.etree.ElementTree import Element, SubElement from defusedxml import ElementTree # type: ignore[import-untyped] -from deebot_client.command import Command, CommandWithMessageHandling, SetCommand +from deebot_client.command import ( + Command, + CommandMqttP2P, + CommandWithMessageHandling, + SetCommand, +) from deebot_client.const import DataType from deebot_client.logging_filter import get_logger from deebot_client.message import HandlingResult, HandlingState, MessageStr if TYPE_CHECKING: + from typing import Any + from deebot_client.event_bus import EventBus _LOGGER = get_logger(__name__) @@ -76,11 +83,53 @@ def _handle_xml(cls, _: EventBus, xml: Element) -> HandlingResult: return HandlingResult.success() _LOGGER.warning( - 'Command "%s" was not successful. XML response: %s', cls.NAME, xml + 'Command "%s" was not successful. XML response: %s', + cls.NAME, + ElementTree.tostring(xml, "unicode"), ) return HandlingResult(HandlingState.FAILED) +class XmlCommandMqttP2P(XmlCommand, CommandMqttP2P, ABC): + """Json base command for mqtt p2p channel.""" + + @classmethod + def create_from_mqtt(cls, payload: str | bytes | bytearray) -> CommandMqttP2P: + """Create a command from the mqtt data.""" + xml = ElementTree.fromstring(payload) + return cls._create_from_mqtt(xml.attrib) + + @final + @override + def handle_mqtt_p2p( + self, event_bus: EventBus, response_payload: str | bytes | bytearray + ) -> None: + """Handle response received over the mqtt channel "p2p".""" + if isinstance(response_payload, bytearray): + data = bytes(response_payload).decode() + elif isinstance(response_payload, bytes): + data = response_payload.decode() + elif isinstance(response_payload, str): + data = response_payload + else: + msg = "Unsupported message data type {message_type}" # type: ignore[unreachable] + raise TypeError(msg.format(message_type=type(response_payload))) + + self._handle_mqtt_p2p(event_bus, data) + + @classmethod + def _decode(cls, type_: type, value: Any) -> Any: + if hasattr(type_, "from_xml"): + return type_.from_xml(value) + return type_(value) + + @abstractmethod + def _handle_mqtt_p2p( + self, event_bus: EventBus, response: dict[str, Any] | str + ) -> None: + """Handle response received over the mqtt channel "p2p".""" + + class XmlSetCommand(ExecuteCommand, SetCommand, ABC): """Xml base set command. diff --git a/deebot_client/commands/xml/life_span.py b/deebot_client/commands/xml/life_span.py index 52cf0bf7a..97493f9ce 100644 --- a/deebot_client/commands/xml/life_span.py +++ b/deebot_client/commands/xml/life_span.py @@ -2,12 +2,14 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from types import MappingProxyType +from typing import TYPE_CHECKING, Any +from deebot_client.command import InitParam from deebot_client.events import LifeSpan, LifeSpanEvent -from deebot_client.message import HandlingResult +from deebot_client.message import HandlingResult, HandlingState -from .common import XmlCommandWithMessageHandling +from .common import ExecuteCommand, XmlCommandMqttP2P, XmlCommandWithMessageHandling if TYPE_CHECKING: from xml.etree.ElementTree import Element @@ -20,8 +22,11 @@ class GetLifeSpan(XmlCommandWithMessageHandling): NAME = "GetLifeSpan" - def __init__(self, life_span: LifeSpan) -> None: - super().__init__({"type": life_span.xml_value}) + def __init__(self, life_span: LifeSpan | str) -> None: + xml_value = ( + life_span.xml_value if isinstance(life_span, LifeSpan) else life_span + ) + super().__init__({"type": xml_value}) @classmethod def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: @@ -47,3 +52,24 @@ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: LifeSpanEvent(LifeSpan.from_xml(component_type), percent, left) ) return HandlingResult.success() + + +class ResetLifeSpan(ExecuteCommand, XmlCommandMqttP2P): + """ResetLifeSpan command.""" + + NAME = "ResetLifeSpan" + _mqtt_params = MappingProxyType({"type": InitParam(LifeSpan, "life_span")}) + + def __init__(self, life_span: LifeSpan | str) -> None: + xml_value = ( + life_span.xml_value if isinstance(life_span, LifeSpan) else life_span + ) + super().__init__({"type": xml_value}) + + def _handle_mqtt_p2p( + self, event_bus: EventBus, response: dict[str, Any] | str + ) -> None: + """Handle response received over the mqtt channel "p2p".""" + result = self.handle(event_bus, response) + if result.state == HandlingState.SUCCESS: + event_bus.request_refresh(LifeSpanEvent) diff --git a/tests/commands/xml/__init__.py b/tests/commands/xml/__init__.py index fadc827a0..2fc9ecda4 100644 --- a/tests/commands/xml/__init__.py +++ b/tests/commands/xml/__init__.py @@ -1,6 +1,65 @@ from __future__ import annotations -from typing import Any +from typing import TYPE_CHECKING, Any, cast +from xml.etree.ElementTree import Element, SubElement + +from defusedxml import ElementTree # type: ignore[import-untyped] +from testfixtures import LogCapture + +from deebot_client.command import CommandResult +from deebot_client.message import HandlingState +from tests.commands import assert_command + +if TYPE_CHECKING: + from deebot_client.commands.xml.common import ExecuteCommand + + +def get_success_body( + extra_attrs: dict[str, Any] | None = None, sub_element_name: str | None = None +) -> str: + element = ctl_element = Element("ctl") + element.set("ret", "ok") + + if extra_attrs is not None and len(extra_attrs) > 0: + if sub_element_name is not None: + element = SubElement(element, sub_element_name.lower()) + + if isinstance(extra_attrs, dict): + for key, value in extra_attrs.items(): + element.set(key, value) + + return cast("str", ElementTree.tostring(ctl_element, "unicode")) + + +def get_failure_body() -> str: + return '' + + +async def assert_execute_command( + command: ExecuteCommand, args: dict[str, Any] | list[Any] | None +) -> None: + assert command.NAME != "invalid" + assert command._args == args + + # success + xml = get_request_xml(get_success_body()) + await assert_command(command, xml, None) + + # failed + with LogCapture() as log: + body = get_failure_body() + xml = get_request_xml(body) + await assert_command( + command, xml, None, command_result=CommandResult(HandlingState.FAILED) + ) + + log.check_present( + ( + "deebot_client.commands.xml.common", + "WARNING", + f'Command "{command.NAME}" was not successful. XML response: {body}', + ) + ) def get_request_xml(data: str | None) -> dict[str, Any]: diff --git a/tests/commands/xml/test_common.py b/tests/commands/xml/test_common.py new file mode 100644 index 000000000..c048dc6d3 --- /dev/null +++ b/tests/commands/xml/test_common.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from types import MappingProxyType +from typing import Any +from unittest.mock import Mock, patch + +import pytest + +from deebot_client.command import CommandResult, InitParam +from deebot_client.commands.xml.common import XmlCommandMqttP2P +from deebot_client.event_bus import EventBus +from deebot_client.events import LifeSpan + + +@pytest.mark.parametrize( + ("payload", "decoded_payload"), + [ + (bytearray(bytes("test", "utf-8")), "test"), + (bytes("test", "utf-8"), "test"), + ("test", "test"), + ], + ids=["bytearray", "bytes", "str"], +) +@patch.multiple(XmlCommandMqttP2P, __abstractmethods__=set()) +def test_XmlCommandMqttP2P_decoding( + payload: bytearray | bytes | str, decoded_payload: str +) -> None: + command = XmlCommandMqttP2P() # type: ignore[abstract] + event_bus = Mock(spec_set=EventBus) + with patch.object(command, "_handle_mqtt_p2p", return_value=None) as mqtt_handler: + command.handle_mqtt_p2p(event_bus, payload) + + mqtt_handler.assert_called_once_with(event_bus, decoded_payload) + + +@patch.multiple(XmlCommandMqttP2P, __abstractmethods__=set()) +def test_XmlCommandMqttP2P_invalid_decoding() -> None: + command = XmlCommandMqttP2P() # type: ignore[abstract] + event_bus = Mock(spec_set=EventBus) + with ( + patch.object(command, "_handle_mqtt_p2p", return_value=None), + pytest.raises(TypeError), + ): + command.handle_mqtt_p2p(event_bus, {}) # type: ignore[arg-type] + + +@pytest.mark.parametrize( + ("command_payload", "payload_type", "expected_argument"), + [ + ("test", str, "test"), + (LifeSpan.BRUSH.xml_value, LifeSpan, LifeSpan.BRUSH), + ], + ids=["native", "with xml_value"], +) +def test_XmlCommandMqttP2P_create_from_mqtt( + command_payload: str, payload_type: type, expected_argument: type +) -> None: + class Sut(XmlCommandMqttP2P): + NAME = "Sut" + _mqtt_params = MappingProxyType( + {"payload": InitParam(payload_type), "remove": None} + ) + + def __init__(self, payload: Any) -> None: + super().__init__(args={"payload": payload}) + + def _handle_mqtt_p2p( + self, _event_bus: EventBus, _response: dict[str, Any] | str + ) -> None: + pass + + def _handle_response( + self, _event_bus: EventBus, _response: dict[str, Any] + ) -> CommandResult: + return CommandResult.analyse() + + xml_message = f"" + + sut = Sut.create_from_mqtt(xml_message) + + assert sut._args == {"payload": expected_argument} diff --git a/tests/commands/xml/test_life_span.py b/tests/commands/xml/test_life_span.py index fb039729e..88da76d32 100644 --- a/tests/commands/xml/test_life_span.py +++ b/tests/commands/xml/test_life_span.py @@ -1,20 +1,38 @@ from __future__ import annotations +from typing import TYPE_CHECKING +from unittest.mock import patch + import pytest from deebot_client.command import CommandResult -from deebot_client.commands.xml import GetLifeSpan +from deebot_client.commands.xml import GetLifeSpan, ResetLifeSpan from deebot_client.events import LifeSpan, LifeSpanEvent from deebot_client.message import HandlingState from tests.commands import assert_command -from . import get_request_xml +from . import ( + assert_execute_command, + get_failure_body, + get_request_xml, + get_success_body, +) + +if TYPE_CHECKING: + from deebot_client.event_bus import EventBus @pytest.mark.parametrize( ("component_type", "lifespan_type", "left", "total", "expected_event"), [ ("Brush", LifeSpan.BRUSH, 50, 100, LifeSpanEvent(LifeSpan.BRUSH, 50, 50)), + ( + "Brush", + LifeSpan.BRUSH.xml_value, + 50, + 100, + LifeSpanEvent(LifeSpan.BRUSH, 50, 50), + ), ( "DustCaseHeap", LifeSpan.DUST_CASE_HEAP, @@ -33,15 +51,15 @@ ) async def test_get_life_span( component_type: str, - lifespan_type: LifeSpan, + lifespan_type: LifeSpan | str, left: int, total: int, expected_event: LifeSpanEvent, ) -> None: - json = get_request_xml( + xml = get_request_xml( f"" ) - await assert_command(GetLifeSpan(lifespan_type), json, expected_event) + await assert_command(GetLifeSpan(lifespan_type), xml, expected_event) @pytest.mark.parametrize( @@ -57,3 +75,42 @@ async def test_get_life_span_error(xml: str) -> None: None, command_result=CommandResult(HandlingState.ANALYSE_LOGGED), ) + + +@pytest.mark.parametrize( + ("command", "args"), + [ + (ResetLifeSpan(LifeSpan.FILTER), {"type": LifeSpan.FILTER.xml_value}), + (ResetLifeSpan(LifeSpan.FILTER.xml_value), {"type": LifeSpan.FILTER.xml_value}), + ( + ResetLifeSpan.create_from_mqtt(b''), + {"type": LifeSpan.BRUSH.xml_value}, + ), + ], +) +async def test_ResetLifeSpan(command: ResetLifeSpan, args: dict[str, str]) -> None: + await assert_execute_command(command, args) + + +def test_ResetLifeSpan_invokes_refresh(event_bus: EventBus) -> None: + command = ResetLifeSpan(LifeSpan.FILTER) + success_response = get_success_body() + + with patch.object( + event_bus, "request_refresh", return_value=None + ) as mock_request_refresh: + command.handle_mqtt_p2p(event_bus, success_response) + + mock_request_refresh.assert_called_with(LifeSpanEvent) + + +def test_ResetLifeSpan_not_invokes_refresh(event_bus: EventBus) -> None: + command = ResetLifeSpan(LifeSpan.FILTER) + failure_response = get_failure_body() + + with patch.object( + event_bus, "request_refresh", return_value=None + ) as mock_request_refresh: + command.handle_mqtt_p2p(event_bus, failure_response) + + mock_request_refresh.assert_not_called()