Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion deebot_client/messages/json/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .auto_empty import OnAutoEmpty
from .battery import OnBattery
from .gps_position import OnGpsPos
from .map import OnCachedMapInfo, OnMajorMap, OnMapInfoV2, OnMapSetV2
from .map import OnCachedMapInfo, OnMajorMap, OnMapInfoV2, OnMapSetV2, OnMapTrace
from .station_state import OnStationState
from .stats import OnStats, ReportStats
from .work_state import OnWorkState
Expand All @@ -24,6 +24,7 @@
"OnMajorMap",
"OnMapInfoV2",
"OnMapSetV2",
"OnMapTrace",
"OnStats",
"OnWorkState",
"ReportStats",
Expand All @@ -42,6 +43,7 @@
OnMajorMap,
OnMapInfoV2,
OnMapSetV2,
OnMapTrace,

OnStationState,

Expand Down
88 changes: 87 additions & 1 deletion deebot_client/messages/json/map/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,31 @@

from typing import TYPE_CHECKING, Any

from deebot_client.events.map import MajorMapEvent, MapInfoEvent, MapSetType
import orjson

from deebot_client.events.map import (
MajorMapEvent,
MapInfoEvent,
MapSetType,
MapTraceEvent,
)
from deebot_client.logging_filter import get_logger
from deebot_client.message import HandlingResult, HandlingState, MessageBodyDataDict
from deebot_client.rs.util import decompress_base64_data

from .cached_map_info import OnCachedMapInfo

if TYPE_CHECKING:
from deebot_client.event_bus import EventBus

_LOGGER = get_logger(__name__)

__all__ = [
"OnCachedMapInfo",
"OnMajorMap",
"OnMapInfoV2",
"OnMapSetV2",
"OnMapTrace",
]


