22
33from __future__ import annotations
44
5- import base64
6- import json as _json
7- import lzma
8-
95import pytest
106
117from deebot_client .events import FirmwareEvent
1410from deebot_client .messages .json .map import OnMapTrace
1511from tests .messages .json import assert_message
1612
13+ # Static pre-encoded payloads (LZMA1 with trimmed header, base64).
14+ # Generated from real GOAT A1600 RTK firmware 1.15.x encoding format.
1715
18- # ---------------------------------------------------------------------------
19- # Helper: encode bytes the same way mower firmware does
20- # ---------------------------------------------------------------------------
21- #
22- # Real GOAT A1600 RTK firmware 1.15.x emits an LZMA1 stream with a
23- # *trimmed* 9-byte header (props + dict_size_le + size_le_4_bytes) instead
24- # of the standard LZMA1 ALONE 13-byte header. The Rust helper
25- # ``decompress_base64_data`` accommodates this by injecting four zero
26- # bytes after position 7 to rebuild a valid 13-byte header. Reproduce
27- # that on the encoding side here so the round-trip works.
16+ # '[["7","0;100,200;150,250;"]]' (28 bytes uncompressed)
17+ _SINGLE_GROUP = "XQAABAAcAAAAAC3ghG4jMKGNRtkww/d7MLX33z8usOwaHU2B7///wFIAAA=="
2818
29- _DICT_SIZE = 1 << 18 # 256 KB — matches what real firmware advertises
19+ # '[["5","0;-11850,-28849;-11800,-28899;","0;-12850,-23699;-12800,-23750;"],["6","0;-7899,-39700;-7950,-39649;"]]' (110 bytes)
20+ _MULTI_GROUP = "XQAABABuAAAAAC3ghGojMKGNRtiHVVsAoT/QJyYK0+w8iNNfkK1fJciMh2LrturUwS3TNs8H+7FN7dV1zViWeRKpxrkDUNNQG/4bayp4L33u+jJYgA=="
3021
22+ # '[]' (2 bytes)
23+ _EMPTY_GROUPS = "XQAABAACAAAAAC2XXP/////wAAAA"
3124
32- def _ecovacs_encode (payload : bytes ) -> str :
33- raw = lzma .compress (
34- payload ,
35- format = lzma .FORMAT_RAW ,
36- filters = [{"id" : lzma .FILTER_LZMA1 ,
37- "preset" : lzma .PRESET_DEFAULT ,
38- "dict_size" : _DICT_SIZE }],
39- )
40- header = bytes ([0x5D ]) + _DICT_SIZE .to_bytes (4 , "little" ) + len (payload ).to_bytes (4 , "little" )
41- return base64 .b64encode (header + raw ).decode ("ascii" )
25+ # '[["1","0;1,2;3,4;"]]' (20 bytes)
26+ _SERIAL_TEST = "XQAABAAUAAAAAC3ghGIjMKGNR5N7rKaBA7QSJbYEb82/P//2SwAA"
27+
28+ # b'not json content' compressed — decompresses to non-JSON
29+ _NON_JSON = "XQAABAAQAAAAADcbyuolm7SQrGkrEp4K2Iwi8deA//3qYAA="
4230
4331
4432def _envelope (info_b64 : str , info_size : int , fw : str = "1.15.13" ) -> dict :
@@ -64,50 +52,28 @@ def _envelope(info_b64: str, info_size: int, fw: str = "1.15.13") -> dict:
6452 }
6553
6654
67- # ---------------------------------------------------------------------------
68- # Happy path
69- # ---------------------------------------------------------------------------
70-
71-
7255def test_OnMapTrace_decompresses_and_flattens_groups () -> None :
73- inner = '[["7","0;100,200;150,250;"]]'
74- data = inner .encode ("utf-8" )
75- info = _ecovacs_encode (data )
76-
77- expected_trace = "100,200;150,250"
7856 assert_message (
7957 OnMapTrace ,
80- _envelope (info , len ( data ) ),
81- (FirmwareEvent ("1.15.13" ), MapTraceEvent (start = 1 , total = 1 , data = expected_trace )),
58+ _envelope (_SINGLE_GROUP , 28 ),
59+ (FirmwareEvent ("1.15.13" ), MapTraceEvent (start = 1 , total = 1 , data = "100,200;150,250" )),
8260 device_class = "xmp9ds" ,
8361 )
8462
8563
8664def test_OnMapTrace_concatenates_multiple_groups_and_segments () -> None :
87- inner = (
88- '[["5","0;-11850,-28849;-11800,-28899;","0;-12850,-23699;-12800,-23750;"],'
89- '["6","0;-7899,-39700;-7950,-39649;"]]'
90- )
91- data = inner .encode ("utf-8" )
92- info = _ecovacs_encode (data )
9365 expected_trace = (
9466 "-11850,-28849;-11800,-28899;-12850,-23699;-12800,-23750;-7899,-39700;-7950,-39649"
9567 )
9668 assert_message (
9769 OnMapTrace ,
98- _envelope (info , len ( data ) ),
70+ _envelope (_MULTI_GROUP , 110 ),
9971 (FirmwareEvent ("1.15.13" ), MapTraceEvent (start = 1 , total = 1 , data = expected_trace )),
10072 device_class = "xmp9ds" ,
10173 )
10274
10375
104- # ---------------------------------------------------------------------------
105- # Edge cases
106- # ---------------------------------------------------------------------------
107-
108-
10976def test_OnMapTrace_no_info_field_returns_analyse () -> None :
110- """Without ``info``, defer to the legacy GetMapTrace handler — analyse, no event."""
11177 envelope = _envelope ("" , 0 )
11278 envelope ["body" ]["data" ].pop ("info" )
11379 envelope ["body" ]["data" ].pop ("infoSize" )
@@ -121,13 +87,9 @@ def test_OnMapTrace_no_info_field_returns_analyse() -> None:
12187
12288
12389def test_OnMapTrace_empty_groups_returns_analyse () -> None :
124- """A payload with no actual coordinates is unusable — analyse rather than emit empty event."""
125- inner = "[]"
126- data = inner .encode ("utf-8" )
127- info = _ecovacs_encode (data )
12890 assert_message (
12991 OnMapTrace ,
130- _envelope (info , len ( data ) ),
92+ _envelope (_EMPTY_GROUPS , 2 ),
13193 FirmwareEvent ("1.15.13" ),
13294 device_class = "xmp9ds" ,
13395 expected_state = HandlingState .ANALYSE_LOGGED ,
@@ -137,15 +99,12 @@ def test_OnMapTrace_empty_groups_returns_analyse() -> None:
13799@pytest .mark .parametrize (
138100 "broken_info" ,
139101 [
140- "@@@not-base64@@@" , # invalid base64
141- base64 . b64encode ( b"too short" ). decode ( "ascii" ) , # too short for LZMA header
142- _ecovacs_encode ( b"not json content" ), # decompresses to non-JSON
102+ "@@@not-base64@@@" ,
103+ "AQIDBA==" , # too short for LZMA header
104+ _NON_JSON ,
143105 ],
144106)
145107def test_OnMapTrace_corrupt_info_returns_analyse (broken_info : str ) -> None :
146- """Truncated/broken ``info`` (typical when upstream loggers truncate the payload)
147- should not raise — log at debug and return analyse so the message is dropped silently.
148- """
149108 assert_message (
150109 OnMapTrace ,
151110 _envelope (broken_info , 0 ),
@@ -156,12 +115,7 @@ def test_OnMapTrace_corrupt_info_returns_analyse(broken_info: str) -> None:
156115
157116
158117def test_OnMapTrace_uses_serial_as_event_start () -> None :
159- """``serial`` becomes the ``MapTraceEvent.start`` so the Map helper does not clear
160- on every push (it only clears when ``start == 0``)."""
161- inner = '[["1","0;1,2;3,4;"]]'
162- data = inner .encode ("utf-8" )
163- info = _ecovacs_encode (data )
164- envelope = _envelope (info , len (data ))
118+ envelope = _envelope (_SERIAL_TEST , 20 )
165119 envelope ["body" ]["data" ]["serial" ] = "42"
166120
167121 assert_message (
0 commit comments