diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index c3f533863..0180cd249 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -32,7 +32,7 @@ repos:
hooks:
- id: codespell
args:
- - --ignore-words-list=deebot
+ - --ignore-words-list=deebot,MapP
- --skip="./.*,*.csv,*.json"
- --quiet-level=2
- --exclude-file=deebot_client/util/continents.py
diff --git a/deebot_client/capabilities.py b/deebot_client/capabilities.py
index d03b2fc23..2c72539f9 100644
--- a/deebot_client/capabilities.py
+++ b/deebot_client/capabilities.py
@@ -175,9 +175,9 @@ class CapabilityMap:
changed: CapabilityEvent[MapChangedEvent]
clear: CapabilityExecute[[]] | None = None
major: CapabilityEvent[MajorMapEvent]
- multi_state: CapabilitySetEnable[MultimapStateEvent]
+ multi_state: CapabilitySetEnable[MultimapStateEvent] | None = None
position: CapabilityEvent[PositionsEvent]
- relocation: CapabilityExecute[[]]
+ relocation: CapabilityExecute[[]] | None = None
rooms: CapabilityEvent[RoomsEvent]
trace: CapabilityEvent[MapTraceEvent]
@@ -213,7 +213,7 @@ class CapabilitySettings:
sweep_mode: CapabilitySetEnable[SweepModeEvent] | None = None
true_detect: CapabilitySetEnable[TrueDetectEvent] | None = None
voice_assistant: CapabilitySetEnable[VoiceAssistantStateEvent] | None = None
- volume: CapabilitySet[VolumeEvent, [int]]
+ volume: CapabilitySet[VolumeEvent, [int]] | None = None
@dataclass(frozen=True, kw_only=True)
diff --git a/deebot_client/commands/__init__.py b/deebot_client/commands/__init__.py
index 5fb527c47..fc4574353 100644
--- a/deebot_client/commands/__init__.py
+++ b/deebot_client/commands/__init__.py
@@ -11,6 +11,9 @@
COMMANDS as JSON_COMMANDS,
COMMANDS_WITH_MQTT_P2P_HANDLING as JSON_COMMANDS_WITH_MQTT_P2P_HANDLING,
)
+from .xml import (
+ COMMANDS_WITH_MQTT_P2P_HANDLING as XML_COMMANDS_WITH_MQTT_P2P_HANDLING,
+)
if TYPE_CHECKING:
from deebot_client.command import Command, CommandMqttP2P
@@ -18,7 +21,8 @@
COMMANDS: dict[DataType, dict[str, type[Command]]] = {DataType.JSON: JSON_COMMANDS}
COMMANDS_WITH_MQTT_P2P_HANDLING: dict[DataType, dict[str, type[CommandMqttP2P]]] = {
- DataType.JSON: JSON_COMMANDS_WITH_MQTT_P2P_HANDLING
+ DataType.JSON: JSON_COMMANDS_WITH_MQTT_P2P_HANDLING,
+ DataType.XML: XML_COMMANDS_WITH_MQTT_P2P_HANDLING,
}
diff --git a/deebot_client/commands/json/common.py b/deebot_client/commands/json/common.py
index 623a608b5..ed69eb5f6 100644
--- a/deebot_client/commands/json/common.py
+++ b/deebot_client/commands/json/common.py
@@ -23,6 +23,7 @@
HandlingState,
MessageBody,
MessageBodyDataDict,
+ MessageDict,
)
from deebot_client.util import verify_required_class_variables_exists
@@ -62,6 +63,12 @@ class JsonCommandWithMessageHandling(
"""Command, which handle response by itself."""
+class JsonCommandWithRawMessageHandling(
+ JsonCommand, CommandWithMessageHandling, MessageDict, ABC
+):
+ """Command, which handle raw response by itself."""
+
+
class ExecuteCommand(JsonCommandWithMessageHandling, ABC):
"""Command, which is executing something (ex. Charge)."""
diff --git a/deebot_client/commands/xml/__init__.py b/deebot_client/commands/xml/__init__.py
index 3ac6e073d..18ee94445 100644
--- a/deebot_client/commands/xml/__init__.py
+++ b/deebot_client/commands/xml/__init__.py
@@ -9,35 +9,71 @@
from .battery import GetBatteryInfo
from .charge import Charge
from .charge_state import GetChargeState
+from .clean import Clean, CleanArea, GetCleanState
+from .clean_logs import GetCleanLogs
from .error import GetError
-from .fan_speed import GetFanSpeed
+from .fan_speed import GetCleanSpeed, SetCleanSpeed
from .life_span import GetLifeSpan
+from .map import GetMapM, GetMapSet, GetMapSt, GetTrM, PullM, PullMP
from .play_sound import PlaySound
from .pos import GetPos
from .stats import GetCleanSum
+from .water_info import GetWaterBoxInfo, GetWaterPermeability
if TYPE_CHECKING:
from .common import XmlCommand
__all__ = [
"Charge",
+ "Clean",
+ "CleanArea",
"GetBatteryInfo",
"GetChargeState",
+ "GetCleanLogs",
+ "GetCleanSpeed",
+ "GetCleanState",
"GetCleanSum",
"GetError",
- "GetFanSpeed",
"GetLifeSpan",
+ "GetMapM",
+ "GetMapSet",
+ "GetMapSt",
"GetPos",
+ "GetTrM",
+ "GetWaterBoxInfo",
+ "GetWaterPermeability",
"PlaySound",
+ "PullM",
+ "PullMP",
+ "SetCleanSpeed",
]
# fmt: off
# ordered by file asc
_COMMANDS: list[type[XmlCommand]] = [
+ Charge,
+ Clean,
+ CleanArea,
+ GetError,
GetBatteryInfo,
+ GetChargeState,
+ GetCleanLogs,
+ GetCleanSpeed,
+ GetCleanState,
+ GetCleanSum,
GetError,
GetLifeSpan,
+ GetMapM,
+ GetMapSet,
+ GetMapSt,
+ GetPos,
+ GetTrM,
+ GetWaterBoxInfo,
+ GetWaterPermeability,
PlaySound,
+ PullM,
+ PullMP,
+ SetCleanSpeed,
]
# fmt: on
diff --git a/deebot_client/commands/xml/charge_state.py b/deebot_client/commands/xml/charge_state.py
index 78b7110a2..6fff4d568 100644
--- a/deebot_client/commands/xml/charge_state.py
+++ b/deebot_client/commands/xml/charge_state.py
@@ -38,7 +38,7 @@ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
case "slotcharging" | "slot_charging" | "wirecharging":
status = State.DOCKED
case "idle":
- status = State.IDLE
+ pass
case "going":
status = State.RETURNING
case _:
diff --git a/deebot_client/commands/xml/clean.py b/deebot_client/commands/xml/clean.py
new file mode 100644
index 000000000..b04690992
--- /dev/null
+++ b/deebot_client/commands/xml/clean.py
@@ -0,0 +1,97 @@
+"""Clean commands."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from deebot_client.events import FanSpeedEvent, FanSpeedLevel, StateEvent
+from deebot_client.logging_filter import get_logger
+from deebot_client.message import HandlingResult
+from deebot_client.models import CleanAction, CleanMode, State
+
+from .common import ExecuteCommand, XmlCommandWithMessageHandling
+
+if TYPE_CHECKING:
+ from xml.etree.ElementTree import Element
+
+ from deebot_client.event_bus import EventBus
+
+_LOGGER = get_logger(__name__)
+
+
+class Clean(ExecuteCommand):
+ """Generic start/pause/stop cleaning command."""
+
+ NAME = "Clean"
+ HAS_SUB_ELEMENT = True
+
+ def __init__(
+ self, action: CleanAction, speed: FanSpeedLevel = FanSpeedLevel.NORMAL
+ ) -> None:
+ #
+
+ super().__init__(
+ {
+ "type": CleanMode.AUTO.xml_value,
+ "act": action.xml_value,
+ "speed": speed.xml_value,
+ }
+ )
+
+
+class CleanArea(ExecuteCommand):
+ """Clean area command."""
+
+ NAME = "Clean"
+ HAS_SUB_ELEMENT = True
+
+ def __init__(
+ self,
+ mode: CleanMode,
+ area: str,
+ cleanings: int = 1,
+ speed: FanSpeedLevel = FanSpeedLevel.NORMAL,
+ ) -> None:
+ #
+
+ super().__init__(
+ {
+ "type": mode.xml_value,
+ "act": CleanAction.START.xml_value,
+ "speed": speed.xml_value,
+ "deep": str(cleanings),
+ "mid": area,
+ }
+ )
+
+
+class GetCleanState(XmlCommandWithMessageHandling):
+ """GetCleanState command."""
+
+ NAME = "GetCleanState"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ :return: A message response
+ """
+ if xml.attrib.get("ret") != "ok" or (clean := xml.find("clean")) is None:
+ return HandlingResult.analyse()
+
+ speed_attrib = clean.attrib.get("speed")
+ if speed_attrib is not None:
+ fan_speed_level = FanSpeedLevel.from_xml(speed_attrib)
+ event_bus.notify(FanSpeedEvent(fan_speed_level))
+
+ clean_attrib = clean.attrib.get("st")
+ if clean_attrib is not None:
+ clean_action = CleanAction.from_xml(clean_attrib)
+ if clean_action == CleanAction.START:
+ event_bus.notify(StateEvent(State.CLEANING))
+ elif clean_action == CleanAction.PAUSE:
+ event_bus.notify(StateEvent(State.PAUSED))
+ else:
+ _LOGGER.debug("Ignored CleanState %s", clean_action)
+
+ return HandlingResult.success()
diff --git a/deebot_client/commands/xml/clean_logs.py b/deebot_client/commands/xml/clean_logs.py
new file mode 100644
index 000000000..ed6361cc3
--- /dev/null
+++ b/deebot_client/commands/xml/clean_logs.py
@@ -0,0 +1,75 @@
+"""Clean Logs commands."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from deebot_client.command import CommandResult
+from deebot_client.events import (
+ CleanLogEntry,
+ CleanLogEvent,
+)
+from deebot_client.logging_filter import get_logger
+from deebot_client.message import HandlingResult
+from deebot_client.util import get_enum
+
+from .common import XmlCommandWithMessageHandling
+from .enum import XmlStopReason
+
+if TYPE_CHECKING:
+ from xml.etree.ElementTree import Element
+
+ from deebot_client.event_bus import EventBus
+
+_LOGGER = get_logger(__name__)
+
+
+class GetCleanLogs(XmlCommandWithMessageHandling):
+ """GetCleanLogs command."""
+
+ NAME = "GetCleanLogs"
+
+ def __init__(self, count: int = 0) -> None:
+ super().__init__({"count": str(count)})
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ :return: A message response
+ """
+ if (
+ xml.attrib.get("ret") != "ok"
+ or (resp_logs := xml.findall("CleanSt")) is None
+ ):
+ return HandlingResult.analyse()
+
+ if len(resp_logs) >= 0:
+ logs: list[CleanLogEntry] = []
+ for log in resp_logs:
+ xml_stop_reason_attrib = str(log.attrib["f"])
+ stop_reason = XmlStopReason.FINISHED
+ try:
+ stop_reason = get_enum(XmlStopReason, xml_stop_reason_attrib)
+ except Exception as e:
+ _LOGGER.error(
+ "Could not decode stop reason: %s",
+ xml_stop_reason_attrib,
+ exc_info=e,
+ )
+ try:
+ logs.append(
+ CleanLogEntry(
+ timestamp=int(log.attrib["s"]),
+ image_url="", # Missing
+ type=log.attrib["t"],
+ area=int(log.attrib["a"]),
+ stop_reason=stop_reason.to_clean_job_status(), # To be extracted
+ duration=int(log.attrib["l"]),
+ )
+ )
+ except Exception: # pylint: disable=broad-except
+ _LOGGER.warning("Skipping log entry: %s", log, exc_info=True)
+ event_bus.notify(CleanLogEvent(logs))
+ return CommandResult.success()
+ return HandlingResult.analyse()
diff --git a/deebot_client/commands/xml/common.py b/deebot_client/commands/xml/common.py
index 8044c09b7..4d84bf042 100644
--- a/deebot_client/commands/xml/common.py
+++ b/deebot_client/commands/xml/common.py
@@ -3,12 +3,18 @@
from __future__ import annotations
from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING, Any, cast
from xml.etree.ElementTree import Element, SubElement
from defusedxml import ElementTree # type: ignore[import-untyped]
-from deebot_client.command import Command, CommandWithMessageHandling, SetCommand
+from deebot_client.command import (
+ Command,
+ CommandMqttP2P,
+ CommandWithMessageHandling,
+ GetCommand,
+ SetCommand,
+)
from deebot_client.const import DataType
from deebot_client.logging_filter import get_logger
from deebot_client.message import HandlingResult, HandlingState, MessageStr
@@ -81,8 +87,50 @@ def _handle_xml(cls, _: EventBus, xml: Element) -> HandlingResult:
return HandlingResult(HandlingState.FAILED)
-class XmlSetCommand(ExecuteCommand, SetCommand, ABC):
+class XmlCommandMqttP2P(XmlCommand, CommandMqttP2P, ABC):
+ """Json base command for mqtt p2p channel."""
+
+ @classmethod
+ def create_from_mqtt(cls, payload: str | bytes | bytearray) -> CommandMqttP2P:
+ """Create a command from the mqtt data."""
+ xml = ElementTree.fromstring(payload)
+ return cls._create_from_mqtt(xml.attrib)
+
+ def handle_mqtt_p2p(
+ self, event_bus: EventBus, response_payload: str | bytes | bytearray
+ ) -> None:
+ """Handle response received over the mqtt channel "p2p"."""
+ if isinstance(response_payload, bytearray):
+ data = bytes(response_payload).decode()
+ elif isinstance(response_payload, bytes):
+ data = response_payload.decode()
+ elif isinstance(response_payload, str):
+ data = response_payload
+ else:
+ msg = "Unsupported message data type {message_type}"
+ raise TypeError(msg.format(essage_type=type(response_payload)))
+ self._handle_mqtt_p2p(event_bus, data)
+
+ @abstractmethod
+ def _handle_mqtt_p2p(
+ self, event_bus: EventBus, response: dict[str, Any] | str
+ ) -> None:
+ """Handle response received over the mqtt channel "p2p"."""
+
+
+class XmlSetCommand(ExecuteCommand, SetCommand, XmlCommandMqttP2P, ABC):
"""Xml base set command.
Command needs to be linked to the "get" command, for handling (updating) the sensors.
"""
+
+
+class XmlGetCommand(XmlCommandWithMessageHandling, GetCommand, ABC):
+ """Xml get command."""
+
+ @classmethod
+ @abstractmethod
+ def handle_set_args(
+ cls, event_bus: EventBus, args: dict[str, Any]
+ ) -> HandlingResult:
+ """Handle arguments of set command."""
diff --git a/deebot_client/commands/xml/enum.py b/deebot_client/commands/xml/enum.py
new file mode 100644
index 000000000..f3f6fd49a
--- /dev/null
+++ b/deebot_client/commands/xml/enum.py
@@ -0,0 +1,35 @@
+"""Enums used only by XML messages."""
+
+from __future__ import annotations
+
+from enum import StrEnum
+
+from deebot_client.events import CleanJobStatus
+
+
+class XmlStopReason(StrEnum):
+ """Reasons why cleaning has been stopped."""
+
+ FINISHED = "s"
+ BATTERY_LOW = "r"
+ STOPPED_BY_APP = "a"
+ STOPPED_BY_REMOTE_CONTROL = "i"
+ STOPPED_BY_BUTTON = "b"
+ STOPPED_BY_WARNING = "w"
+ STOPPED_BY_NO_DISTURB = "f"
+ STOPPED_BY_CLEARMAP = "m"
+ STOPPED_BY_NO_PATH = "n"
+ STOPPED_BY_NOT_IN_MAP = "u"
+ STOPPED_BY_VIRTUAL_WALL = "v"
+
+ def to_clean_job_status(self) -> CleanJobStatus:
+ """Convert this value to a CleanJobStatus for compatibility proposes."""
+ if self == XmlStopReason.FINISHED:
+ return CleanJobStatus.FINISHED
+ if self in (
+ XmlStopReason.STOPPED_BY_APP,
+ XmlStopReason.STOPPED_BY_REMOTE_CONTROL,
+ XmlStopReason.STOPPED_BY_BUTTON,
+ ):
+ return CleanJobStatus.MANUALLY_STOPPED
+ return CleanJobStatus.FINISHED_WITH_WARNINGS
diff --git a/deebot_client/commands/xml/fan_speed.py b/deebot_client/commands/xml/fan_speed.py
index 2a834554c..e567d232c 100644
--- a/deebot_client/commands/xml/fan_speed.py
+++ b/deebot_client/commands/xml/fan_speed.py
@@ -2,12 +2,14 @@
from __future__ import annotations
-from typing import TYPE_CHECKING
+from types import MappingProxyType
+from typing import TYPE_CHECKING, Any
+from deebot_client.command import InitParam
from deebot_client.events import FanSpeedEvent, FanSpeedLevel
from deebot_client.message import HandlingResult
-from .common import XmlCommandWithMessageHandling
+from .common import XmlGetCommand, XmlSetCommand
if TYPE_CHECKING:
from xml.etree.ElementTree import Element
@@ -15,11 +17,22 @@
from deebot_client.event_bus import EventBus
-class GetFanSpeed(XmlCommandWithMessageHandling):
- """GetFanSpeed command."""
+class GetCleanSpeed(XmlGetCommand):
+ """GetCleanSpeed command."""
NAME = "GetCleanSpeed"
+ @classmethod
+ def handle_set_args(
+ cls, event_bus: EventBus, args: dict[str, Any]
+ ) -> HandlingResult:
+ """Handle message->body->data and notify the correct event subscribers.
+
+ :return: A message response
+ """
+ event_bus.notify(FanSpeedEvent(FanSpeedLevel.from_xml(str(args["speed"]))))
+ return HandlingResult.success()
+
@classmethod
def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
"""Handle xml message and notify the correct event subscribers.
@@ -29,16 +42,17 @@ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
if xml.attrib.get("ret") != "ok" or not (speed := xml.attrib.get("speed")):
return HandlingResult.analyse()
- event: FanSpeedEvent | None = None
+ event_bus.notify(FanSpeedEvent(FanSpeedLevel.from_xml(speed)))
+ return HandlingResult.success()
+
- match speed.lower():
- case "standard":
- event = FanSpeedEvent(FanSpeedLevel.NORMAL)
- case "strong":
- event = FanSpeedEvent(FanSpeedLevel.MAX)
+class SetCleanSpeed(XmlSetCommand):
+ """SetCleanSpeed command."""
- if event:
- event_bus.notify(event)
- return HandlingResult.success()
+ NAME = "SetCleanSpeed"
+ get_command = GetCleanSpeed
+ _mqtt_params = MappingProxyType({"speed": InitParam(FanSpeedLevel)})
- return HandlingResult.analyse()
+ def __init__(self, speed: FanSpeedLevel | str) -> None:
+ speed_level = speed.xml_value if isinstance(speed, FanSpeedLevel) else speed
+ super().__init__({"speed": speed_level})
diff --git a/deebot_client/commands/xml/map.py b/deebot_client/commands/xml/map.py
new file mode 100644
index 000000000..7f2fdc57a
--- /dev/null
+++ b/deebot_client/commands/xml/map.py
@@ -0,0 +1,281 @@
+"""Map commands."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from deebot_client.command import Command, CommandResult
+from deebot_client.const import DataType
+from deebot_client.events import MajorMapEvent, MapSetEvent, MapSetType, MinorMapEvent
+from deebot_client.events.map import CachedMapInfoEvent, MapSubsetEvent
+from deebot_client.logging_filter import get_logger
+from deebot_client.message import HandlingResult, HandlingState
+
+from .common import XmlCommandWithMessageHandling
+
+if TYPE_CHECKING:
+ from typing import Any
+ from xml.etree.ElementTree import Element
+
+ from deebot_client.event_bus import EventBus
+
+_LOGGER = get_logger(__name__)
+
+
+class GetMapSt(XmlCommandWithMessageHandling):
+ """GetMapSt command.
+
+ This command checks whether the current map has been built successfully or not.
+ """
+
+ NAME = "GetMapSt"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ Sample message response:
+ ""
+
+ :return: A message response
+ """
+ if xml.attrib.get("ret") != "ok" or not (st := xml.attrib.get("st")):
+ return HandlingResult.analyse()
+
+ built = st == "built"
+ event_bus.notify(CachedMapInfoEvent(name="", active=built))
+ return HandlingResult.success()
+
+
+class GetMapSet(XmlCommandWithMessageHandling):
+ """GetMapSet command.
+
+ This commands gets the list of logical pieces each map is composed of.
+ XML robots do not support multiple maps, so the mid parameter is ignored.
+ """
+
+ _ARGS_MSID = "msid"
+ _ARGS_TYPE = "type"
+ _ARGS_SUBSETS = "subsets"
+
+ NAME = "GetMapSet"
+
+ def __init__(
+ self,
+ # pylint: disable=unused-argument
+ mid: str, # noqa: ARG002
+ # pylint: disable=redefined-builtin
+ type: (MapSetType | str) = MapSetType.VIRTUAL_WALLS,
+ ) -> None:
+ if isinstance(type, MapSetType):
+ type = type.value
+
+ super().__init__({"tp": type})
+
+ @classmethod
+ def _find_subsets(cls, maps: list[Element]) -> list[int]:
+ return [int(mid) for map in maps if (mid := map.attrib.get("mid")) is not None]
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ Sample message response:
+ b''
+
+ :return: A message response
+ """
+ if (
+ xml.attrib.get("ret") != "ok"
+ or not (msid := xml.attrib.get("msid"))
+ or not (area_type := xml.attrib.get("tp"))
+ or not (m := xml.findall("m"))
+ ):
+ return HandlingResult.analyse()
+ subsets = cls._find_subsets(m)
+ event_bus.notify(MapSetEvent(MapSetType(area_type), subsets=subsets))
+ args = {
+ cls._ARGS_MSID: msid,
+ cls._ARGS_TYPE: area_type,
+ cls._ARGS_SUBSETS: subsets,
+ }
+ return HandlingResult(HandlingState.SUCCESS, args)
+
+ def _handle_response(
+ self, event_bus: EventBus, response: dict[str, Any]
+ ) -> CommandResult:
+ """Handle response from a command.
+
+ :return: A message response
+ """
+ result = super()._handle_response(event_bus, response)
+ if result.state == HandlingState.SUCCESS and result.args:
+ commands: list[Command] = [
+ PullM(
+ mid=subset,
+ msid=result.args[self._ARGS_MSID],
+ type=result.args[self._ARGS_TYPE],
+ )
+ for subset in result.args[self._ARGS_SUBSETS]
+ ]
+ return CommandResult(result.state, result.args, commands)
+
+ return result
+
+
+class GetMapM(XmlCommandWithMessageHandling):
+ """GetMapM command."""
+
+ NAME = "GetMapM"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ Sample message response:
+ b''
+
+ :return: A message response
+ """
+ if not (map_hashes := xml.attrib.get("m")) or not (idx := xml.attrib.get("i")):
+ return HandlingResult.analyse()
+ event_bus.notify(
+ MajorMapEvent(
+ idx,
+ values=[int(map_hash.strip()) for map_hash in map_hashes.split(",")],
+ requested=True,
+ type=DataType.XML,
+ )
+ )
+ return HandlingResult.success()
+
+
+class PullM(XmlCommandWithMessageHandling):
+ """PullM command.
+
+ Pulls map subset coordinates
+ """
+
+ _ARG_COORDS = "coordinates"
+
+ NAME = "PullM"
+
+ def __init__(
+ self,
+ *,
+ mid: str | int,
+ msid: str | int,
+ # pylint: disable=redefined-builtin
+ type: (MapSetType | str) = MapSetType.ROOMS,
+ ) -> None:
+ if isinstance(type, MapSetType):
+ type = type.value
+
+ self._map_type = type
+ self._map_subset_id = int(mid)
+
+ super().__init__(
+ {
+ "mid": str(mid),
+ "msid": str(msid),
+ "tp": type,
+ "seq": "0",
+ },
+ )
+
+ @classmethod
+ def _handle_xml(cls, _event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ Sample message responses:
+
+
+
+ :return: A message response
+ """
+ if xml.attrib.get("ret") != "ok" or not (coords := xml.attrib.get("m")):
+ return HandlingResult.analyse()
+
+ args = {cls._ARG_COORDS: coords}
+ return HandlingResult(HandlingState.SUCCESS, args)
+
+ def _handle_response(
+ self, event_bus: EventBus, response: dict[str, Any]
+ ) -> CommandResult:
+ """Handle response from a command.
+
+ :return: A message response
+ """
+ result = super()._handle_response(event_bus, response)
+ if result.state == HandlingState.SUCCESS and result.args:
+ coords = result.args[self._ARG_COORDS]
+ event_bus.notify(
+ MapSubsetEvent(
+ id=self._map_subset_id,
+ type=MapSetType(self._map_type),
+ coordinates=coords,
+ )
+ )
+ return CommandResult(result.state, result.args)
+
+ return result
+
+
+class PullMP(XmlCommandWithMessageHandling):
+ """PullMP command."""
+
+ _ARG_PIECE = "piece"
+
+ NAME = "PullMP"
+
+ def __init__(self, *, piece_index: int) -> None:
+ self._piece_index = piece_index
+ super().__init__({"pid": str(piece_index)})
+
+ @classmethod
+ def _handle_xml(cls, _event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ Sample message response:
+ b{'ret': 'ok', 'i': '1839263381', 'p': 'x_q_a_a_b_a_a_q_jw_a_a_a_a_bv/f//o7f/_rz5_i_f_x_i5_y_v_g4kijmo4_y_h+e7k_ho_l_t_l8_u6_p_a_f_ls_x7_jhrz0_kg_a=', 'event': 'pull_m_p'}
+
+ :return: A message response
+ """
+ if xml.attrib.get("ret") != "ok" or not (piece := xml.attrib.get("p")):
+ return HandlingResult.analyse()
+ args = {cls._ARG_PIECE: piece}
+ return HandlingResult(HandlingState.SUCCESS, args)
+
+ def _handle_response(
+ self, event_bus: EventBus, response: dict[str, Any]
+ ) -> CommandResult:
+ """Handle response from a command.
+
+ :return: A message response
+ """
+ result = super()._handle_response(event_bus, response)
+ if result.state == HandlingState.SUCCESS and result.args:
+ piece = result.args[self._ARG_PIECE]
+ event_bus.notify(MinorMapEvent(index=self._piece_index, value=piece))
+ return CommandResult(result.state, result.args)
+
+ return result
+
+
+class GetTrM(XmlCommandWithMessageHandling):
+ """GetTrM command.
+
+ Enables trace reporting from the bot.
+ """
+
+ NAME = "GetTrM"
+
+ @classmethod
+ def _handle_xml(cls, _event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ :return: A message response
+ """
+ if xml.attrib.get("ret") != "ok":
+ return HandlingResult.analyse()
+ return HandlingResult(HandlingState.SUCCESS)
diff --git a/deebot_client/commands/xml/water_info.py b/deebot_client/commands/xml/water_info.py
new file mode 100644
index 000000000..e5dd251a4
--- /dev/null
+++ b/deebot_client/commands/xml/water_info.py
@@ -0,0 +1,96 @@
+"""WaterBox command module."""
+
+from __future__ import annotations
+
+from types import MappingProxyType
+from typing import TYPE_CHECKING, Any
+
+from deebot_client.command import InitParam
+from deebot_client.events import (
+ WaterAmount,
+ WaterInfoEvent,
+)
+from deebot_client.message import HandlingResult
+from deebot_client.util import get_enum
+
+from .common import XmlGetCommand, XmlSetCommand
+
+if TYPE_CHECKING:
+ from xml.etree.ElementTree import Element
+
+ from deebot_client.event_bus import EventBus
+
+
+class GetWaterPermeability(XmlGetCommand):
+ """GetWaterPermeability command."""
+
+ NAME = "GetWaterPermeability"
+
+ @classmethod
+ def handle_set_args(
+ cls, event_bus: EventBus, args: dict[str, Any]
+ ) -> HandlingResult:
+ """Handle message->body->data and notify the correct event subscribers.
+
+ :return: A message response
+ """
+ event_bus.notify(WaterInfoEvent(amount=WaterAmount(int(args["v"]))))
+ return HandlingResult.success()
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ :return: A message response
+ """
+ if xml.attrib.get("ret") != "ok" or not (value := xml.attrib.get("v")):
+ return HandlingResult.analyse()
+
+ event_bus.notify(WaterInfoEvent(amount=WaterAmount(int(value))))
+ return HandlingResult.success()
+
+
+class SetWaterPermeability(XmlSetCommand):
+ """SetWaterPermeability command."""
+
+ NAME = "SetWaterPermeability"
+ get_command = GetWaterPermeability
+ _mqtt_params = MappingProxyType(
+ {
+ "amount": InitParam(WaterAmount),
+ }
+ )
+
+ def __init__(self, amount: WaterAmount | str) -> None:
+ if isinstance(amount, str):
+ amount = get_enum(WaterAmount, amount)
+ super().__init__({"v": str(amount.value)})
+
+
+class GetWaterBoxInfo(XmlGetCommand):
+ """GetWaterBoxInfo command."""
+
+ NAME = "GetWaterBoxInfo"
+
+ @classmethod
+ def handle_set_args(
+ cls, event_bus: EventBus, args: dict[str, Any]
+ ) -> HandlingResult:
+ """Handle message->body->data and notify the correct event subscribers.
+
+ :return: A message response
+ """
+ event_bus.notify(WaterInfoEvent(mop_attached=(str(args["on"]) != "0")))
+ return HandlingResult.success()
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ :return: A message response
+ """
+ if xml.attrib.get("ret") != "ok" or not (on := xml.attrib.get("on")):
+ return HandlingResult.analyse()
+
+ event_bus.notify(WaterInfoEvent(mop_attached=on != "0"))
+ return HandlingResult.success()
diff --git a/deebot_client/device.py b/deebot_client/device.py
index 0b2838e7c..9cc2f12d8 100644
--- a/deebot_client/device.py
+++ b/deebot_client/device.py
@@ -14,6 +14,7 @@
from deebot_client.util import cancel
from .command import Command
+from .const import DataType
from .event_bus import EventBus
from .events import (
AvailabilityEvent,
@@ -38,7 +39,6 @@
_LOGGER = get_logger(__name__)
_AVAILABLE_CHECK_INTERVAL = 60
-
DeviceCommandExecute = Callable[[Command], Coroutine[Any, Any, dict[str, Any]]]
@@ -200,15 +200,31 @@ def _handle_message(
try:
_LOGGER.debug("Try to handle message %s: %s", message_name, message_data)
- if message := get_message(message_name, self._device_info.static.data_type):
+ message_data_type = self._device_info.static.data_type
+ if message := get_message(message_name, message_data_type):
+ data: dict[str, Any] | str
if isinstance(message_data, dict):
data = message_data
- else:
+ elif message_data_type == DataType.JSON:
data = json.loads(message_data)
+ elif isinstance(message_data, bytes):
+ data = message_data.decode()
+ elif isinstance(message_data, bytearray):
+ data = bytes(message_data).decode()
+ elif isinstance(message_data, str):
+ data = message_data
+ else:
+ msg = "Unsupported message data type {message_name}: {message_type}"
+ raise TypeError(
+ msg.format(
+ message_name=message_name, message_type=type(message_data)
+ )
+ )
- fw_version = data.get("header", {}).get("fwVer", None)
- if fw_version:
- self.fw_version = fw_version
+ if isinstance(data, dict):
+ fw_version = data.get("header", {}).get("fwVer", None)
+ if fw_version:
+ self.fw_version = fw_version
message.handle(self.events, data)
except Exception: # pylint: disable=broad-except
diff --git a/deebot_client/events/map.py b/deebot_client/events/map.py
index b4d4526d7..ca2ad52c1 100644
--- a/deebot_client/events/map.py
+++ b/deebot_client/events/map.py
@@ -6,6 +6,7 @@
from enum import Enum, unique
from typing import TYPE_CHECKING, Any
+from deebot_client.const import DataType
from deebot_client.events import Event
if TYPE_CHECKING:
@@ -47,6 +48,7 @@ class MajorMapEvent(Event):
map_id: str
values: list[int]
requested: bool = field(kw_only=True)
+ type: DataType = DataType.JSON
@dataclass(frozen=True)
diff --git a/deebot_client/events/water_info.py b/deebot_client/events/water_info.py
index 1f543f5bf..b4b67814c 100644
--- a/deebot_client/events/water_info.py
+++ b/deebot_client/events/water_info.py
@@ -30,7 +30,7 @@ class SweepType(IntEnum):
class WaterInfoEvent(Event):
"""Water info event representation."""
- amount: WaterAmount
# None means no data available
+ amount: WaterAmount | None = None
sweep_type: SweepType | None = None
mop_attached: bool | None = field(kw_only=True, default=None)
diff --git a/deebot_client/hardware/deebot/2pv572.py b/deebot_client/hardware/deebot/2pv572.py
new file mode 100644
index 000000000..d5c126267
--- /dev/null
+++ b/deebot_client/hardware/deebot/2pv572.py
@@ -0,0 +1,134 @@
+"""2pv572 Capabilities."""
+
+from __future__ import annotations
+
+from deebot_client.capabilities import (
+ Capabilities,
+ CapabilityClean,
+ CapabilityCleanAction,
+ CapabilityCustomCommand,
+ CapabilityEvent,
+ CapabilityExecute,
+ CapabilityLifeSpan,
+ CapabilityMap,
+ CapabilitySettings,
+ CapabilitySetTypes,
+ CapabilityStats,
+ DeviceType,
+)
+from deebot_client.commands.json import GetNetInfoLegacy
+from deebot_client.commands.json.custom import CustomCommand
+from deebot_client.commands.xml import (
+ Charge,
+ Clean,
+ CleanArea,
+ GetBatteryInfo,
+ GetCleanLogs,
+ GetCleanSpeed,
+ GetCleanState,
+ GetLifeSpan,
+ GetWaterPermeability,
+ PlaySound,
+ SetCleanSpeed,
+)
+from deebot_client.commands.xml.charge_state import GetChargeState
+from deebot_client.commands.xml.error import GetError
+from deebot_client.commands.xml.map import GetMapM, GetMapSt, GetTrM
+from deebot_client.commands.xml.pos import GetPos
+from deebot_client.commands.xml.stats import GetCleanSum
+from deebot_client.commands.xml.water_info import GetWaterBoxInfo, SetWaterPermeability
+from deebot_client.const import DataType
+from deebot_client.events import (
+ AvailabilityEvent,
+ BatteryEvent,
+ CleanLogEvent,
+ CustomCommandEvent,
+ ErrorEvent,
+ FanSpeedEvent,
+ FanSpeedLevel,
+ LifeSpan,
+ LifeSpanEvent,
+ NetworkInfoEvent,
+ ReportStatsEvent,
+ RoomsEvent,
+ StateEvent,
+ StatsEvent,
+ TotalStatsEvent,
+ WaterAmount,
+ WaterInfoEvent,
+)
+from deebot_client.events.map import (
+ CachedMapInfoEvent,
+ MajorMapEvent,
+ MapChangedEvent,
+ MapTraceEvent,
+ PositionsEvent,
+)
+from deebot_client.models import StaticDeviceInfo
+from deebot_client.util import short_name
+
+from . import DEVICES
+
+DEVICES[short_name(__name__)] = StaticDeviceInfo(
+ DataType.XML,
+ Capabilities(
+ availability=CapabilityEvent(AvailabilityEvent, []),
+ battery=CapabilityEvent(BatteryEvent, [GetBatteryInfo()]),
+ charge=CapabilityExecute(Charge),
+ clean=CapabilityClean(
+ action=CapabilityCleanAction(command=Clean, area=CleanArea),
+ log=CapabilityEvent(CleanLogEvent, [GetCleanLogs()]),
+ ),
+ custom=CapabilityCustomCommand(
+ event=CustomCommandEvent, get=[], set=CustomCommand
+ ),
+ device_type=DeviceType.VACUUM,
+ error=CapabilityEvent(ErrorEvent, [GetError()]),
+ fan_speed=CapabilitySetTypes(
+ event=FanSpeedEvent,
+ get=[GetCleanSpeed()],
+ set=SetCleanSpeed,
+ types=(
+ FanSpeedLevel.NORMAL,
+ FanSpeedLevel.MAX,
+ ),
+ ),
+ life_span=CapabilityLifeSpan(
+ types=(LifeSpan.BRUSH, LifeSpan.SIDE_BRUSH, LifeSpan.DUST_CASE_HEAP),
+ event=LifeSpanEvent,
+ get=[
+ GetLifeSpan(LifeSpan.BRUSH),
+ GetLifeSpan(LifeSpan.SIDE_BRUSH),
+ GetLifeSpan(LifeSpan.DUST_CASE_HEAP),
+ ],
+ reset=CustomCommand,
+ ),
+ map=CapabilityMap(
+ cached_info=CapabilityEvent(CachedMapInfoEvent, [GetMapSt()]),
+ changed=CapabilityEvent(MapChangedEvent, []),
+ major=CapabilityEvent(MajorMapEvent, [GetMapM()]),
+ position=CapabilityEvent(PositionsEvent, [GetPos()]),
+ rooms=CapabilityEvent(RoomsEvent, [GetMapSt()]),
+ trace=CapabilityEvent(MapTraceEvent, [GetTrM()]),
+ ),
+ network=CapabilityEvent(NetworkInfoEvent, [GetNetInfoLegacy()]),
+ play_sound=CapabilityExecute(PlaySound),
+ state=CapabilityEvent(StateEvent, [GetChargeState(), GetCleanState()]),
+ stats=CapabilityStats(
+ clean=CapabilityEvent(StatsEvent, []),
+ report=CapabilityEvent(ReportStatsEvent, []),
+ total=CapabilityEvent(TotalStatsEvent, [GetCleanSum()]),
+ ),
+ settings=CapabilitySettings(),
+ water=CapabilitySetTypes(
+ event=WaterInfoEvent,
+ get=[GetWaterPermeability(), GetWaterBoxInfo()],
+ set=SetWaterPermeability,
+ types=(
+ WaterAmount.LOW,
+ WaterAmount.MEDIUM,
+ WaterAmount.HIGH,
+ ),
+ ),
+ ),
+)
diff --git a/deebot_client/map.py b/deebot_client/map.py
index c6afe8348..9065c37a2 100644
--- a/deebot_client/map.py
+++ b/deebot_client/map.py
@@ -6,9 +6,9 @@
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Final
+from deebot_client.const import DataType
from deebot_client.events.map import CachedMapInfoEvent, MapChangedEvent
-from .commands.json import GetMinorMap
from .events import (
MajorMapEvent,
MapSetEvent,
@@ -31,6 +31,8 @@
if TYPE_CHECKING:
from collections.abc import Callable
+ from deebot_client.command import Command
+
from .device import DeviceCommandExecute
from .event_bus import EventBus
@@ -102,11 +104,16 @@ async def on_major_map(event: MajorMapEvent) -> None:
self._map_data.map_piece_crc32_indicates_update(idx, value)
and event.requested
):
- tg.create_task(
- self._execute_command(
- GetMinorMap(map_id=event.map_id, piece_index=idx)
- )
- )
+ command: Command
+ if event.type == DataType.JSON:
+ from deebot_client.commands.json.map import GetMinorMap
+
+ command = GetMinorMap(map_id=event.map_id, piece_index=idx)
+ else:
+ from deebot_client.commands.xml.map import PullMP
+
+ command = PullMP(piece_index=idx)
+ tg.create_task(self._execute_command(command))
unsubscribers.append(self._event_bus.subscribe(MajorMapEvent, on_major_map))
diff --git a/deebot_client/messages/xml/__init__.py b/deebot_client/messages/xml/__init__.py
index 21d5370a9..5364ec21a 100644
--- a/deebot_client/messages/xml/__init__.py
+++ b/deebot_client/messages/xml/__init__.py
@@ -5,17 +5,48 @@
from typing import TYPE_CHECKING
from deebot_client.messages.xml.battery import BatteryInfo
+from deebot_client.messages.xml.charge import ChargeState
+from deebot_client.messages.xml.clean import (
+ CleanedPos,
+ CleanReport,
+ CleanReportServer,
+ CleanSt,
+)
+from deebot_client.messages.xml.map import MapP, Trace
+from deebot_client.messages.xml.pos import Pos
+from deebot_client.messages.xml.sleep import SleepStatus
+from deebot_client.messages.xml.water_info import WaterBoxInfo
if TYPE_CHECKING:
- from collections.abc import Sequence
-
from deebot_client.message import Message
-__all__: Sequence[str] = ["BatteryInfo"]
+__all__ = [
+ "BatteryInfo",
+ "ChargeState",
+ "CleanReport",
+ "CleanReportServer",
+ "CleanSt",
+ "CleanedPos",
+ "MapP",
+ "Pos",
+ "SleepStatus",
+ "Trace",
+ "WaterBoxInfo",
+]
# fmt: off
# ordered by file asc
_MESSAGES: list[type[Message]] = [
- BatteryInfo
+ BatteryInfo,
+ ChargeState,
+ CleanReport,
+ CleanReportServer,
+ CleanSt,
+ CleanedPos,
+ MapP,
+ Pos,
+ SleepStatus,
+ Trace,
+ WaterBoxInfo,
]
# fmt: on
diff --git a/deebot_client/messages/xml/charge.py b/deebot_client/messages/xml/charge.py
new file mode 100644
index 000000000..e6fff7ea6
--- /dev/null
+++ b/deebot_client/messages/xml/charge.py
@@ -0,0 +1,47 @@
+"""Charge messages."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from deebot_client.events import StateEvent
+from deebot_client.message import HandlingResult
+from deebot_client.messages.xml.common import XmlMessage
+from deebot_client.models import State
+
+if TYPE_CHECKING:
+ from xml.etree.ElementTree import Element
+
+ from deebot_client.event_bus import EventBus
+
+
+class ChargeState(XmlMessage):
+ """ChargeState message."""
+
+ NAME = "ChargeState"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ :return: A message response
+ """
+ status: State | None = None
+
+ if (charge := xml.find("charge")) is not None:
+ type_ = charge.attrib["type"].lower()
+ match type_:
+ case "slotcharging" | "slot_charging" | "wirecharging":
+ status = State.DOCKED
+ case "idle":
+ pass
+ case "going":
+ status = State.RETURNING
+ case _:
+ status = State.ERROR
+
+ if status:
+ event_bus.notify(StateEvent(status))
+ return HandlingResult.success()
+
+ return HandlingResult.analyse()
diff --git a/deebot_client/messages/xml/clean.py b/deebot_client/messages/xml/clean.py
new file mode 100644
index 000000000..631608a17
--- /dev/null
+++ b/deebot_client/messages/xml/clean.py
@@ -0,0 +1,158 @@
+"""Clean messages."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from deebot_client.events import (
+ CleanJobStatus,
+ FanSpeedEvent,
+ FanSpeedLevel,
+ Position,
+ PositionsEvent,
+ ReportStatsEvent,
+ StateEvent,
+ StatsEvent,
+)
+from deebot_client.logging_filter import get_logger
+from deebot_client.message import HandlingResult
+from deebot_client.messages.xml.common import XmlMessage
+from deebot_client.models import CleanAction, State
+from deebot_client.rs.map import PositionType
+
+if TYPE_CHECKING:
+ from xml.etree.ElementTree import Element
+
+ from deebot_client.event_bus import EventBus
+
+_LOGGER = get_logger(__name__)
+
+
+class CleanSt(XmlMessage):
+ """CleanSt message."""
+
+ NAME = "CleanSt"
+
+ @classmethod
+ def _handle_xml(cls, _event_bus: EventBus, _xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ b""
+
+ We currently ignore this message as we prefer to use CleanReport
+ :return: A message response
+ """
+ return HandlingResult.success()
+
+
+class CleanReport(XmlMessage):
+ """CleanReport message."""
+
+ NAME = "CleanReport"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ b""
+
+ :return: A message response
+ """
+ if (clean := xml.find("clean")) is None:
+ return HandlingResult.analyse()
+
+ speed_attrib = clean.attrib.get("speed")
+ if speed_attrib is not None:
+ fan_speed_level = FanSpeedLevel.from_xml(speed_attrib)
+ event_bus.notify(FanSpeedEvent(fan_speed_level))
+
+ clean_attrib = clean.attrib.get("st")
+ if clean_attrib is not None:
+ clean_action = CleanAction.from_xml(clean_attrib)
+ if clean_action == CleanAction.START:
+ event_bus.notify(StateEvent(State.CLEANING))
+ elif clean_action == CleanAction.PAUSE:
+ event_bus.notify(StateEvent(State.PAUSED))
+ else:
+ _LOGGER.debug("Ignored CleanState %s", clean_action)
+
+ return HandlingResult.success()
+
+
+class CleanReportServer(XmlMessage):
+ """CleanReportServer message."""
+
+ NAME = "CleanReportServer"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ b""
+ b""
+
+ :return: A message response
+ """
+ event_reported = False
+ if (
+ (act := xml.attrib.get("act")) is not None
+ and (last := xml.attrib.get("last")) is not None
+ and (area := xml.attrib.get("area")) is not None
+ ):
+ clean_session = xml.attrib.get("cs")
+ clean_type = xml.attrib.get("type")
+
+ clean_action = CleanAction.from_xml(act)
+ if clean_action == CleanAction.STOP:
+ event_bus.notify(
+ StatsEvent(area=int(area), time=int(last), type=clean_type)
+ )
+ event_reported = True
+ if clean_session:
+ if clean_action == CleanAction.STOP:
+ job_status = CleanJobStatus.FINISHED
+ elif clean_action == CleanAction.START:
+ job_status = CleanJobStatus.CLEANING
+ elif clean_action == CleanAction.PAUSE:
+ job_status = CleanJobStatus.MANUALLY_STOPPED
+ else:
+ job_status = CleanJobStatus.NO_STATUS
+ event_bus.notify(
+ ReportStatsEvent(
+ area=int(area),
+ time=int(last),
+ type=clean_type,
+ cleaning_id=clean_session,
+ status=job_status,
+ content=[],
+ )
+ )
+ event_reported = True
+ if event_reported:
+ return HandlingResult.success()
+ return HandlingResult.analyse()
+
+
+class CleanedPos(XmlMessage):
+ """CleanedPos message."""
+
+ NAME = "CleanedPos"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ b""
+
+ :return: A message response
+ """
+ if p := xml.attrib.get("p"):
+ p_x, p_y = p.split(",", 2)
+ p_a = xml.attrib.get("a", 0)
+ position = Position(
+ type=PositionType.DEEBOT, x=int(p_x), y=int(p_y), a=int(p_a)
+ )
+ event_bus.notify(PositionsEvent(positions=[position]))
+ return HandlingResult.success()
+
+ return HandlingResult.analyse()
diff --git a/deebot_client/messages/xml/common.py b/deebot_client/messages/xml/common.py
index 05e7399bc..5c9d4aca1 100644
--- a/deebot_client/messages/xml/common.py
+++ b/deebot_client/messages/xml/common.py
@@ -5,7 +5,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
-from defusedxml import ElementTree # type: ignore[import-untyped]
+from defusedxml import ElementTree as ET # type: ignore[import-untyped]
from deebot_client.message import MessageStr
@@ -25,7 +25,7 @@ def _handle_str(cls, event_bus: EventBus, message: str) -> HandlingResult:
:return: A message response
"""
- xml = ElementTree.fromstring(message)
+ xml = ET.fromstring(message)
return cls._handle_xml(event_bus, xml)
@classmethod
diff --git a/deebot_client/messages/xml/map.py b/deebot_client/messages/xml/map.py
new file mode 100644
index 000000000..df41f3f1b
--- /dev/null
+++ b/deebot_client/messages/xml/map.py
@@ -0,0 +1,60 @@
+"""Map messages."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from deebot_client.events.map import MapTraceEvent, MinorMapEvent
+from deebot_client.message import HandlingResult
+from deebot_client.messages.xml.common import XmlMessage
+
+if TYPE_CHECKING:
+ from xml.etree.ElementTree import Element
+
+ from deebot_client.event_bus import EventBus
+
+
+class MapP(XmlMessage):
+ """MapP message.
+
+ Probably a map piece
+ """
+
+ NAME = "MapP"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ b""
+
+ :return: A message response
+ """
+ if (pid := xml.attrib.get("pid")) and (piece := xml.attrib.get("p")):
+ event_bus.notify(MinorMapEvent(index=int(pid), value=piece))
+ return HandlingResult.success()
+ return HandlingResult.analyse()
+
+
+class Trace(XmlMessage):
+ """Trace message."""
+
+ NAME = "trace"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ Sample message:
+
+
+ :return: A message response
+ """
+ if (
+ (tf := xml.attrib.get("tf"))
+ and (tt := xml.attrib.get("tt"))
+ and (tr := xml.attrib.get("tr"))
+ ):
+ event_bus.notify(MapTraceEvent(start=int(tf), total=int(tt), data=tr))
+ return HandlingResult.success()
+ return HandlingResult.analyse()
diff --git a/deebot_client/messages/xml/pos.py b/deebot_client/messages/xml/pos.py
new file mode 100644
index 000000000..703495337
--- /dev/null
+++ b/deebot_client/messages/xml/pos.py
@@ -0,0 +1,34 @@
+"""Pos messages."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from deebot_client.events import Position, PositionsEvent
+from deebot_client.message import HandlingResult
+from deebot_client.messages.xml.common import XmlMessage
+from deebot_client.rs.map import PositionType
+
+if TYPE_CHECKING:
+ from xml.etree.ElementTree import Element
+
+ from deebot_client.event_bus import EventBus
+
+
+class Pos(XmlMessage):
+ """Pos message."""
+
+ NAME = "Pos"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ if p := xml.attrib.get("p"):
+ p_x, p_y = p.split(",", 2)
+ p_a = xml.attrib.get("a", 0)
+ position = Position(
+ type=PositionType.DEEBOT, x=int(p_x), y=int(p_y), a=int(p_a)
+ )
+ event_bus.notify(PositionsEvent(positions=[position]))
+ return HandlingResult.success()
+
+ return HandlingResult.analyse()
diff --git a/deebot_client/messages/xml/sleep.py b/deebot_client/messages/xml/sleep.py
new file mode 100644
index 000000000..d31ab0b28
--- /dev/null
+++ b/deebot_client/messages/xml/sleep.py
@@ -0,0 +1,31 @@
+"""Sleep messages."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from deebot_client.message import HandlingResult
+from deebot_client.messages.xml.common import XmlMessage
+
+if TYPE_CHECKING:
+ from xml.etree.ElementTree import Element
+
+ from deebot_client.event_bus import EventBus
+
+
+class SleepStatus(XmlMessage):
+ """SleepStatus message."""
+
+ NAME = "SleepStatus"
+
+ @classmethod
+ def _handle_xml(cls, _event_bus: EventBus, _xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ b""
+
+ We currently ignore this message
+
+ :return: A message response
+ """
+ return HandlingResult.success()
diff --git a/deebot_client/messages/xml/water_info.py b/deebot_client/messages/xml/water_info.py
new file mode 100644
index 000000000..701c18bdf
--- /dev/null
+++ b/deebot_client/messages/xml/water_info.py
@@ -0,0 +1,32 @@
+"""Water messages."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from deebot_client.events import WaterInfoEvent
+from deebot_client.message import HandlingResult
+from deebot_client.messages.xml.common import XmlMessage
+
+if TYPE_CHECKING:
+ from xml.etree.ElementTree import Element
+
+ from deebot_client.event_bus import EventBus
+
+
+class WaterBoxInfo(XmlMessage):
+ """WaterBoxInfo message."""
+
+ NAME = "WaterBoxInfo"
+
+ @classmethod
+ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
+ """Handle xml message and notify the correct event subscribers.
+
+ :return: A message response
+ """
+ if (on := xml.attrib.get("on")) is None:
+ return HandlingResult.analyse()
+
+ event_bus.notify(WaterInfoEvent(mop_attached=on != "0"))
+ return HandlingResult.success()
diff --git a/tests/commands/xml/test_charge_state.py b/tests/commands/xml/test_charge_state.py
index 926ac3ae5..2a3c6e9e9 100644
--- a/tests/commands/xml/test_charge_state.py
+++ b/tests/commands/xml/test_charge_state.py
@@ -21,11 +21,10 @@
("state", "expected_event"),
[
("SlotCharging", StateEvent(State.DOCKED)),
- ("Idle", StateEvent(State.IDLE)),
("Going", StateEvent(State.RETURNING)),
("unknown state returned", StateEvent(State.ERROR)),
],
- ids=["slot_charging", "idle", "going", "unknown"],
+ ids=["slot_charging", "going", "unknown"],
)
async def test_get_charge_state(state: str, expected_event: Event) -> None:
json = get_request_xml(f"")
@@ -34,8 +33,12 @@ async def test_get_charge_state(state: str, expected_event: Event) -> None:
@pytest.mark.parametrize(
"xml",
- ["", ""],
- ids=["error", "no_state"],
+ [
+ "",
+ "",
+ "",
+ ],
+ ids=["error", "no_state", "idle"],
)
async def test_get_charge_state_error(xml: str) -> None:
json = get_request_xml(xml)
diff --git a/tests/commands/xml/test_clean.py b/tests/commands/xml/test_clean.py
new file mode 100644
index 000000000..14daae18c
--- /dev/null
+++ b/tests/commands/xml/test_clean.py
@@ -0,0 +1,81 @@
+from __future__ import annotations
+
+import pytest
+
+from deebot_client.command import CommandResult
+from deebot_client.commands.xml import GetCleanState
+from deebot_client.commands.xml.clean import CleanArea
+from deebot_client.events import FanSpeedEvent, FanSpeedLevel, StateEvent
+from deebot_client.message import HandlingState
+from deebot_client.models import CleanMode, State
+from tests.commands import assert_command
+
+from . import get_request_xml
+
+
+@pytest.mark.parametrize(
+ ("command", "command_result"),
+ [
+ (CleanArea(CleanMode.SPOT_AREA, "4", 1), HandlingState.SUCCESS),
+ ],
+)
+async def test_CleanArea(command: CleanArea, command_result: HandlingState) -> None:
+ json = get_request_xml("")
+ await assert_command(
+ command, json, None, command_result=CommandResult(command_result)
+ )
+
+
+@pytest.mark.parametrize(
+ ("speed", "state", "expected_fan_speed_event", "expected_state_event"),
+ [
+ (
+ "standard",
+ "s",
+ FanSpeedEvent(FanSpeedLevel.NORMAL),
+ StateEvent(State.CLEANING),
+ ),
+ (
+ "strong",
+ "s",
+ FanSpeedEvent(FanSpeedLevel.MAX),
+ StateEvent(State.CLEANING),
+ ),
+ (
+ "standard",
+ "p",
+ FanSpeedEvent(FanSpeedLevel.NORMAL),
+ StateEvent(State.PAUSED),
+ ),
+ ],
+ ids=["standard_cleaning", "strong_cleaning", "paused"],
+)
+async def test_get_clean_state(
+ speed: str,
+ state: str,
+ expected_fan_speed_event: FanSpeedEvent,
+ expected_state_event: StateEvent,
+) -> None:
+ json = get_request_xml(
+ f""
+ )
+ await assert_command(
+ GetCleanState(),
+ json,
+ [x for x in [expected_fan_speed_event, expected_state_event] if x is not None],
+ )
+
+
+@pytest.mark.parametrize(
+ "xml",
+ ["", ""],
+ ids=["error", "no_state"],
+)
+async def test_get_clean_state_error(xml: str) -> None:
+ json = get_request_xml(xml)
+ await assert_command(
+ GetCleanState(),
+ json,
+ None,
+ command_result=CommandResult(HandlingState.ANALYSE_LOGGED),
+ )
diff --git a/tests/commands/xml/test_fan_speed.py b/tests/commands/xml/test_fan_speed.py
index bf728ecfc..288b3c1e2 100644
--- a/tests/commands/xml/test_fan_speed.py
+++ b/tests/commands/xml/test_fan_speed.py
@@ -4,8 +4,8 @@
import pytest
-from deebot_client.command import CommandResult
-from deebot_client.commands.xml import GetFanSpeed
+from deebot_client.command import CommandResult, CommandWithMessageHandling
+from deebot_client.commands.xml import GetCleanSpeed, SetCleanSpeed
from deebot_client.events import FanSpeedEvent, FanSpeedLevel
from deebot_client.message import HandlingState
from tests.commands import assert_command
@@ -26,19 +26,36 @@
)
async def test_get_fan_speed(speed: str, expected_event: Event) -> None:
json = get_request_xml(f"")
- await assert_command(GetFanSpeed(), json, expected_event)
+ await assert_command(GetCleanSpeed(), json, expected_event)
@pytest.mark.parametrize(
"xml",
- ["", ""],
- ids=["error", "no_state"],
+ [""],
+ ids=["error"],
)
async def test_get_fan_speed_error(xml: str) -> None:
json = get_request_xml(xml)
await assert_command(
- GetFanSpeed(),
+ GetCleanSpeed(),
json,
None,
command_result=CommandResult(HandlingState.ANALYSE_LOGGED),
)
+
+
+@pytest.mark.parametrize(
+ ("command", "xml", "result"),
+ [
+ (
+ SetCleanSpeed(FanSpeedLevel.MAX),
+ "",
+ HandlingState.SUCCESS,
+ ),
+ ],
+)
+async def test_set_fan_speed(
+ command: CommandWithMessageHandling, xml: str, result: HandlingState
+) -> None:
+ json = get_request_xml(xml)
+ await assert_command(command, json, None, command_result=CommandResult(result))