Expand Down Expand Up @@ -91,3 +103,77 @@ def _handle_body_data_dict(
event_bus.notify(MapInfoEvent(map_id=data["mid"], info=data["info"]))

return HandlingResult.success()


class OnMapTrace(MessageBodyDataDict):
"""On map trace message — variant pushed by mower firmwares (e.g. GOAT 1.15.x).

The vacuum-style ``getMapTrace`` response carried ``traceValue`` directly.
Mower firmwares instead push a compressed envelope:

.. code-block:: json

{
"mid": "...", "batid": "...", "serial": "1",
"index": "0", "type": "4",
"info": "<base64 of LZMA-compressed JSON>",
"infoSize": 3455
}

The decompressed payload is itself a small JSON document of the form
``[[group_id, "0;x1,y1;x2,y2;..."], ...]`` where each group is a
contiguous trajectory segment. We flatten the points across groups
into the ``"x,y;x,y;..."`` shape that downstream consumers (the
``Map`` Rust helper in particular) already accept.

Used in lieu of the legacy ``GetMapTrace`` fallback for these devices.
"""

NAME = "onMapTrace"

@classmethod
def _handle_body_data_dict(
cls, event_bus: EventBus, data: dict[str, Any]
) -> HandlingResult:
"""Handle message->body->data and notify the correct event subscribers."""
info = data.get("info")
if not info:
# Not the compressed-envelope variant — leave to legacy GetMapTrace
return HandlingResult.analyse()

try:
decompressed = decompress_base64_data(info)
groups = orjson.loads(decompressed)
except Exception:
_LOGGER.debug("Could not decompress/parse onMapTrace info field")
return HandlingResult.analyse()

flat_points: list[str] = []
for group in groups:
# Each group: [group_id_str, "0;x1,y1;x2,y2;...;", "0;x,y;..." ...]
if not isinstance(group, list):
continue
for segment in group[1:]:
if not isinstance(segment, str):
continue
# Drop the leading "0" anchor and any empty trailing segment
pts = [p for p in segment.split(";") if p and p != "0"]
flat_points.extend(pts)

if not flat_points:
return HandlingResult.analyse()

# Use serial as a stable monotonic ``start`` so ``Map`` does not clear
# the trace on every push — only the very first ever (serial == 0)
# would trigger the reset, which is the firmware's intent.
# fmt: off
try:
start = int(data.get("serial", 1))
except (TypeError, ValueError):
start = 1
# fmt: on

event_bus.notify(
MapTraceEvent(start=start, total=start, data=";".join(flat_points))
)
return HandlingResult.success()
98 changes: 98 additions & 0 deletions deebot_client/mower_trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Mower trajectory accumulator and SVG renderer.

Mowers (e.g. Ecovacs GOAT family) do not expose the regular ``map``

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the map capability. MapTraceEvent is also used by the vacuum bots. I also know that mower have a full map, so we should implement that one instead creating a new workaround just for the traces

capability used by vacuums, but their firmware pushes trajectory points
through :class:`~deebot_client.events.map.MapTraceEvent`. This module
keeps the parsing, accumulation and rendering of those points in one
place so consumers (e.g. the Home Assistant integration) only have to
forward the event payload and read back an SVG.
"""

from __future__ import annotations


class MowerMapTrace:
"""Accumulator and SVG renderer for mower trajectory traces."""

MAX_POINTS = 5000

_STROKE_COLOR = "#1976d2"

def __init__(self) -> None:
self._points: list[tuple[int, int]] = []

@property
def has_points(self) -> bool:
"""Return whether any trace points have been accumulated."""
return bool(self._points)

def clear(self) -> None:
"""Drop all accumulated trace points."""
self._points.clear()

def add_data(self, raw: str) -> int:
"""Parse a ``MapTraceEvent.data`` string and accumulate points.

Tokens are ``"x,y"`` separated by ``";"``. Malformed tokens are
skipped silently. The accumulator keeps at most :attr:`MAX_POINTS`
points (FIFO drop). Returns the number of points actually added.
"""
new_points: list[tuple[int, int]] = []
for raw_token in raw.split(";"):
token = raw_token.strip()
if not token:
continue
try:
x_str, y_str = token.split(",")
new_points.append((int(x_str), int(y_str)))
except ValueError:
continue

if not new_points:
return 0

self._points.extend(new_points)
if len(self._points) > self.MAX_POINTS:
self._points = self._points[-self.MAX_POINTS :]
return len(new_points)

def to_svg(self) -> str | None:
"""Render the accumulated trace as an SVG polyline.

Returns ``None`` when no points have been accumulated yet.
"""
if not self._points:
return None

xs = [p[0] for p in self._points]
ys = [p[1] for p in self._points]
min_x, max_x = min(xs), max(xs)
min_y, max_y = min(ys), max(ys)

# 5% padding on the larger of the two dimensions, with a 50-unit floor
# so a near-zero-area trace still has visible margin.
padding = max(50, max(max_x - min_x, max_y - min_y) // 20)
min_x -= padding
max_x += padding
min_y -= padding
max_y += padding

width = max_x - min_x
height = max_y - min_y

# Mower coordinates use bottom-up Y; flip for SVG top-down rendering.
flipped = " ".join(f"{x},{max_y + min_y - y}" for x, y in self._points)

# Stroke scales with width so the line stays visible on tiny lawns
# and isn't a hairline on huge ones.
stroke_width = max(20, width // 200)

return (
f'<svg xmlns="http://www.w3.org/2000/svg" '
f'viewBox="{min_x} {min_y} {width} {height}" '
f'preserveAspectRatio="xMidYMid meet">'
f'<polyline points="{flipped}" fill="none" '
f'stroke="{self._STROKE_COLOR}" stroke-width="{stroke_width}" '
f'stroke-linejoin="round" stroke-linecap="round"/>'
f"</svg>"
)
130 changes: 130 additions & 0 deletions tests/messages/json/map/test_on_map_trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Tests for OnMapTrace — mower variant with LZMA-compressed info field."""

from __future__ import annotations

import pytest

from deebot_client.events import FirmwareEvent
from deebot_client.events.map import MapTraceEvent
from deebot_client.message import HandlingState
from deebot_client.messages.json.map import OnMapTrace
from tests.messages.json import assert_message

# Static pre-encoded payloads (LZMA1 with trimmed header, base64).
# Generated from real GOAT A1600 RTK firmware 1.15.x encoding format.

# '[["7","0;100,200;150,250;"]]' (28 bytes uncompressed)
_SINGLE_GROUP = "XQAABAAcAAAAAC3ghG4jMKGNRtkww/d7MLX33z8usOwaHU2B7///wFIAAA=="

# '[["5","0;-11850,-28849;-11800,-28899;","0;-12850,-23699;-12800,-23750;"],["6","0;-7899,-39700;-7950,-39649;"]]' (110 bytes)
_MULTI_GROUP = "XQAABABuAAAAAC3ghGojMKGNRtiHVVsAoT/QJyYK0+w8iNNfkK1fJciMh2LrturUwS3TNs8H+7FN7dV1zViWeRKpxrkDUNNQG/4bayp4L33u+jJYgA=="

# '[]' (2 bytes)
_EMPTY_GROUPS = "XQAABAACAAAAAC2XXP/////wAAAA"

# '[["1","0;1,2;3,4;"]]' (20 bytes)
_SERIAL_TEST = "XQAABAAUAAAAAC3ghGIjMKGNR5N7rKaBA7QSJbYEb82/P//2SwAA"

# b'not json content' compressed — decompresses to non-JSON
_NON_JSON = "XQAABAAQAAAAADcbyuolm7SQrGkrEp4K2Iwi8deA//3qYAA="


def _envelope(info_b64: str, info_size: int, fw: str = "1.15.13") -> dict:
return {
"header": {
"pri": 1,
"tzm": 120,
"ts": "1777450352860644879",
"fwVer": fw,
"hwVer": "0.0.0",
},
"body": {
"data": {
"mid": "123456789",
"batid": "hmfald",
"serial": "1",
"index": "0",
"type": "4",
"info": info_b64,
"infoSize": info_size,
}
},
}


def test_OnMapTrace_decompresses_and_flattens_groups() -> None:
assert_message(
OnMapTrace,
_envelope(_SINGLE_GROUP, 28),
(
FirmwareEvent("1.15.13"),
MapTraceEvent(start=1, total=1, data="100,200;150,250"),
),
device_class="xmp9ds",
)


def test_OnMapTrace_concatenates_multiple_groups_and_segments() -> None:
expected_trace = "-11850,-28849;-11800,-28899;-12850,-23699;-12800,-23750;-7899,-39700;-7950,-39649"
assert_message(
OnMapTrace,
_envelope(_MULTI_GROUP, 110),
(
FirmwareEvent("1.15.13"),
MapTraceEvent(start=1, total=1, data=expected_trace),
),
device_class="xmp9ds",
)


def test_OnMapTrace_no_info_field_returns_analyse() -> None:
envelope = _envelope("", 0)
envelope["body"]["data"].pop("info")
envelope["body"]["data"].pop("infoSize")
assert_message(
OnMapTrace,
envelope,
FirmwareEvent("1.15.13"),
device_class="xmp9ds",
expected_state=HandlingState.ANALYSE_LOGGED,
)


def test_OnMapTrace_empty_groups_returns_analyse() -> None:
assert_message(
OnMapTrace,
_envelope(_EMPTY_GROUPS, 2),
FirmwareEvent("1.15.13"),
device_class="xmp9ds",
expected_state=HandlingState.ANALYSE_LOGGED,
)


@pytest.mark.parametrize(
"broken_info",
[
"@@@not-base64@@@",
"AQIDBA==", # too short for LZMA header
_NON_JSON,
],
)
def test_OnMapTrace_corrupt_info_returns_analyse(broken_info: str) -> None:
assert_message(
OnMapTrace,
_envelope(broken_info, 0),
FirmwareEvent("1.15.13"),
device_class="xmp9ds",
expected_state=HandlingState.ANALYSE_LOGGED,
)


def test_OnMapTrace_uses_serial_as_event_start() -> None:
envelope = _envelope(_SERIAL_TEST, 20)
envelope["body"]["data"]["serial"] = "42"

assert_message(
OnMapTrace,
envelope,
(FirmwareEvent("1.15.13"), MapTraceEvent(start=42, total=42, data="1,2;3,4")),
device_class="xmp9ds",
)
Loading
Loading