From 77f2b75b0cbc1647dc8b609f514492643a680143 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Kundr=C3=A1t?=
Date: Fri, 30 Jan 2026 03:34:21 +0100
Subject: [PATCH 1/2] add context-free parsing of commands and responses

I'm writing a KMP protocol sniffer, and for that I need a function which
looks at the captured communication log, and returns parsed
commands/responses.
---
 src/pykmp/client.py    | 74 +++++++++++++++++++++++++++++++++++
 tests/test_commands.py | 93 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 167 insertions(+)
 create mode 100644 tests/test_commands.py

diff --git a/src/pykmp/client.py b/src/pykmp/client.py
index 6ca0b0d..9115b70 100644
--- a/src/pykmp/client.py
+++ b/src/pykmp/client.py
@@ -63,6 +63,19 @@ class EncodedClientResponse(Generic[CCReq_t_co]):
 DESTINATION_ADDRESS_DEFAULT = constants.DestinationAddress.HEAT_METER.value
 
 
+@attrs.define(kw_only=True)
+class UnknownCidError(codec.BaseCodecError):
+    """Encountered a command with an unknown command ID."""
+
+    cid: int
+    raw_data: bytes
+
+    def __str__(self) -> str:  # noqa: D105
+        pretty_data = f"with data {self.raw_data.hex(' ')}" if self.raw_data else "(no data)"
+        # The width counts the "0x" prefix: 4 yields two zero-padded hex digits.
+        return f"Unrecognized CID {self.cid:#04x} {pretty_data}"
+
+
 @attrs.define(kw_only=True, auto_attribs=False)
 class ClientCodec:
     """Wires up the codecs of all layers for communication *to the meter*."""
@@ -110,6 +123,67 @@ def decode(
         )
         return frame.request_cls.get_response_type().decode(application_data)
 
+    @staticmethod
+    def _find_message_class(base_cls, application_data):
+        """Return the direct subclass of *base_cls* for the decoded command ID.
+
+        Only classes listed by ``base_cls.__subclasses__()`` that define a
+        non-None ``command_id`` are considered.
+
+        Raises:
+            UnknownCidError: no such class matches
+                ``application_data.command_id``.
+        """
+        matching = [
+            cls
+            for cls in base_cls.__subclasses__()
+            if getattr(cls, "command_id", None) is not None
+            and cls.command_id == application_data.command_id
+        ]
+        # Internal invariant of the message classes, not input validation.
+        assert len(matching) <= 1, "command_id must be unique among subclasses"
+        if not matching:
+            raise UnknownCidError(
+                cid=application_data.command_id, raw_data=application_data.data
+            )
+        return matching[0]
+
+    def decode_command(self, physical_bytes: codec.PhysicalBytes):
+        """Decode a captured request frame without prior context.
+
+        The request class is picked by the command ID found in the frame.
+
+        Raises:
+            UnknownCidError: no request class declares that command ID.
+        """
+        data_link_bytes = self.physical_codec_encode.decode(physical_bytes)
+        data_link_data = self.data_link_codec.decode(data_link_bytes)
+        application_data = self.application_codec.decode(
+            data_link_data.application_bytes
+        )
+        request_cls = self._find_message_class(messages.BaseRequest, application_data)
+        return request_cls.decode(application_data)
+
+    def decode_response(self, physical_bytes: codec.PhysicalBytes):
+        """Decode a captured response frame without knowing its request.
+
+        Raises:
+            NotImplementedError: the physical codec raised
+                ``codec.AckReceivedException``.
+            UnknownCidError: no response class declares the command ID.
+        """
+        try:
+            data_link_bytes = self.physical_codec_decode.decode(physical_bytes)
+        except codec.AckReceivedException as exc:
+            raise NotImplementedError from exc
+
+        data_link_data = self.data_link_codec.decode(data_link_bytes)
+        application_data = self.application_codec.decode(
+            data_link_data.application_bytes
+        )
+        response_cls = self._find_message_class(messages.BaseResponse, application_data)
+        return response_cls.decode(application_data)
+
 
 class ClientCommunicator(Protocol):
     """Wrap the codecs and communication communication with the meter."""
