Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion deebot_client/commands/xml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .fan_speed import GetFanSpeed
from .life_span import GetLifeSpan
from .play_sound import PlaySound
from .pos import GetPos
from .pos import GetChargerPos, GetPos
from .stats import GetCleanSum

if TYPE_CHECKING:
Expand All @@ -24,6 +24,7 @@
"Charge",
"GetBatteryInfo",
"GetChargeState",
"GetChargerPos",
"GetCleanLogs",
"GetCleanSum",
"GetError",
Expand All @@ -37,9 +38,11 @@
# ordered by file asc
_COMMANDS: list[type[XmlCommand]] = [
GetBatteryInfo,
GetChargerPos,
GetCleanLogs,
GetError,
GetLifeSpan,
GetPos,
PlaySound,
]
# fmt: on
Expand Down
30 changes: 19 additions & 11 deletions deebot_client/commands/xml/pos.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

from typing import TYPE_CHECKING

from deebot_client.events import Position, PositionsEvent
from deebot_client.message import HandlingResult
from deebot_client.messages.xml import Pos
from deebot_client.rs.map import PositionType

from .common import XmlCommandWithMessageHandling
Expand All @@ -16,7 +16,7 @@
from deebot_client.event_bus import EventBus


class GetPos(XmlCommandWithMessageHandling):
class GetPos(XmlCommandWithMessageHandling, Pos):
"""GetPos command."""

NAME = "GetPos"
Expand All @@ -30,13 +30,21 @@ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
if xml.attrib.get("ret") != "ok" or xml.attrib.get("t") != "p":
return HandlingResult.analyse()

if p := xml.attrib.get("p"):
p_x, p_y = p.split(",", 2)
p_a = xml.attrib.get("a", 0)
position = Position(
type=PositionType.DEEBOT, x=int(p_x), y=int(p_y), a=int(p_a)
)
event_bus.notify(PositionsEvent(positions=[position]))
return HandlingResult.success()
return cls._parse_xml(PositionType.DEEBOT, event_bus, xml)

return HandlingResult.analyse()

class GetChargerPos(XmlCommandWithMessageHandling, Pos):
"""GetChargerPos command."""

NAME = "GetChargerPos"

@classmethod
def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
"""Handle xml message and notify the correct event subscribers.

:return: A message response
"""
if xml.attrib.get("ret") != "ok":
return HandlingResult.analyse()

return cls._parse_xml(PositionType.CHARGER, event_bus, xml)
6 changes: 4 additions & 2 deletions deebot_client/messages/xml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
from typing import TYPE_CHECKING

from deebot_client.messages.xml.battery import BatteryInfo
from deebot_client.messages.xml.pos import Pos

if TYPE_CHECKING:
from collections.abc import Sequence

from deebot_client.message import Message

__all__: Sequence[str] = ["BatteryInfo"]
__all__: Sequence[str] = ["BatteryInfo", "Pos"]
# fmt: off
# ordered by file asc
_MESSAGES: list[type[Message]] = [
BatteryInfo
BatteryInfo,
Pos
]
# fmt: on

