diff --git a/sniffer-socat.py b/sniffer-socat.py new file mode 100644 index 0000000..c216ca4 --- /dev/null +++ b/sniffer-socat.py @@ -0,0 +1,97 @@ +import fileinput +import pykmp + +tx_buf = b'' +rx_buf = b'' + +class Mode: + NONE = 0 + READ = 1 + WRITE = 2 +mode = Mode.NONE + +client_codec = pykmp.client.ClientCodec() + +def find_packet(buf, start_mark): + start = buf.find(bytes([start_mark])) + stop = buf.find(bytes([pykmp.constants.ByteCode.STOP.value])) + if start == -1: + return (None, buf) + if stop == -1: + return (None, buf) + if start > 0: + print(f'# !!! extra content before start-of-packet: {buf[0:start].hex()}') + buf = buf[start:] + stop = stop - start + start = 0 + return (buf[start:stop + 1], buf[stop + 1:]) + +def handle_tx(buf): + while True: + packet, buf = find_packet(buf, pykmp.constants.ByteCode.START_TO_METER.value) + if packet is None: + break + try: + print(f'##> {packet.hex("-")} ({len(packet)} bytes)') + parsed = client_codec.decode_command(packet) + print(f'>>> {parsed}') + except pykmp.client.UnknownCidError as e: + print(f'>>> Unknown {e.cid:#02x}: {e.raw_data.hex(" ")}') + except Exception as e: + print(f'>>> {type(e)} {e}') + return buf + +def handle_rx(buf): + while True: + packet, buf = find_packet(buf, pykmp.constants.ByteCode.START_FROM_METER.value) + if packet is None: + break + try: + parsed = client_codec.decode_response(packet) + print(f'##< {packet.hex("-")} ({len(packet)} bytes)') + print(f'<<< {parsed}') + if regs := getattr(parsed, 'registers', None): + for (num, reg) in regs.items(): + name = pykmp.constants.REGISTERS.get(num, f"R-{num}") + output = pykmp.registers.RegisterOutput.from_register_data(reg) + print(f' | {output.to_pretty_line()}') + if register_ids := getattr(parsed, 'register_ids', None): + for rid in register_ids: + print(f' {rid:>4} | {pykmp.constants.REGISTERS.get(rid, "")}') + if log := getattr(parsed, 'log', None): + for i, row in enumerate(log): + for reg in row: + output = pykmp.registers.RegisterOutput.from_register_data(reg) + print(f'{i:>2} | {output.to_pretty_line()}') + except pykmp.client.UnknownCidError as e: + print(f'<<< Unknown {e.cid:#02x}: {e.raw_data.hex(" ")}') + if e.cid == 0xb8: + print(f' | len: {len(e.raw_data)}') + except Exception as e: + print(f'<<< {type(e)} {e}') + return buf + +for line in fileinput.input(): + if len(line.strip()) == 0: + pass + elif line.startswith('> '): + mode = Mode.WRITE + elif line.startswith('< '): + mode = Mode.READ + elif line.startswith(' '): + match mode: + case Mode.READ: + rx_buf += bytes.fromhex(line) + rx_buf = handle_rx(rx_buf) + case Mode.WRITE: + tx_buf += bytes.fromhex(line) + tx_buf = handle_tx(tx_buf) + case _: + print (f'# !!! unknown mode when handling {line=}') + else: + print(f'# {line=}') + +if count := len(tx_buf): + print(f'!!! unparsed TX data ({count} bytes)') +if count := len(rx_buf): + print(f'!!! unparsed RX data ({count} bytes)') diff --git a/src/pykmp/client.py b/src/pykmp/client.py index 6ca0b0d..9115b70 100644 --- a/src/pykmp/client.py +++ b/src/pykmp/client.py @@ -63,6 +63,18 @@ class EncodedClientResponse(Generic[CCReq_t_co]): DESTINATION_ADDRESS_DEFAULT = constants.DestinationAddress.HEAT_METER.value +@attrs.define(kw_only=True) +class UnknownCidError(codec.BaseCodecError): + """Encountered a command with an unknown command ID""" + + cid: int + raw_data: bytes + + def __str__(self) -> str: # noqa: D105 + pretty_data = f"with data {self.raw_data.hex(' ')}" if len(self.raw_data) else "(no data)" + return f"Unrecognized CID {self.cid:#02x} {pretty_data}" + + @attrs.define(kw_only=True, auto_attribs=False) class ClientCodec: """Wires up the codecs of all layers for communication *to the meter*.""" @@ -110,6 +122,41 @@ def decode( ) return frame.request_cls.get_response_type().decode(application_data) + def decode_command(self, physical_bytes: codec.PhysicalBytes): + data_link_bytes = self.physical_codec_encode.decode(physical_bytes) + data_link_data = self.data_link_codec.decode(data_link_bytes) + application_data = self.application_codec.decode( + data_link_data.application_bytes + ) + matching_commands = [c for c in messages.BaseRequest.__subclasses__() + if getattr(c, 'command_id', None) is not None + and c.command_id == application_data.command_id] + assert len(matching_commands) <= 1 + if len(matching_commands): + return matching_commands[0].decode(application_data) + else: + raise UnknownCidError(cid=application_data.command_id, raw_data=application_data.data) + + def decode_response(self, physical_bytes: codec.PhysicalBytes): + try: + data_link_bytes = self.physical_codec_decode.decode(physical_bytes) + except codec.AckReceivedException as exc: + raise NotImplementedError from exc + + data_link_data = self.data_link_codec.decode(data_link_bytes) + application_data = self.application_codec.decode( + data_link_data.application_bytes + ) + + matching_responses = [r for r in messages.BaseResponse.__subclasses__() + if getattr(r, 'command_id', None) is not None + and r.command_id == application_data.command_id] + assert len(matching_responses) <= 1 + if len(matching_responses): + return matching_responses[0].decode(application_data) + else: + raise UnknownCidError(cid=application_data.command_id, raw_data=application_data.data) + class ClientCommunicator(Protocol): """Wrap the codecs and communication communication with the meter.""" diff --git a/tests/test_commands.py b/tests/test_commands.py new file mode 100644 index 0000000..c065114 --- /dev/null +++ b/tests/test_commands.py @@ -0,0 +1,90 @@ +# SPDX-FileCopyrightText: 2023 Gert van Dijk +# SPDX-FileCopyrightText: 2026 Jan Kundrát +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +from pykmp import messages +from pykmp.client import ( + ClientCodec, + UnknownCidError, +) + +SOME_DESTINATION_ADDRESS = 0x3A +ANOTHER_DESTINATION_ADDRESS = 0x3F + + +@pytest.mark.parametrize( + ("payload", "parsed"), + [ + pytest.param( + '80 3f 01 05 8a 0d', + messages.GetTypeRequest(), + ), + pytest.param( + '80 3f 02 35 e9 0d', + messages.GetSerialRequest(), + ), + pytest.param( + '80 3f 10 02 01 5a 00 9a 1b bf 2b 0d', + messages.GetRegisterRequest(data_raw=bytes.fromhex('02 01 5A 00 9a'), registers=[346, 154]), + ), + pytest.param( + '80 3f 10 01 03 e9 7c d4 0d', + messages.GetRegisterRequest(data_raw=bytes.fromhex('01 03 e9'), registers=[1001]), + ), + pytest.param( + '80 ff ff 1d 0f 0d', + UnknownCidError(cid=0xff, raw_data=b''), + ), + ] +) +def test_blind_command_decoding(payload, parsed) -> None: + communicator = ClientCodec( + destination_address=ANOTHER_DESTINATION_ADDRESS, + ) + raw_bytes = bytes.fromhex(payload) + if isinstance(parsed, Exception): + with pytest.raises(type(parsed)) as excinfo: + decoded = communicator.decode_command(raw_bytes) + assert str(excinfo.value) == str(parsed) + else: + decoded = communicator.decode_command(raw_bytes) + assert decoded == parsed + encoded = communicator.encode(parsed).physical_bytes + assert encoded.hex(' ') == payload + +@pytest.mark.parametrize( + ("payload", "parsed"), + [ + pytest.param( + '40 3f 10 03 e9 33 04 00 00 00 00 00 63 38 0d', + messages.GetRegisterResponse(data_raw=bytes.fromhex('03 e9 33 04 00 00 00 00 00'), + registers={1001: + messages.RegisterData(id_=1001, unit=51, + value=bytes.fromhex('04 00 00 00 00 00'))}), + ), + pytest.param( + '06', + NotImplementedError(), + ), + pytest.param( + '40 ff ff 1d 0f 0d', + UnknownCidError(cid=0xff, raw_data=b''), + ), + ] +) +def test_blind_response_decoding(payload, parsed) -> None: + communicator = ClientCodec( + destination_address=SOME_DESTINATION_ADDRESS, + ) + if isinstance(parsed, Exception): + with pytest.raises(type(parsed)) as excinfo: + decoded = communicator.decode_response(bytes.fromhex(payload)) + assert str(excinfo.value) == str(parsed) + else: + decoded = communicator.decode_response(bytes.fromhex(payload)) + assert decoded == parsed