diff --git a/tests/test_commands.py b/tests/test_commands.py
new file mode 100644
index 0000000..c065114
--- /dev/null
+++ b/tests/test_commands.py
@@ -0,0 +1,93 @@
+# SPDX-FileCopyrightText: 2023 Gert van Dijk
+# SPDX-FileCopyrightText: 2026 Jan Kundrát
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import pytest
+
+from pykmp import messages
+from pykmp.client import (
+    ClientCodec,
+    UnknownCidError,
+)
+
+SOME_DESTINATION_ADDRESS = 0x3A
+ANOTHER_DESTINATION_ADDRESS = 0x3F
+
+
+@pytest.mark.parametrize(
+    ("payload", "parsed"),
+    [
+        pytest.param(
+            '80 3f 01 05 8a 0d',
+            messages.GetTypeRequest(),
+        ),
+        pytest.param(
+            '80 3f 02 35 e9 0d',
+            messages.GetSerialRequest(),
+        ),
+        pytest.param(
+            '80 3f 10 02 01 5a 00 9a 1b bf 2b 0d',
+            messages.GetRegisterRequest(data_raw=bytes.fromhex('02 01 5A 00 9a'), registers=[346, 154]),
+        ),
+        pytest.param(
+            '80 3f 10 01 03 e9 7c d4 0d',
+            messages.GetRegisterRequest(data_raw=bytes.fromhex('01 03 e9'), registers=[1001]),
+        ),
+        pytest.param(
+            '80 ff ff 1d 0f 0d',
+            UnknownCidError(cid=0xff, raw_data=b''),
+        ),
+    ]
+)
+def test_blind_command_decoding(payload, parsed) -> None:
+    communicator = ClientCodec(
+        destination_address=ANOTHER_DESTINATION_ADDRESS,
+    )
+    raw_bytes = bytes.fromhex(payload)
+    if isinstance(parsed, Exception):
+        with pytest.raises(type(parsed)) as excinfo:
+            communicator.decode_command(raw_bytes)
+        # Checked after the ``with`` block; inside it this line would never run.
+        assert str(excinfo.value) == str(parsed)
+    else:
+        decoded = communicator.decode_command(raw_bytes)
+        assert decoded == parsed
+        encoded = communicator.encode(parsed).physical_bytes
+        assert encoded.hex(' ') == payload
+
+
+@pytest.mark.parametrize(
+    ("payload", "parsed"),
+    [
+        pytest.param(
+            '40 3f 10 03 e9 33 04 00 00 00 00 00 63 38 0d',
+            messages.GetRegisterResponse(data_raw=bytes.fromhex('03 e9 33 04 00 00 00 00 00'),
+                                         registers={1001:
+                                                    messages.RegisterData(id_=1001, unit=51,
+                                                                          value=bytes.fromhex('04 00 00 00 00 00'))}),
+        ),
+        pytest.param(
+            '06',
+            NotImplementedError(),
+        ),
+        pytest.param(
+            '40 ff ff 1d 0f 0d',
+            UnknownCidError(cid=0xff, raw_data=b''),
+        ),
+    ]
+)
+def test_blind_response_decoding(payload, parsed) -> None:
+    communicator = ClientCodec(
+        destination_address=SOME_DESTINATION_ADDRESS,
+    )
+    if isinstance(parsed, Exception):
+        with pytest.raises(type(parsed)) as excinfo:
+            communicator.decode_response(bytes.fromhex(payload))
+        # Checked after the ``with`` block; inside it this line would never run.
+        assert str(excinfo.value) == str(parsed)
+    else:
+        decoded = communicator.decode_response(bytes.fromhex(payload))
+        assert decoded == parsed

From 9d838532347f36f051a3ec627cc09ab1abad4c47 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20Kundr=C3=A1t?=
Date: Fri, 30 Jan 2026 03:34:21 +0100
Subject: [PATCH 2/2] add a KMP protocol sniffer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The input format is the same as what `socat` produces, e.g.:

> 2026/01/24 23:37:36.000562669 
length=9 from=29553 to=29561
 80 3f 10 01 03 e9 7c d4 0d
< 2026/01/24 23:37:36.000658765 length=1 from=127635 to=127635
 40
< 2026/01/24 23:37:36.000667759 length=1 from=127636 to=127636
 3f
< 2026/01/24 23:37:36.000676940 length=1 from=127637 to=127637
 10
< 2026/01/24 23:37:36.000686033 length=1 from=127638 to=127638
 03
< 2026/01/24 23:37:36.000695251 length=1 from=127639 to=127639
 e9
< 2026/01/24 23:37:36.000704439 length=1 from=127640 to=127640
 33
< 2026/01/24 23:37:36.000713505 length=1 from=127641 to=127641
 04
< 2026/01/24 23:37:36.000722680 length=1 from=127642 to=127642
 00
< 2026/01/24 23:37:36.000732096 length=1 from=127643 to=127643
 04
< 2026/01/24 23:37:36.000741146 length=1 from=127644 to=127644
 cf
< 2026/01/24 23:37:36.000750320 length=1 from=127645 to=127645
 95
< 2026/01/24 23:37:36.000759528 length=1 from=127646 to=127646
 76
< 2026/01/24 23:37:36.000768603 length=1 from=127647 to=127647
 5a
< 2026/01/24 23:37:36.000777851 length=1 from=127648 to=127648
 1b
< 2026/01/24 23:37:36.000787020 length=1 from=127649 to=127649
 bf
< 2026/01/24 23:37:36.000796300 length=1 from=127650 to=127650
 0d

The log can be produced by, e.g.:

socat -d -d -x UNIX-CONNECT:path/to/libvirt-serial-pty GOPEN:/dev/real-serial-tty,b1200,rawer,cs8,cstopb=2

The sniffer's output for that log looks like:

