diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c3f533863..0180cd249 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -32,7 +32,7 @@ repos: hooks: - id: codespell args: - - --ignore-words-list=deebot + - --ignore-words-list=deebot,MapP - --skip="./.*,*.csv,*.json" - --quiet-level=2 - --exclude-file=deebot_client/util/continents.py diff --git a/deebot_client/capabilities.py b/deebot_client/capabilities.py index d03b2fc23..2c72539f9 100644 --- a/deebot_client/capabilities.py +++ b/deebot_client/capabilities.py @@ -175,9 +175,9 @@ class CapabilityMap: changed: CapabilityEvent[MapChangedEvent] clear: CapabilityExecute[[]] | None = None major: CapabilityEvent[MajorMapEvent] - multi_state: CapabilitySetEnable[MultimapStateEvent] + multi_state: CapabilitySetEnable[MultimapStateEvent] | None = None position: CapabilityEvent[PositionsEvent] - relocation: CapabilityExecute[[]] + relocation: CapabilityExecute[[]] | None = None rooms: CapabilityEvent[RoomsEvent] trace: CapabilityEvent[MapTraceEvent] @@ -213,7 +213,7 @@ class CapabilitySettings: sweep_mode: CapabilitySetEnable[SweepModeEvent] | None = None true_detect: CapabilitySetEnable[TrueDetectEvent] | None = None voice_assistant: CapabilitySetEnable[VoiceAssistantStateEvent] | None = None - volume: CapabilitySet[VolumeEvent, [int]] + volume: CapabilitySet[VolumeEvent, [int]] | None = None @dataclass(frozen=True, kw_only=True) diff --git a/deebot_client/commands/__init__.py b/deebot_client/commands/__init__.py index 5fb527c47..fc4574353 100644 --- a/deebot_client/commands/__init__.py +++ b/deebot_client/commands/__init__.py @@ -11,6 +11,9 @@ COMMANDS as JSON_COMMANDS, COMMANDS_WITH_MQTT_P2P_HANDLING as JSON_COMMANDS_WITH_MQTT_P2P_HANDLING, ) +from .xml import ( + COMMANDS_WITH_MQTT_P2P_HANDLING as XML_COMMANDS_WITH_MQTT_P2P_HANDLING, +) if TYPE_CHECKING: from deebot_client.command import Command, CommandMqttP2P @@ -18,7 +21,8 @@ COMMANDS: dict[DataType, dict[str, type[Command]]] = {DataType.JSON: JSON_COMMANDS} COMMANDS_WITH_MQTT_P2P_HANDLING: dict[DataType, dict[str, type[CommandMqttP2P]]] = { - DataType.JSON: JSON_COMMANDS_WITH_MQTT_P2P_HANDLING + DataType.JSON: JSON_COMMANDS_WITH_MQTT_P2P_HANDLING, + DataType.XML: XML_COMMANDS_WITH_MQTT_P2P_HANDLING, } diff --git a/deebot_client/commands/json/common.py b/deebot_client/commands/json/common.py index 623a608b5..ed69eb5f6 100644 --- a/deebot_client/commands/json/common.py +++ b/deebot_client/commands/json/common.py @@ -23,6 +23,7 @@ HandlingState, MessageBody, MessageBodyDataDict, + MessageDict, ) from deebot_client.util import verify_required_class_variables_exists @@ -62,6 +63,12 @@ class JsonCommandWithMessageHandling( """Command, which handle response by itself.""" +class JsonCommandWithRawMessageHandling( + JsonCommand, CommandWithMessageHandling, MessageDict, ABC +): + """Command, which handle raw response by itself.""" + + class ExecuteCommand(JsonCommandWithMessageHandling, ABC): """Command, which is executing something (ex. Charge).""" diff --git a/deebot_client/commands/xml/__init__.py b/deebot_client/commands/xml/__init__.py index 3ac6e073d..18ee94445 100644 --- a/deebot_client/commands/xml/__init__.py +++ b/deebot_client/commands/xml/__init__.py @@ -9,35 +9,71 @@ from .battery import GetBatteryInfo from .charge import Charge from .charge_state import GetChargeState +from .clean import Clean, CleanArea, GetCleanState +from .clean_logs import GetCleanLogs from .error import GetError -from .fan_speed import GetFanSpeed +from .fan_speed import GetCleanSpeed, SetCleanSpeed from .life_span import GetLifeSpan +from .map import GetMapM, GetMapSet, GetMapSt, GetTrM, PullM, PullMP from .play_sound import PlaySound from .pos import GetPos from .stats import GetCleanSum +from .water_info import GetWaterBoxInfo, GetWaterPermeability if TYPE_CHECKING: from .common import XmlCommand __all__ = [ "Charge", + "Clean", + "CleanArea", "GetBatteryInfo", "GetChargeState", + "GetCleanLogs", + "GetCleanSpeed", + "GetCleanState", "GetCleanSum", "GetError", - "GetFanSpeed", "GetLifeSpan", + "GetMapM", + "GetMapSet", + "GetMapSt", "GetPos", + "GetTrM", + "GetWaterBoxInfo", + "GetWaterPermeability", "PlaySound", + "PullM", + "PullMP", + "SetCleanSpeed", ] # fmt: off # ordered by file asc _COMMANDS: list[type[XmlCommand]] = [ + Charge, + Clean, + CleanArea, + GetError, GetBatteryInfo, + GetChargeState, + GetCleanLogs, + GetCleanSpeed, + GetCleanState, + GetCleanSum, GetError, GetLifeSpan, + GetMapM, + GetMapSet, + GetMapSt, + GetPos, + GetTrM, + GetWaterBoxInfo, + GetWaterPermeability, PlaySound, + PullM, + PullMP, + SetCleanSpeed, ] # fmt: on diff --git a/deebot_client/commands/xml/charge_state.py b/deebot_client/commands/xml/charge_state.py index 78b7110a2..6fff4d568 100644 --- a/deebot_client/commands/xml/charge_state.py +++ b/deebot_client/commands/xml/charge_state.py @@ -38,7 +38,7 @@ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: case "slotcharging" | "slot_charging" | "wirecharging": status = State.DOCKED case "idle": - status = State.IDLE + pass case "going": status = State.RETURNING case _: diff --git a/deebot_client/commands/xml/clean.py b/deebot_client/commands/xml/clean.py new file mode 100644 index 000000000..b04690992 --- /dev/null +++ b/deebot_client/commands/xml/clean.py @@ -0,0 +1,97 @@ +"""Clean commands.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import FanSpeedEvent, FanSpeedLevel, StateEvent +from deebot_client.logging_filter import get_logger +from deebot_client.message import HandlingResult +from deebot_client.models import CleanAction, CleanMode, State + +from .common import ExecuteCommand, XmlCommandWithMessageHandling + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + +_LOGGER = get_logger(__name__) + + +class Clean(ExecuteCommand): + """Generic start/pause/stop cleaning command.""" + + NAME = "Clean" + HAS_SUB_ELEMENT = True + + def __init__( + self, action: CleanAction, speed: FanSpeedLevel = FanSpeedLevel.NORMAL + ) -> None: + # + + super().__init__( + { + "type": CleanMode.AUTO.xml_value, + "act": action.xml_value, + "speed": speed.xml_value, + } + ) + + +class CleanArea(ExecuteCommand): + """Clean area command.""" + + NAME = "Clean" + HAS_SUB_ELEMENT = True + + def __init__( + self, + mode: CleanMode, + area: str, + cleanings: int = 1, + speed: FanSpeedLevel = FanSpeedLevel.NORMAL, + ) -> None: + # + + super().__init__( + { + "type": mode.xml_value, + "act": CleanAction.START.xml_value, + "speed": speed.xml_value, + "deep": str(cleanings), + "mid": area, + } + ) + + +class GetCleanState(XmlCommandWithMessageHandling): + """GetCleanState command.""" + + NAME = "GetCleanState" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if xml.attrib.get("ret") != "ok" or (clean := xml.find("clean")) is None: + return HandlingResult.analyse() + + speed_attrib = clean.attrib.get("speed") + if speed_attrib is not None: + fan_speed_level = FanSpeedLevel.from_xml(speed_attrib) + event_bus.notify(FanSpeedEvent(fan_speed_level)) + + clean_attrib = clean.attrib.get("st") + if clean_attrib is not None: + clean_action = CleanAction.from_xml(clean_attrib) + if clean_action == CleanAction.START: + event_bus.notify(StateEvent(State.CLEANING)) + elif clean_action == CleanAction.PAUSE: + event_bus.notify(StateEvent(State.PAUSED)) + else: + _LOGGER.debug("Ignored CleanState %s", clean_action) + + return HandlingResult.success() diff --git a/deebot_client/commands/xml/clean_logs.py b/deebot_client/commands/xml/clean_logs.py new file mode 100644 index 000000000..ed6361cc3 --- /dev/null +++ b/deebot_client/commands/xml/clean_logs.py @@ -0,0 +1,75 @@ +"""Clean Logs commands.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.command import CommandResult +from deebot_client.events import ( + CleanLogEntry, + CleanLogEvent, +) +from deebot_client.logging_filter import get_logger +from deebot_client.message import HandlingResult +from deebot_client.util import get_enum + +from .common import XmlCommandWithMessageHandling +from .enum import XmlStopReason + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + +_LOGGER = get_logger(__name__) + + +class GetCleanLogs(XmlCommandWithMessageHandling): + """GetCleanLogs command.""" + + NAME = "GetCleanLogs" + + def __init__(self, count: int = 0) -> None: + super().__init__({"count": str(count)}) + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if ( + xml.attrib.get("ret") != "ok" + or (resp_logs := xml.findall("CleanSt")) is None + ): + return HandlingResult.analyse() + + if len(resp_logs) >= 0: + logs: list[CleanLogEntry] = [] + for log in resp_logs: + xml_stop_reason_attrib = str(log.attrib["f"]) + stop_reason = XmlStopReason.FINISHED + try: + stop_reason = get_enum(XmlStopReason, xml_stop_reason_attrib) + except Exception as e: + _LOGGER.error( + "Could not decode stop reason: %s", + xml_stop_reason_attrib, + exc_info=e, + ) + try: + logs.append( + CleanLogEntry( + timestamp=int(log.attrib["s"]), + image_url="", # Missing + type=log.attrib["t"], + area=int(log.attrib["a"]), + stop_reason=stop_reason.to_clean_job_status(), # To be extracted + duration=int(log.attrib["l"]), + ) + ) + except Exception: # pylint: disable=broad-except + _LOGGER.warning("Skipping log entry: %s", log, exc_info=True) + event_bus.notify(CleanLogEvent(logs)) + return CommandResult.success() + return HandlingResult.analyse() diff --git a/deebot_client/commands/xml/common.py b/deebot_client/commands/xml/common.py index 8044c09b7..4d84bf042 100644 --- a/deebot_client/commands/xml/common.py +++ b/deebot_client/commands/xml/common.py @@ -3,12 +3,18 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast from xml.etree.ElementTree import Element, SubElement from defusedxml import ElementTree # type: ignore[import-untyped] -from deebot_client.command import Command, CommandWithMessageHandling, SetCommand +from deebot_client.command import ( + Command, + CommandMqttP2P, + CommandWithMessageHandling, + GetCommand, + SetCommand, +) from deebot_client.const import DataType from deebot_client.logging_filter import get_logger from deebot_client.message import HandlingResult, HandlingState, MessageStr @@ -81,8 +87,50 @@ def _handle_xml(cls, _: EventBus, xml: Element) -> HandlingResult: return HandlingResult(HandlingState.FAILED) -class XmlSetCommand(ExecuteCommand, SetCommand, ABC): +class XmlCommandMqttP2P(XmlCommand, CommandMqttP2P, ABC): + """Json base command for mqtt p2p channel.""" + + @classmethod + def create_from_mqtt(cls, payload: str | bytes | bytearray) -> CommandMqttP2P: + """Create a command from the mqtt data.""" + xml = ElementTree.fromstring(payload) + return cls._create_from_mqtt(xml.attrib) + + def handle_mqtt_p2p( + self, event_bus: EventBus, response_payload: str | bytes | bytearray + ) -> None: + """Handle response received over the mqtt channel "p2p".""" + if isinstance(response_payload, bytearray): + data = bytes(response_payload).decode() + elif isinstance(response_payload, bytes): + data = response_payload.decode() + elif isinstance(response_payload, str): + data = response_payload + else: + msg = "Unsupported message data type {message_type}" + raise TypeError(msg.format(essage_type=type(response_payload))) + self._handle_mqtt_p2p(event_bus, data) + + @abstractmethod + def _handle_mqtt_p2p( + self, event_bus: EventBus, response: dict[str, Any] | str + ) -> None: + """Handle response received over the mqtt channel "p2p".""" + + +class XmlSetCommand(ExecuteCommand, SetCommand, XmlCommandMqttP2P, ABC): """Xml base set command. Command needs to be linked to the "get" command, for handling (updating) the sensors. """ + + +class XmlGetCommand(XmlCommandWithMessageHandling, GetCommand, ABC): + """Xml get command.""" + + @classmethod + @abstractmethod + def handle_set_args( + cls, event_bus: EventBus, args: dict[str, Any] + ) -> HandlingResult: + """Handle arguments of set command.""" diff --git a/deebot_client/commands/xml/enum.py b/deebot_client/commands/xml/enum.py new file mode 100644 index 000000000..f3f6fd49a --- /dev/null +++ b/deebot_client/commands/xml/enum.py @@ -0,0 +1,35 @@ +"""Enums used only by XML messages.""" + +from __future__ import annotations + +from enum import StrEnum + +from deebot_client.events import CleanJobStatus + + +class XmlStopReason(StrEnum): + """Reasons why cleaning has been stopped.""" + + FINISHED = "s" + BATTERY_LOW = "r" + STOPPED_BY_APP = "a" + STOPPED_BY_REMOTE_CONTROL = "i" + STOPPED_BY_BUTTON = "b" + STOPPED_BY_WARNING = "w" + STOPPED_BY_NO_DISTURB = "f" + STOPPED_BY_CLEARMAP = "m" + STOPPED_BY_NO_PATH = "n" + STOPPED_BY_NOT_IN_MAP = "u" + STOPPED_BY_VIRTUAL_WALL = "v" + + def to_clean_job_status(self) -> CleanJobStatus: + """Convert this value to a CleanJobStatus for compatibility proposes.""" + if self == XmlStopReason.FINISHED: + return CleanJobStatus.FINISHED + if self in ( + XmlStopReason.STOPPED_BY_APP, + XmlStopReason.STOPPED_BY_REMOTE_CONTROL, + XmlStopReason.STOPPED_BY_BUTTON, + ): + return CleanJobStatus.MANUALLY_STOPPED + return CleanJobStatus.FINISHED_WITH_WARNINGS diff --git a/deebot_client/commands/xml/fan_speed.py b/deebot_client/commands/xml/fan_speed.py index 2a834554c..e567d232c 100644 --- a/deebot_client/commands/xml/fan_speed.py +++ b/deebot_client/commands/xml/fan_speed.py @@ -2,12 +2,14 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from types import MappingProxyType +from typing import TYPE_CHECKING, Any +from deebot_client.command import InitParam from deebot_client.events import FanSpeedEvent, FanSpeedLevel from deebot_client.message import HandlingResult -from .common import XmlCommandWithMessageHandling +from .common import XmlGetCommand, XmlSetCommand if TYPE_CHECKING: from xml.etree.ElementTree import Element @@ -15,11 +17,22 @@ from deebot_client.event_bus import EventBus -class GetFanSpeed(XmlCommandWithMessageHandling): - """GetFanSpeed command.""" +class GetCleanSpeed(XmlGetCommand): + """GetCleanSpeed command.""" NAME = "GetCleanSpeed" + @classmethod + def handle_set_args( + cls, event_bus: EventBus, args: dict[str, Any] + ) -> HandlingResult: + """Handle message->body->data and notify the correct event subscribers. + + :return: A message response + """ + event_bus.notify(FanSpeedEvent(FanSpeedLevel.from_xml(str(args["speed"])))) + return HandlingResult.success() + @classmethod def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: """Handle xml message and notify the correct event subscribers. @@ -29,16 +42,17 @@ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: if xml.attrib.get("ret") != "ok" or not (speed := xml.attrib.get("speed")): return HandlingResult.analyse() - event: FanSpeedEvent | None = None + event_bus.notify(FanSpeedEvent(FanSpeedLevel.from_xml(speed))) + return HandlingResult.success() + - match speed.lower(): - case "standard": - event = FanSpeedEvent(FanSpeedLevel.NORMAL) - case "strong": - event = FanSpeedEvent(FanSpeedLevel.MAX) +class SetCleanSpeed(XmlSetCommand): + """SetCleanSpeed command.""" - if event: - event_bus.notify(event) - return HandlingResult.success() + NAME = "SetCleanSpeed" + get_command = GetCleanSpeed + _mqtt_params = MappingProxyType({"speed": InitParam(FanSpeedLevel)}) - return HandlingResult.analyse() + def __init__(self, speed: FanSpeedLevel | str) -> None: + speed_level = speed.xml_value if isinstance(speed, FanSpeedLevel) else speed + super().__init__({"speed": speed_level}) diff --git a/deebot_client/commands/xml/map.py b/deebot_client/commands/xml/map.py new file mode 100644 index 000000000..7f2fdc57a --- /dev/null +++ b/deebot_client/commands/xml/map.py @@ -0,0 +1,281 @@ +"""Map commands.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.command import Command, CommandResult +from deebot_client.const import DataType +from deebot_client.events import MajorMapEvent, MapSetEvent, MapSetType, MinorMapEvent +from deebot_client.events.map import CachedMapInfoEvent, MapSubsetEvent +from deebot_client.logging_filter import get_logger +from deebot_client.message import HandlingResult, HandlingState + +from .common import XmlCommandWithMessageHandling + +if TYPE_CHECKING: + from typing import Any + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + +_LOGGER = get_logger(__name__) + + +class GetMapSt(XmlCommandWithMessageHandling): + """GetMapSt command. + + This command checks whether the current map has been built successfully or not. + """ + + NAME = "GetMapSt" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + Sample message response: + "" + + :return: A message response + """ + if xml.attrib.get("ret") != "ok" or not (st := xml.attrib.get("st")): + return HandlingResult.analyse() + + built = st == "built" + event_bus.notify(CachedMapInfoEvent(name="", active=built)) + return HandlingResult.success() + + +class GetMapSet(XmlCommandWithMessageHandling): + """GetMapSet command. + + This commands gets the list of logical pieces each map is composed of. + XML robots do not support multiple maps, so the mid parameter is ignored. + """ + + _ARGS_MSID = "msid" + _ARGS_TYPE = "type" + _ARGS_SUBSETS = "subsets" + + NAME = "GetMapSet" + + def __init__( + self, + # pylint: disable=unused-argument + mid: str, # noqa: ARG002 + # pylint: disable=redefined-builtin + type: (MapSetType | str) = MapSetType.VIRTUAL_WALLS, + ) -> None: + if isinstance(type, MapSetType): + type = type.value + + super().__init__({"tp": type}) + + @classmethod + def _find_subsets(cls, maps: list[Element]) -> list[int]: + return [int(mid) for map in maps if (mid := map.attrib.get("mid")) is not None] + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + Sample message response: + b'' + + :return: A message response + """ + if ( + xml.attrib.get("ret") != "ok" + or not (msid := xml.attrib.get("msid")) + or not (area_type := xml.attrib.get("tp")) + or not (m := xml.findall("m")) + ): + return HandlingResult.analyse() + subsets = cls._find_subsets(m) + event_bus.notify(MapSetEvent(MapSetType(area_type), subsets=subsets)) + args = { + cls._ARGS_MSID: msid, + cls._ARGS_TYPE: area_type, + cls._ARGS_SUBSETS: subsets, + } + return HandlingResult(HandlingState.SUCCESS, args) + + def _handle_response( + self, event_bus: EventBus, response: dict[str, Any] + ) -> CommandResult: + """Handle response from a command. + + :return: A message response + """ + result = super()._handle_response(event_bus, response) + if result.state == HandlingState.SUCCESS and result.args: + commands: list[Command] = [ + PullM( + mid=subset, + msid=result.args[self._ARGS_MSID], + type=result.args[self._ARGS_TYPE], + ) + for subset in result.args[self._ARGS_SUBSETS] + ] + return CommandResult(result.state, result.args, commands) + + return result + + +class GetMapM(XmlCommandWithMessageHandling): + """GetMapM command.""" + + NAME = "GetMapM" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + Sample message response: + b'' + + :return: A message response + """ + if not (map_hashes := xml.attrib.get("m")) or not (idx := xml.attrib.get("i")): + return HandlingResult.analyse() + event_bus.notify( + MajorMapEvent( + idx, + values=[int(map_hash.strip()) for map_hash in map_hashes.split(",")], + requested=True, + type=DataType.XML, + ) + ) + return HandlingResult.success() + + +class PullM(XmlCommandWithMessageHandling): + """PullM command. + + Pulls map subset coordinates + """ + + _ARG_COORDS = "coordinates" + + NAME = "PullM" + + def __init__( + self, + *, + mid: str | int, + msid: str | int, + # pylint: disable=redefined-builtin + type: (MapSetType | str) = MapSetType.ROOMS, + ) -> None: + if isinstance(type, MapSetType): + type = type.value + + self._map_type = type + self._map_subset_id = int(mid) + + super().__init__( + { + "mid": str(mid), + "msid": str(msid), + "tp": type, + "seq": "0", + }, + ) + + @classmethod + def _handle_xml(cls, _event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + Sample message responses: + + + + :return: A message response + """ + if xml.attrib.get("ret") != "ok" or not (coords := xml.attrib.get("m")): + return HandlingResult.analyse() + + args = {cls._ARG_COORDS: coords} + return HandlingResult(HandlingState.SUCCESS, args) + + def _handle_response( + self, event_bus: EventBus, response: dict[str, Any] + ) -> CommandResult: + """Handle response from a command. + + :return: A message response + """ + result = super()._handle_response(event_bus, response) + if result.state == HandlingState.SUCCESS and result.args: + coords = result.args[self._ARG_COORDS] + event_bus.notify( + MapSubsetEvent( + id=self._map_subset_id, + type=MapSetType(self._map_type), + coordinates=coords, + ) + ) + return CommandResult(result.state, result.args) + + return result + + +class PullMP(XmlCommandWithMessageHandling): + """PullMP command.""" + + _ARG_PIECE = "piece" + + NAME = "PullMP" + + def __init__(self, *, piece_index: int) -> None: + self._piece_index = piece_index + super().__init__({"pid": str(piece_index)}) + + @classmethod + def _handle_xml(cls, _event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + Sample message response: + b{'ret': 'ok', 'i': '1839263381', 'p': 'x_q_a_a_b_a_a_q_jw_a_a_a_a_bv/f//o7f/_rz5_i_f_x_i5_y_v_g4kijmo4_y_h+e7k_ho_l_t_l8_u6_p_a_f_ls_x7_jhrz0_kg_a=', 'event': 'pull_m_p'} + + :return: A message response + """ + if xml.attrib.get("ret") != "ok" or not (piece := xml.attrib.get("p")): + return HandlingResult.analyse() + args = {cls._ARG_PIECE: piece} + return HandlingResult(HandlingState.SUCCESS, args) + + def _handle_response( + self, event_bus: EventBus, response: dict[str, Any] + ) -> CommandResult: + """Handle response from a command. + + :return: A message response + """ + result = super()._handle_response(event_bus, response) + if result.state == HandlingState.SUCCESS and result.args: + piece = result.args[self._ARG_PIECE] + event_bus.notify(MinorMapEvent(index=self._piece_index, value=piece)) + return CommandResult(result.state, result.args) + + return result + + +class GetTrM(XmlCommandWithMessageHandling): + """GetTrM command. + + Enables trace reporting from the bot. + """ + + NAME = "GetTrM" + + @classmethod + def _handle_xml(cls, _event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if xml.attrib.get("ret") != "ok": + return HandlingResult.analyse() + return HandlingResult(HandlingState.SUCCESS) diff --git a/deebot_client/commands/xml/water_info.py b/deebot_client/commands/xml/water_info.py new file mode 100644 index 000000000..e5dd251a4 --- /dev/null +++ b/deebot_client/commands/xml/water_info.py @@ -0,0 +1,96 @@ +"""WaterBox command module.""" + +from __future__ import annotations + +from types import MappingProxyType +from typing import TYPE_CHECKING, Any + +from deebot_client.command import InitParam +from deebot_client.events import ( + WaterAmount, + WaterInfoEvent, +) +from deebot_client.message import HandlingResult +from deebot_client.util import get_enum + +from .common import XmlGetCommand, XmlSetCommand + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class GetWaterPermeability(XmlGetCommand): + """GetWaterPermeability command.""" + + NAME = "GetWaterPermeability" + + @classmethod + def handle_set_args( + cls, event_bus: EventBus, args: dict[str, Any] + ) -> HandlingResult: + """Handle message->body->data and notify the correct event subscribers. + + :return: A message response + """ + event_bus.notify(WaterInfoEvent(amount=WaterAmount(int(args["v"])))) + return HandlingResult.success() + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if xml.attrib.get("ret") != "ok" or not (value := xml.attrib.get("v")): + return HandlingResult.analyse() + + event_bus.notify(WaterInfoEvent(amount=WaterAmount(int(value)))) + return HandlingResult.success() + + +class SetWaterPermeability(XmlSetCommand): + """SetWaterPermeability command.""" + + NAME = "SetWaterPermeability" + get_command = GetWaterPermeability + _mqtt_params = MappingProxyType( + { + "amount": InitParam(WaterAmount), + } + ) + + def __init__(self, amount: WaterAmount | str) -> None: + if isinstance(amount, str): + amount = get_enum(WaterAmount, amount) + super().__init__({"v": str(amount.value)}) + + +class GetWaterBoxInfo(XmlGetCommand): + """GetWaterBoxInfo command.""" + + NAME = "GetWaterBoxInfo" + + @classmethod + def handle_set_args( + cls, event_bus: EventBus, args: dict[str, Any] + ) -> HandlingResult: + """Handle message->body->data and notify the correct event subscribers. + + :return: A message response + """ + event_bus.notify(WaterInfoEvent(mop_attached=(str(args["on"]) != "0"))) + return HandlingResult.success() + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if xml.attrib.get("ret") != "ok" or not (on := xml.attrib.get("on")): + return HandlingResult.analyse() + + event_bus.notify(WaterInfoEvent(mop_attached=on != "0")) + return HandlingResult.success() diff --git a/deebot_client/device.py b/deebot_client/device.py index 0b2838e7c..9cc2f12d8 100644 --- a/deebot_client/device.py +++ b/deebot_client/device.py @@ -14,6 +14,7 @@ from deebot_client.util import cancel from .command import Command +from .const import DataType from .event_bus import EventBus from .events import ( AvailabilityEvent, @@ -38,7 +39,6 @@ _LOGGER = get_logger(__name__) _AVAILABLE_CHECK_INTERVAL = 60 - DeviceCommandExecute = Callable[[Command], Coroutine[Any, Any, dict[str, Any]]] @@ -200,15 +200,31 @@ def _handle_message( try: _LOGGER.debug("Try to handle message %s: %s", message_name, message_data) - if message := get_message(message_name, self._device_info.static.data_type): + message_data_type = self._device_info.static.data_type + if message := get_message(message_name, message_data_type): + data: dict[str, Any] | str if isinstance(message_data, dict): data = message_data - else: + elif message_data_type == DataType.JSON: data = json.loads(message_data) + elif isinstance(message_data, bytes): + data = message_data.decode() + elif isinstance(message_data, bytearray): + data = bytes(message_data).decode() + elif isinstance(message_data, str): + data = message_data + else: + msg = "Unsupported message data type {message_name}: {message_type}" + raise TypeError( + msg.format( + message_name=message_name, message_type=type(message_data) + ) + ) - fw_version = data.get("header", {}).get("fwVer", None) - if fw_version: - self.fw_version = fw_version + if isinstance(data, dict): + fw_version = data.get("header", {}).get("fwVer", None) + if fw_version: + self.fw_version = fw_version message.handle(self.events, data) except Exception: # pylint: disable=broad-except diff --git a/deebot_client/events/map.py b/deebot_client/events/map.py index b4d4526d7..ca2ad52c1 100644 --- a/deebot_client/events/map.py +++ b/deebot_client/events/map.py @@ -6,6 +6,7 @@ from enum import Enum, unique from typing import TYPE_CHECKING, Any +from deebot_client.const import DataType from deebot_client.events import Event if TYPE_CHECKING: @@ -47,6 +48,7 @@ class MajorMapEvent(Event): map_id: str values: list[int] requested: bool = field(kw_only=True) + type: DataType = DataType.JSON @dataclass(frozen=True) diff --git a/deebot_client/events/water_info.py b/deebot_client/events/water_info.py index 1f543f5bf..b4b67814c 100644 --- a/deebot_client/events/water_info.py +++ b/deebot_client/events/water_info.py @@ -30,7 +30,7 @@ class SweepType(IntEnum): class WaterInfoEvent(Event): """Water info event representation.""" - amount: WaterAmount # None means no data available + amount: WaterAmount | None = None sweep_type: SweepType | None = None mop_attached: bool | None = field(kw_only=True, default=None) diff --git a/deebot_client/hardware/deebot/2pv572.py b/deebot_client/hardware/deebot/2pv572.py new file mode 100644 index 000000000..d5c126267 --- /dev/null +++ b/deebot_client/hardware/deebot/2pv572.py @@ -0,0 +1,134 @@ +"""2pv572 Capabilities.""" + +from __future__ import annotations + +from deebot_client.capabilities import ( + Capabilities, + CapabilityClean, + CapabilityCleanAction, + CapabilityCustomCommand, + CapabilityEvent, + CapabilityExecute, + CapabilityLifeSpan, + CapabilityMap, + CapabilitySettings, + CapabilitySetTypes, + CapabilityStats, + DeviceType, +) +from deebot_client.commands.json import GetNetInfoLegacy +from deebot_client.commands.json.custom import CustomCommand +from deebot_client.commands.xml import ( + Charge, + Clean, + CleanArea, + GetBatteryInfo, + GetCleanLogs, + GetCleanSpeed, + GetCleanState, + GetLifeSpan, + GetWaterPermeability, + PlaySound, + SetCleanSpeed, +) +from deebot_client.commands.xml.charge_state import GetChargeState +from deebot_client.commands.xml.error import GetError +from deebot_client.commands.xml.map import GetMapM, GetMapSt, GetTrM +from deebot_client.commands.xml.pos import GetPos +from deebot_client.commands.xml.stats import GetCleanSum +from deebot_client.commands.xml.water_info import GetWaterBoxInfo, SetWaterPermeability +from deebot_client.const import DataType +from deebot_client.events import ( + AvailabilityEvent, + BatteryEvent, + CleanLogEvent, + CustomCommandEvent, + ErrorEvent, + FanSpeedEvent, + FanSpeedLevel, + LifeSpan, + LifeSpanEvent, + NetworkInfoEvent, + ReportStatsEvent, + RoomsEvent, + StateEvent, + StatsEvent, + TotalStatsEvent, + WaterAmount, + WaterInfoEvent, +) +from deebot_client.events.map import ( + CachedMapInfoEvent, + MajorMapEvent, + MapChangedEvent, + MapTraceEvent, + PositionsEvent, +) +from deebot_client.models import StaticDeviceInfo +from deebot_client.util import short_name + +from . import DEVICES + +DEVICES[short_name(__name__)] = StaticDeviceInfo( + DataType.XML, + Capabilities( + availability=CapabilityEvent(AvailabilityEvent, []), + battery=CapabilityEvent(BatteryEvent, [GetBatteryInfo()]), + charge=CapabilityExecute(Charge), + clean=CapabilityClean( + action=CapabilityCleanAction(command=Clean, area=CleanArea), + log=CapabilityEvent(CleanLogEvent, [GetCleanLogs()]), + ), + custom=CapabilityCustomCommand( + event=CustomCommandEvent, get=[], set=CustomCommand + ), + device_type=DeviceType.VACUUM, + error=CapabilityEvent(ErrorEvent, [GetError()]), + fan_speed=CapabilitySetTypes( + event=FanSpeedEvent, + get=[GetCleanSpeed()], + set=SetCleanSpeed, + types=( + FanSpeedLevel.NORMAL, + FanSpeedLevel.MAX, + ), + ), + life_span=CapabilityLifeSpan( + types=(LifeSpan.BRUSH, LifeSpan.SIDE_BRUSH, LifeSpan.DUST_CASE_HEAP), + event=LifeSpanEvent, + get=[ + GetLifeSpan(LifeSpan.BRUSH), + GetLifeSpan(LifeSpan.SIDE_BRUSH), + GetLifeSpan(LifeSpan.DUST_CASE_HEAP), + ], + reset=CustomCommand, + ), + map=CapabilityMap( + cached_info=CapabilityEvent(CachedMapInfoEvent, [GetMapSt()]), + changed=CapabilityEvent(MapChangedEvent, []), + major=CapabilityEvent(MajorMapEvent, [GetMapM()]), + position=CapabilityEvent(PositionsEvent, [GetPos()]), + rooms=CapabilityEvent(RoomsEvent, [GetMapSt()]), + trace=CapabilityEvent(MapTraceEvent, [GetTrM()]), + ), + network=CapabilityEvent(NetworkInfoEvent, [GetNetInfoLegacy()]), + play_sound=CapabilityExecute(PlaySound), + state=CapabilityEvent(StateEvent, [GetChargeState(), GetCleanState()]), + stats=CapabilityStats( + clean=CapabilityEvent(StatsEvent, []), + report=CapabilityEvent(ReportStatsEvent, []), + total=CapabilityEvent(TotalStatsEvent, [GetCleanSum()]), + ), + settings=CapabilitySettings(), + water=CapabilitySetTypes( + event=WaterInfoEvent, + get=[GetWaterPermeability(), GetWaterBoxInfo()], + set=SetWaterPermeability, + types=( + WaterAmount.LOW, + WaterAmount.MEDIUM, + WaterAmount.HIGH, + ), + ), + ), +) diff --git a/deebot_client/map.py b/deebot_client/map.py index c6afe8348..9065c37a2 100644 --- a/deebot_client/map.py +++ b/deebot_client/map.py @@ -6,9 +6,9 @@ from datetime import UTC, datetime from typing import TYPE_CHECKING, Final +from deebot_client.const import DataType from deebot_client.events.map import CachedMapInfoEvent, MapChangedEvent -from .commands.json import GetMinorMap from .events import ( MajorMapEvent, MapSetEvent, @@ -31,6 +31,8 @@ if TYPE_CHECKING: from collections.abc import Callable + from deebot_client.command import Command + from .device import DeviceCommandExecute from .event_bus import EventBus @@ -102,11 +104,16 @@ async def on_major_map(event: MajorMapEvent) -> None: self._map_data.map_piece_crc32_indicates_update(idx, value) and event.requested ): - tg.create_task( - self._execute_command( - GetMinorMap(map_id=event.map_id, piece_index=idx) - ) - ) + command: Command + if event.type == DataType.JSON: + from deebot_client.commands.json.map import GetMinorMap + + command = GetMinorMap(map_id=event.map_id, piece_index=idx) + else: + from deebot_client.commands.xml.map import PullMP + + command = PullMP(piece_index=idx) + tg.create_task(self._execute_command(command)) unsubscribers.append(self._event_bus.subscribe(MajorMapEvent, on_major_map)) diff --git a/deebot_client/messages/xml/__init__.py b/deebot_client/messages/xml/__init__.py index 21d5370a9..5364ec21a 100644 --- a/deebot_client/messages/xml/__init__.py +++ b/deebot_client/messages/xml/__init__.py @@ -5,17 +5,48 @@ from typing import TYPE_CHECKING from deebot_client.messages.xml.battery import BatteryInfo +from deebot_client.messages.xml.charge import ChargeState +from deebot_client.messages.xml.clean import ( + CleanedPos, + CleanReport, + CleanReportServer, + CleanSt, +) +from deebot_client.messages.xml.map import MapP, Trace +from deebot_client.messages.xml.pos import Pos +from deebot_client.messages.xml.sleep import SleepStatus +from deebot_client.messages.xml.water_info import WaterBoxInfo if TYPE_CHECKING: - from collections.abc import Sequence - from deebot_client.message import Message -__all__: Sequence[str] = ["BatteryInfo"] +__all__ = [ + "BatteryInfo", + "ChargeState", + "CleanReport", + "CleanReportServer", + "CleanSt", + "CleanedPos", + "MapP", + "Pos", + "SleepStatus", + "Trace", + "WaterBoxInfo", +] # fmt: off # ordered by file asc _MESSAGES: list[type[Message]] = [ - BatteryInfo + BatteryInfo, + ChargeState, + CleanReport, + CleanReportServer, + CleanSt, + CleanedPos, + MapP, + Pos, + SleepStatus, + Trace, + WaterBoxInfo, ] # fmt: on diff --git a/deebot_client/messages/xml/charge.py b/deebot_client/messages/xml/charge.py new file mode 100644 index 000000000..e6fff7ea6 --- /dev/null +++ b/deebot_client/messages/xml/charge.py @@ -0,0 +1,47 @@ +"""Charge messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import StateEvent +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage +from deebot_client.models import State + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class ChargeState(XmlMessage): + """ChargeState message.""" + + NAME = "ChargeState" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + status: State | None = None + + if (charge := xml.find("charge")) is not None: + type_ = charge.attrib["type"].lower() + match type_: + case "slotcharging" | "slot_charging" | "wirecharging": + status = State.DOCKED + case "idle": + pass + case "going": + status = State.RETURNING + case _: + status = State.ERROR + + if status: + event_bus.notify(StateEvent(status)) + return HandlingResult.success() + + return HandlingResult.analyse() diff --git a/deebot_client/messages/xml/clean.py b/deebot_client/messages/xml/clean.py new file mode 100644 index 000000000..631608a17 --- /dev/null +++ b/deebot_client/messages/xml/clean.py @@ -0,0 +1,158 @@ +"""Clean messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import ( + CleanJobStatus, + FanSpeedEvent, + FanSpeedLevel, + Position, + PositionsEvent, + ReportStatsEvent, + StateEvent, + StatsEvent, +) +from deebot_client.logging_filter import get_logger +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage +from deebot_client.models import CleanAction, State +from deebot_client.rs.map import PositionType + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + +_LOGGER = get_logger(__name__) + + +class CleanSt(XmlMessage): + """CleanSt message.""" + + NAME = "CleanSt" + + @classmethod + def _handle_xml(cls, _event_bus: EventBus, _xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + b"" + + We currently ignore this message as we prefer to use CleanReport + :return: A message response + """ + return HandlingResult.success() + + +class CleanReport(XmlMessage): + """CleanReport message.""" + + NAME = "CleanReport" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + b"" + + :return: A message response + """ + if (clean := xml.find("clean")) is None: + return HandlingResult.analyse() + + speed_attrib = clean.attrib.get("speed") + if speed_attrib is not None: + fan_speed_level = FanSpeedLevel.from_xml(speed_attrib) + event_bus.notify(FanSpeedEvent(fan_speed_level)) + + clean_attrib = clean.attrib.get("st") + if clean_attrib is not None: + clean_action = CleanAction.from_xml(clean_attrib) + if clean_action == CleanAction.START: + event_bus.notify(StateEvent(State.CLEANING)) + elif clean_action == CleanAction.PAUSE: + event_bus.notify(StateEvent(State.PAUSED)) + else: + _LOGGER.debug("Ignored CleanState %s", clean_action) + + return HandlingResult.success() + + +class CleanReportServer(XmlMessage): + """CleanReportServer message.""" + + NAME = "CleanReportServer" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + b"" + b"" + + :return: A message response + """ + event_reported = False + if ( + (act := xml.attrib.get("act")) is not None + and (last := xml.attrib.get("last")) is not None + and (area := xml.attrib.get("area")) is not None + ): + clean_session = xml.attrib.get("cs") + clean_type = xml.attrib.get("type") + + clean_action = CleanAction.from_xml(act) + if clean_action == CleanAction.STOP: + event_bus.notify( + StatsEvent(area=int(area), time=int(last), type=clean_type) + ) + event_reported = True + if clean_session: + if clean_action == CleanAction.STOP: + job_status = CleanJobStatus.FINISHED + elif clean_action == CleanAction.START: + job_status = CleanJobStatus.CLEANING + elif clean_action == CleanAction.PAUSE: + job_status = CleanJobStatus.MANUALLY_STOPPED + else: + job_status = CleanJobStatus.NO_STATUS + event_bus.notify( + ReportStatsEvent( + area=int(area), + time=int(last), + type=clean_type, + cleaning_id=clean_session, + status=job_status, + content=[], + ) + ) + event_reported = True + if event_reported: + return HandlingResult.success() + return HandlingResult.analyse() + + +class CleanedPos(XmlMessage): + """CleanedPos message.""" + + NAME = "CleanedPos" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + b"" + + :return: A message response + """ + if p := xml.attrib.get("p"): + p_x, p_y = p.split(",", 2) + p_a = xml.attrib.get("a", 0) + position = Position( + type=PositionType.DEEBOT, x=int(p_x), y=int(p_y), a=int(p_a) + ) + event_bus.notify(PositionsEvent(positions=[position])) + return HandlingResult.success() + + return HandlingResult.analyse() diff --git a/deebot_client/messages/xml/common.py b/deebot_client/messages/xml/common.py index 05e7399bc..5c9d4aca1 100644 --- a/deebot_client/messages/xml/common.py +++ b/deebot_client/messages/xml/common.py @@ -5,7 +5,7 @@ from abc import ABC, abstractmethod from typing import TYPE_CHECKING -from defusedxml import ElementTree # type: ignore[import-untyped] +from defusedxml import ElementTree as ET # type: ignore[import-untyped] from deebot_client.message import MessageStr @@ -25,7 +25,7 @@ def _handle_str(cls, event_bus: EventBus, message: str) -> HandlingResult: :return: A message response """ - xml = ElementTree.fromstring(message) + xml = ET.fromstring(message) return cls._handle_xml(event_bus, xml) @classmethod diff --git a/deebot_client/messages/xml/map.py b/deebot_client/messages/xml/map.py new file mode 100644 index 000000000..df41f3f1b --- /dev/null +++ b/deebot_client/messages/xml/map.py @@ -0,0 +1,60 @@ +"""Map messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events.map import MapTraceEvent, MinorMapEvent +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class MapP(XmlMessage): + """MapP message. + + Probably a map piece + """ + + NAME = "MapP" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + b"" + + :return: A message response + """ + if (pid := xml.attrib.get("pid")) and (piece := xml.attrib.get("p")): + event_bus.notify(MinorMapEvent(index=int(pid), value=piece)) + return HandlingResult.success() + return HandlingResult.analyse() + + +class Trace(XmlMessage): + """Trace message.""" + + NAME = "trace" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + Sample message: + + + :return: A message response + """ + if ( + (tf := xml.attrib.get("tf")) + and (tt := xml.attrib.get("tt")) + and (tr := xml.attrib.get("tr")) + ): + event_bus.notify(MapTraceEvent(start=int(tf), total=int(tt), data=tr)) + return HandlingResult.success() + return HandlingResult.analyse() diff --git a/deebot_client/messages/xml/pos.py b/deebot_client/messages/xml/pos.py new file mode 100644 index 000000000..703495337 --- /dev/null +++ b/deebot_client/messages/xml/pos.py @@ -0,0 +1,34 @@ +"""Pos messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import Position, PositionsEvent +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage +from deebot_client.rs.map import PositionType + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class Pos(XmlMessage): + """Pos message.""" + + NAME = "Pos" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + if p := xml.attrib.get("p"): + p_x, p_y = p.split(",", 2) + p_a = xml.attrib.get("a", 0) + position = Position( + type=PositionType.DEEBOT, x=int(p_x), y=int(p_y), a=int(p_a) + ) + event_bus.notify(PositionsEvent(positions=[position])) + return HandlingResult.success() + + return HandlingResult.analyse() diff --git a/deebot_client/messages/xml/sleep.py b/deebot_client/messages/xml/sleep.py new file mode 100644 index 000000000..d31ab0b28 --- /dev/null +++ b/deebot_client/messages/xml/sleep.py @@ -0,0 +1,31 @@ +"""Sleep messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class SleepStatus(XmlMessage): + """SleepStatus message.""" + + NAME = "SleepStatus" + + @classmethod + def _handle_xml(cls, _event_bus: EventBus, _xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + b"" + + We currently ignore this message + + :return: A message response + """ + return HandlingResult.success() diff --git a/deebot_client/messages/xml/water_info.py b/deebot_client/messages/xml/water_info.py new file mode 100644 index 000000000..701c18bdf --- /dev/null +++ b/deebot_client/messages/xml/water_info.py @@ -0,0 +1,32 @@ +"""Water messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import WaterInfoEvent +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class WaterBoxInfo(XmlMessage): + """WaterBoxInfo message.""" + + NAME = "WaterBoxInfo" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if (on := xml.attrib.get("on")) is None: + return HandlingResult.analyse() + + event_bus.notify(WaterInfoEvent(mop_attached=on != "0")) + return HandlingResult.success() diff --git a/tests/commands/xml/test_charge_state.py b/tests/commands/xml/test_charge_state.py index 926ac3ae5..2a3c6e9e9 100644 --- a/tests/commands/xml/test_charge_state.py +++ b/tests/commands/xml/test_charge_state.py @@ -21,11 +21,10 @@ ("state", "expected_event"), [ ("SlotCharging", StateEvent(State.DOCKED)), - ("Idle", StateEvent(State.IDLE)), ("Going", StateEvent(State.RETURNING)), ("unknown state returned", StateEvent(State.ERROR)), ], - ids=["slot_charging", "idle", "going", "unknown"], + ids=["slot_charging", "going", "unknown"], ) async def test_get_charge_state(state: str, expected_event: Event) -> None: json = get_request_xml(f"") @@ -34,8 +33,12 @@ async def test_get_charge_state(state: str, expected_event: Event) -> None: @pytest.mark.parametrize( "xml", - ["", ""], - ids=["error", "no_state"], + [ + "", + "", + "", + ], + ids=["error", "no_state", "idle"], ) async def test_get_charge_state_error(xml: str) -> None: json = get_request_xml(xml) diff --git a/tests/commands/xml/test_clean.py b/tests/commands/xml/test_clean.py new file mode 100644 index 000000000..14daae18c --- /dev/null +++ b/tests/commands/xml/test_clean.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import pytest + +from deebot_client.command import CommandResult +from deebot_client.commands.xml import GetCleanState +from deebot_client.commands.xml.clean import CleanArea +from deebot_client.events import FanSpeedEvent, FanSpeedLevel, StateEvent +from deebot_client.message import HandlingState +from deebot_client.models import CleanMode, State +from tests.commands import assert_command + +from . import get_request_xml + + +@pytest.mark.parametrize( + ("command", "command_result"), + [ + (CleanArea(CleanMode.SPOT_AREA, "4", 1), HandlingState.SUCCESS), + ], +) +async def test_CleanArea(command: CleanArea, command_result: HandlingState) -> None: + json = get_request_xml("") + await assert_command( + command, json, None, command_result=CommandResult(command_result) + ) + + +@pytest.mark.parametrize( + ("speed", "state", "expected_fan_speed_event", "expected_state_event"), + [ + ( + "standard", + "s", + FanSpeedEvent(FanSpeedLevel.NORMAL), + StateEvent(State.CLEANING), + ), + ( + "strong", + "s", + FanSpeedEvent(FanSpeedLevel.MAX), + StateEvent(State.CLEANING), + ), + ( + "standard", + "p", + FanSpeedEvent(FanSpeedLevel.NORMAL), + StateEvent(State.PAUSED), + ), + ], + ids=["standard_cleaning", "strong_cleaning", "paused"], +) +async def test_get_clean_state( + speed: str, + state: str, + expected_fan_speed_event: FanSpeedEvent, + expected_state_event: StateEvent, +) -> None: + json = get_request_xml( + f"" + ) + await assert_command( + GetCleanState(), + json, + [x for x in [expected_fan_speed_event, expected_state_event] if x is not None], + ) + + +@pytest.mark.parametrize( + "xml", + ["", ""], + ids=["error", "no_state"], +) +async def test_get_clean_state_error(xml: str) -> None: + json = get_request_xml(xml) + await assert_command( + GetCleanState(), + json, + None, + command_result=CommandResult(HandlingState.ANALYSE_LOGGED), + ) diff --git a/tests/commands/xml/test_fan_speed.py b/tests/commands/xml/test_fan_speed.py index bf728ecfc..288b3c1e2 100644 --- a/tests/commands/xml/test_fan_speed.py +++ b/tests/commands/xml/test_fan_speed.py @@ -4,8 +4,8 @@ import pytest -from deebot_client.command import CommandResult -from deebot_client.commands.xml import GetFanSpeed +from deebot_client.command import CommandResult, CommandWithMessageHandling +from deebot_client.commands.xml import GetCleanSpeed, SetCleanSpeed from deebot_client.events import FanSpeedEvent, FanSpeedLevel from deebot_client.message import HandlingState from tests.commands import assert_command @@ -26,19 +26,36 @@ ) async def test_get_fan_speed(speed: str, expected_event: Event) -> None: json = get_request_xml(f"") - await assert_command(GetFanSpeed(), json, expected_event) + await assert_command(GetCleanSpeed(), json, expected_event) @pytest.mark.parametrize( "xml", - ["", ""], - ids=["error", "no_state"], + [""], + ids=["error"], ) async def test_get_fan_speed_error(xml: str) -> None: json = get_request_xml(xml) await assert_command( - GetFanSpeed(), + GetCleanSpeed(), json, None, command_result=CommandResult(HandlingState.ANALYSE_LOGGED), ) + + +@pytest.mark.parametrize( + ("command", "xml", "result"), + [ + ( + SetCleanSpeed(FanSpeedLevel.MAX), + "", + HandlingState.SUCCESS, + ), + ], +) +async def test_set_fan_speed( + command: CommandWithMessageHandling, xml: str, result: HandlingState +) -> None: + json = get_request_xml(xml) + await assert_command(command, json, None, command_result=CommandResult(result))