Expand Down
42 changes: 42 additions & 0 deletions deebot_client/messages/xml/pos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Pos messages."""

from __future__ import annotations

from typing import TYPE_CHECKING

from deebot_client.events import Position, PositionsEvent
from deebot_client.message import HandlingResult
from deebot_client.messages.xml.common import XmlMessage
from deebot_client.rs.map import PositionType

if TYPE_CHECKING:
from xml.etree.ElementTree import Element

from deebot_client.event_bus import EventBus


class Pos(XmlMessage):
"""Pos message."""

NAME = "Pos"

@classmethod
def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult:
if xml.attrib.get("t") != "p":
return HandlingResult.analyse()

return cls._parse_xml(PositionType.DEEBOT, event_bus, xml)

@classmethod
def _parse_xml(
cls, position_type: PositionType, event_bus: EventBus, xml: Element
) -> HandlingResult:
"""Handle xml message and notify the correct event subscribers."""
if (p := xml.attrib.get("p")) and (xml.attrib.get("valid", "1")) == "1":
p_x, p_y = p.split(",", 2)
p_a = xml.attrib.get("a", 0)
position = Position(type=position_type, x=int(p_x), y=int(p_y), a=int(p_a))
event_bus.notify(PositionsEvent(positions=[position]))
return HandlingResult.success()

return HandlingResult.analyse()
33 changes: 31 additions & 2 deletions tests/commands/xml/test_pos.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from deebot_client.command import CommandResult
from deebot_client.commands.xml import GetPos
from deebot_client.commands.xml.pos import GetChargerPos
from deebot_client.events import Position, PositionsEvent
from deebot_client.message import HandlingState
from deebot_client.rs.map import PositionType
Expand All @@ -22,8 +23,13 @@ async def test_get_pos() -> None:

@pytest.mark.parametrize(
"xml",
["<ctl ret='error'/>", "<ctl ret='ok' t='p'></ctl>"],
ids=["error", "no_state"],
[
"<ctl ret='error'/>",
"<ctl ret='ok' t='p'></ctl>",
"<ctl ret='ok' t='??' p='77,-5' a='-3' valid='1'/>",
"<ctl ret='ok' t='p' p='77,-5' a='-3' valid='0'/>",
],
ids=["error", "no_state", "wrong_type", "not_valid"],
)
async def test_get_pos_error(xml: str) -> None:
json = get_request_xml(xml)
Expand All @@ -33,3 +39,26 @@ async def test_get_pos_error(xml: str) -> None:
None,
command_result=CommandResult(HandlingState.ANALYSE_LOGGED),
)


async def test_get_charger_pos() -> None:
json = get_request_xml("<ctl ret='ok' p='77,-5' a='-3'/>")
expected_event = PositionsEvent(
positions=[Position(type=PositionType.CHARGER, x=77, y=-5, a=-3)]
)
await assert_command(GetChargerPos(), json, expected_event)


@pytest.mark.parametrize(
"xml",
["<ctl ret='error'/>", "<ctl ret='ok'></ctl>"],
ids=["error", "no_state"],
)
async def test_get_charger_pos_error(xml: str) -> None:
json = get_request_xml(xml)
await assert_command(
GetChargerPos(),
json,
None,
command_result=CommandResult(HandlingState.ANALYSE_LOGGED),
)
32 changes: 32 additions & 0 deletions tests/messages/xml/test_pos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from __future__ import annotations

import pytest

from deebot_client.events import Position, PositionsEvent
from deebot_client.message import HandlingState
from deebot_client.messages.xml import Pos
from deebot_client.rs.map import PositionType
from tests.messages import assert_message, assert_message_failure


@pytest.mark.parametrize("position", [(-9, 15, 89)])
def test_Pos(position: tuple[int, int, int]) -> None:
x, y, a = position
xml_message = f'<ctl td="Pos" t="p" p="{x},{y}" a="{a}" valid="1" />'

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some questions about the message:

  • is t the type and will it be different for the charger position?
  • There is a valid attribute... I would assume that the position is only valid if it is equal to 1. Can you please verify it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point

  • I have no way of checking if valid = 0 ever happens, but I guess one more check doesn't hurt.
  • The app does show the charger position, I just didn't figure out what param it uses. I'll check that tomorrow

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a GetChargerPos command, but I didn't find any "Pos" events for it.
Suggestion:

  • Imlpement GetChargerPos
  • Only handle valid="1" t="p" messages as bot position, we'll leave everything else as HandlingResult.analyse() so that they can be eventually debugged further

assert_message(
Pos,
xml_message,
PositionsEvent([Position(type=PositionType.DEEBOT, x=x, y=y, a=a)]),
)


@pytest.mark.parametrize(
"xml_message",
{
'<ctl td="Pos" t="p" a="89" valid="1" />',
'<ctl td="Pos" t="??" p="0,0" a="89" valid="1" />',
'<ctl td="Pos" t="p" p="0,0" a="89" valid="0" />',
},
)
def test_Pos_error(xml_message: str) -> None:
assert_message_failure(Pos, xml_message, HandlingState.ANALYSE_LOGGED)
Loading