>>> GetRegisterRequest(data_raw=b'\x01\x03\xe9', registers=[1001])
<<< GetRegisterResponse(data_raw=b'\x03\xe93\x04\x00\x04\xcf\x95v', registers={1001: RegisterData(id_=1001, unit=51, value=b'\x04\x00\x04\xcf\x95v')})
 | 1001 → Fabrication No = 80713078 no unit (number)
---
 sniffer-socat.py | 106 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 106 insertions(+)
 create mode 100644 sniffer-socat.py

diff --git a/sniffer-socat.py b/sniffer-socat.py
new file mode 100644
index 0000000..c216ca4
--- /dev/null
+++ b/sniffer-socat.py
@@ -0,0 +1,106 @@
+"""Decode a socat hex dump of KMP traffic and print the parsed messages."""
+
+import fileinput
+import pykmp
+
+tx_buf = b''
+rx_buf = b''
+
+class Mode:
+    NONE = 0
+    READ = 1
+    WRITE = 2
+mode = Mode.NONE
+
+client_codec = pykmp.client.ClientCodec()
+
+def find_packet(buf, start_mark):
+    """Split the first complete frame off *buf*.
+
+    Returns ``(packet, rest)``. While no start byte followed by a stop byte
+    is buffered, *packet* is ``None`` and *rest* is *buf* unchanged. Bytes
+    ahead of the start byte are reported and dropped.
+    """
+    start = buf.find(bytes([start_mark]))
+    if start == -1:
+        return (None, buf)
+    # Search for the stop byte only after the start byte: a stop byte in the
+    # leading garbage used to produce a negative index and a bogus slice.
+    stop = buf.find(bytes([pykmp.constants.ByteCode.STOP.value]), start + 1)
+    if stop == -1:
+        return (None, buf)
+    if start > 0:
+        print(f'# !!! extra content before start-of-packet: {buf[0:start].hex()}')
+    return (buf[start:stop + 1], buf[stop + 1:])
+
+def handle_tx(buf):
+    """Print every complete command found in *buf*; return the unparsed rest."""
+    while True:
+        packet, buf = find_packet(buf, pykmp.constants.ByteCode.START_TO_METER.value)
+        if packet is None:
+            break
+        print(f'##> {packet.hex("-")} ({len(packet)} bytes)')
+        try:
+            parsed = client_codec.decode_command(packet)
+            print(f'>>> {parsed}')
+        except pykmp.client.UnknownCidError as e:
+            print(f'>>> Unknown {e.cid:#04x}: {e.raw_data.hex(" ")}')
+        except Exception as e:
+            print(f'>>> {type(e)} {e}')
+    return buf
+
+def handle_rx(buf):
+    """Print every complete response found in *buf*; return the unparsed rest."""
+    while True:
+        packet, buf = find_packet(buf, pykmp.constants.ByteCode.START_FROM_METER.value)
+        if packet is None:
+            break
+        # Dump the raw frame first so it is visible even when decoding fails.
+        print(f'##< {packet.hex("-")} ({len(packet)} bytes)')
+        try:
+            parsed = client_codec.decode_response(packet)
+            print(f'<<< {parsed}')
+            if regs := getattr(parsed, 'registers', None):
+                for reg in regs.values():
+                    output = pykmp.registers.RegisterOutput.from_register_data(reg)
+                    print(f' | {output.to_pretty_line()}')
+            if register_ids := getattr(parsed, 'register_ids', None):
+                for rid in register_ids:
+                    print(f' {rid:>4} | {pykmp.constants.REGISTERS.get(rid, "")}')
+            if log := getattr(parsed, 'log', None):
+                for i, row in enumerate(log):
+                    for reg in row:
+                        output = pykmp.registers.RegisterOutput.from_register_data(reg)
+                        print(f'{i:>2} | {output.to_pretty_line()}')
+        except pykmp.client.UnknownCidError as e:
+            print(f'<<< Unknown {e.cid:#04x}: {e.raw_data.hex(" ")}')
+            if e.cid == 0xb8:
+                print(f' | len: {len(e.raw_data)}')
+        except Exception as e:
+            print(f'<<< {type(e)} {e}')
+    return buf
+
+for line in fileinput.input():
+    if len(line.strip()) == 0:
+        pass
+    elif line.startswith('> '):
+        mode = Mode.WRITE
+    elif line.startswith('< '):
+        mode = Mode.READ
+    elif line.startswith(' '):
+        match mode:
+            case Mode.READ:
+                rx_buf += bytes.fromhex(line)
+                rx_buf = handle_rx(rx_buf)
+            case Mode.WRITE:
+                tx_buf += bytes.fromhex(line)
+                tx_buf = handle_tx(tx_buf)
+            case _:
+                print (f'# !!! unknown mode when handling {line=}')
+    else:
+        print(f'# {line=}')
+
+if count := len(tx_buf):
+    print(f'!!! unparsed TX data ({count} bytes)')
+if count := len(rx_buf):
+    print(f'!!! unparsed RX data ({count} bytes)')