Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion deebot_client/messages/xml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,25 @@

from deebot_client.messages.xml.battery import BatteryInfo
from deebot_client.messages.xml.charge_state import ChargeState
from deebot_client.messages.xml.map import MapP, Trace
from deebot_client.messages.xml.pos import Pos

if TYPE_CHECKING:
from collections.abc import Sequence

from deebot_client.message import Message

__all__: Sequence[str] = ["BatteryInfo", "ChargeState", "Pos"]
__all__: Sequence[str] = ["BatteryInfo", "ChargeState", "MapP", "Pos", "Trace"]
# fmt: off
# ordered by file asc
_MESSAGES: list[type[Message]] = [
BatteryInfo,

ChargeState,

MapP,
Trace,

Pos
]
# fmt: on
Expand Down
63 changes: 63 additions & 0 deletions deebot_client/messages/xml/map.py
Comment thread
edenhaus marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Map messages."""

from __future__ import annotations

from typing import TYPE_CHECKING

from deebot_client.events.map import MapTraceEvent, MinorMapEvent
from deebot_client.message import HandlingResult
from deebot_client.messages.xml.common import XmlMessage

if TYPE_CHECKING:
from xml.etree.ElementTree import Element

from deebot_client.event_bus import EventBus


class MapP(XmlMessage):
"""MapP message."""

NAME = "MapP"

@classmethod
def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
"""Handle xml message and notify the correct event subscribers.

Sample message:
b"<ctl td='MapP' i='1245233875' pid='27' p='XQAABAAQJwAAAABv/f//o7f/Rz5IFXI5YVG4kYRDU5g6Z4W8UflplyVyfWyHmYdt2YVgA/k3ENxVye1lEM...fqEp3pept9Re5qT0lZFDWpoFg4D51VXQopPSDLSo2ZpM/zQ4IAhvgWIKnp7zlwcd6Ekj7U2FnOTTAQeWq3DPT+MTrAVO2wL/6mmGODzk4hBtA/wjZzOujPgEA=='/>"
:return: A message response
"""
if (
(pid := xml.attrib.get("pid"))
and (piece := xml.attrib.get("p"))
and pid.isdecimal()
):
event_bus.notify(MinorMapEvent(index=int(pid), value=piece))
return HandlingResult.success()

return HandlingResult.analyse()


class Trace(XmlMessage):
"""Trace message."""

NAME = "trace"

@classmethod
def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
"""Handle xml message and notify the correct event subscribers.

Sample message:
<ctl td='trace' trid='631369' tf='16' tt='17' tr='XQAABAAKAAAAAG0/wEAAA2cAS5AAAA=='/>
:return: A message response
"""
if (
(tf := xml.attrib.get("tf"))
and tf.isdecimal()
and (tt := xml.attrib.get("tt"))
and tt.isdecimal()
and (tr := xml.attrib.get("tr"))
):
event_bus.notify(MapTraceEvent(start=int(tf), total=int(tt), data=tr))
return HandlingResult.success()
return HandlingResult.analyse()
56 changes: 56 additions & 0 deletions tests/messages/xml/test_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from __future__ import annotations

import pytest

from deebot_client.events import MapTraceEvent, MinorMapEvent
from deebot_client.message import HandlingState
from deebot_client.messages.xml import MapP, Trace
from tests.messages import assert_message, assert_message_failure


@pytest.mark.parametrize(("pid", "data"), [(42, "base64data")])
def test_MapP(pid: int, data: str) -> None:
xml_message = f"<ctl td='MapP' i='1245233875' pid='{pid}' p='{data}'/>"
assert_message(
MapP,
xml_message,
MinorMapEvent(index=pid, value=data),
)


@pytest.mark.parametrize(
"xml_message",
{
"<ctl td='MapP' i='1245233875' pid='XXX' p='base64data'/>",
"<ctl td='MapP' i='1245233875' p='base64data'/>",
"<ctl td='MapP' i='1245233875' pid='42' />",
"<ctl td='MapP' i='1245233875' />",
},
)
def test_MapP_error(xml_message: str) -> None:
assert_message_failure(MapP, xml_message, HandlingState.ANALYSE_LOGGED)


@pytest.mark.parametrize(("tf", "tt", "tr"), [(13, 42, "base64data")])
def test_Trace(tf: int, tt: int, tr: str) -> None:
xml_message = f"<ctl td='trace' trid='631369' tf='{tf}' tt='{tt}' tr='{tr}'/>"
assert_message(
Trace,
xml_message,
MapTraceEvent(start=tf, total=tt, data=tr),
)


@pytest.mark.parametrize(
"xml_message",
{
"<ctl td='trace' trid='631369' tt='17' tr='XQAABAAKAAAAAG0/wEAAA2cAS5AAAA=='/>",
"<ctl td='trace' trid='631369' tf='XXX' tt='17' tr='XQAABAAKAAAAAG0/wEAAA2cAS5AAAA=='/>",
"<ctl td='trace' trid='631369' tf='16' tr='XQAABAAKAAAAAG0/wEAAA2cAS5AAAA=='/>",
"<ctl td='trace' trid='631369' tf='16' tt='XXX' tr='XQAABAAKAAAAAG0/wEAAA2cAS5AAAA=='/>",
"<ctl td='trace' trid='631369' tf='16' tt='16' />",
"<ctl td='trace' trid='631369' />",
},
)
def test_Trace_error(xml_message: str) -> None:
assert_message_failure(Trace, xml_message, HandlingState.ANALYSE_LOGGED)