Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions sniffer-socat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import fileinput
import pykmp

tx_buf = b''
rx_buf = b''

class Mode:
NONE = 0
READ = 1
WRITE = 2
mode = Mode.NONE

client_codec = pykmp.client.ClientCodec()

def find_packet(buf, start_mark):
start = buf.find(bytes([start_mark]))
stop = buf.find(bytes([pykmp.constants.ByteCode.STOP.value]))
if start == -1:
return (None, buf)
if stop == -1:
return (None, buf)
if start > 0:
print(f'# !!! extra content before start-of-packet: {buf[0:start].hex()}')
buf = buf[start:]
stop = stop - start
start = 0
return (buf[start:stop + 1], buf[stop + 1:])

def handle_tx(buf):
while True:
packet, buf = find_packet(buf, pykmp.constants.ByteCode.START_TO_METER.value)
if packet is None:
break
try:
print(f'##> {packet.hex("-")} ({len(packet)} bytes)')
parsed = client_codec.decode_command(packet)
print(f'>>> {parsed}')
except pykmp.client.UnknownCidError as e:
print(f'>>> Unknown {e.cid:#02x}: {e.raw_data.hex(" ")}')
except Exception as e:
print(f'>>> {type(e)} {e}')
return buf

def handle_rx(buf):
while True:
packet, buf = find_packet(buf, pykmp.constants.ByteCode.START_FROM_METER.value)
if packet is None:
break
try:
parsed = client_codec.decode_response(packet)
print(f'##< {packet.hex("-")} ({len(packet)} bytes)')
print(f'<<< {parsed}')
if regs := getattr(parsed, 'registers', None):
for (num, reg) in regs.items():
name = pykmp.constants.REGISTERS.get(num, f"R-{num}")
output = pykmp.registers.RegisterOutput.from_register_data(reg)
print(f' | {output.to_pretty_line()}')
if register_ids := getattr(parsed, 'register_ids', None):
for rid in register_ids:
print(f' {rid:>4} | {pykmp.constants.REGISTERS.get(rid, "<unknown>")}')
if log := getattr(parsed, 'log', None):
for i, row in enumerate(log):
for reg in row:
output = pykmp.registers.RegisterOutput.from_register_data(reg)
print(f'{i:>2} | {output.to_pretty_line()}')
except pykmp.client.UnknownCidError as e:
print(f'<<< Unknown {e.cid:#02x}: {e.raw_data.hex(" ")}')
if e.cid == 0xb8:
print(f' | len: {len(e.raw_data)}')
except Exception as e:
print(f'<<< {type(e)} {e}')
return buf

for line in fileinput.input():
if len(line.strip()) == 0:
pass
elif line.startswith('> '):
mode = Mode.WRITE
elif line.startswith('< '):
mode = Mode.READ
elif line.startswith(' '):
match mode:
case Mode.READ:
rx_buf += bytes.fromhex(line)
rx_buf = handle_rx(rx_buf)
case Mode.WRITE:
tx_buf += bytes.fromhex(line)
tx_buf = handle_tx(tx_buf)
case _:
print (f'# !!! unknown mode when handling {line=}')
else:
print(f'# {line=}')

if count := len(tx_buf):
print(f'!!! unparsed TX data ({count} bytes)')
if count := len(rx_buf):
print(f'!!! unparsed RX data ({count} bytes)')
47 changes: 47 additions & 0 deletions src/pykmp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ class EncodedClientResponse(Generic[CCReq_t_co]):
DESTINATION_ADDRESS_DEFAULT = constants.DestinationAddress.HEAT_METER.value


@attrs.define(kw_only=True)
class UnknownCidError(codec.BaseCodecError):
"""Encountered a command with an unknown command ID"""

cid: int
raw_data: bytes

def __str__(self) -> str: # noqa: D105
pretty_data = f"with data {self.raw_data.hex(' ')}" if len(self.raw_data) else "(no data)"
return f"Unrecognized CID {self.cid:#02x} {pretty_data}"


@attrs.define(kw_only=True, auto_attribs=False)
class ClientCodec:
"""Wires up the codecs of all layers for communication *to the meter*."""
Expand Down Expand Up @@ -110,6 +122,41 @@ def decode(
)
return frame.request_cls.get_response_type().decode(application_data)

def decode_command(self, physical_bytes: codec.PhysicalBytes):
data_link_bytes = self.physical_codec_encode.decode(physical_bytes)
data_link_data = self.data_link_codec.decode(data_link_bytes)
application_data = self.application_codec.decode(
data_link_data.application_bytes
)
matching_commands = [c for c in messages.BaseRequest.__subclasses__()
if getattr(c, 'command_id', None) is not None
and c.command_id == application_data.command_id]
assert len(matching_commands) <= 1
if len(matching_commands):
return matching_commands[0].decode(application_data)
else:
raise UnknownCidError(cid=application_data.command_id, raw_data=application_data.data)

def decode_response(self, physical_bytes: codec.PhysicalBytes):
try:
data_link_bytes = self.physical_codec_decode.decode(physical_bytes)
except codec.AckReceivedException as exc:
raise NotImplementedError from exc

data_link_data = self.data_link_codec.decode(data_link_bytes)
application_data = self.application_codec.decode(
data_link_data.application_bytes
)

matching_responses = [r for r in messages.BaseResponse.__subclasses__()
if getattr(r, 'command_id', None) is not None
and r.command_id == application_data.command_id]
assert len(matching_responses) <= 1
if len(matching_responses):
return matching_responses[0].decode(application_data)
else:
raise UnknownCidError(cid=application_data.command_id, raw_data=application_data.data)


class ClientCommunicator(Protocol):
"""Wrap the codecs and communication communication with the meter."""
Expand Down
90 changes: 90 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# SPDX-FileCopyrightText: 2023 Gert van Dijk <github@gertvandijk.nl>
# SPDX-FileCopyrightText: 2026 Jan Kundrát <jkt@jankundrat.com>
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import pytest

from pykmp import messages
from pykmp.client import (
ClientCodec,
UnknownCidError,
)

SOME_DESTINATION_ADDRESS = 0x3A
ANOTHER_DESTINATION_ADDRESS = 0x3F


@pytest.mark.parametrize(
("payload", "parsed"),
[
pytest.param(
'80 3f 01 05 8a 0d',
messages.GetTypeRequest(),
),
pytest.param(
'80 3f 02 35 e9 0d',
messages.GetSerialRequest(),
),
pytest.param(
'80 3f 10 02 01 5a 00 9a 1b bf 2b 0d',
messages.GetRegisterRequest(data_raw=bytes.fromhex('02 01 5A 00 9a'), registers=[346, 154]),
),
pytest.param(
'80 3f 10 01 03 e9 7c d4 0d',
messages.GetRegisterRequest(data_raw=bytes.fromhex('01 03 e9'), registers=[1001]),
),
pytest.param(
'80 ff ff 1d 0f 0d',
UnknownCidError(cid=0xff, raw_data=b''),
),
]
)
def test_blind_command_decoding(payload, parsed) -> None:
communicator = ClientCodec(
destination_address=ANOTHER_DESTINATION_ADDRESS,
)
raw_bytes = bytes.fromhex(payload)
if isinstance(parsed, Exception):
with pytest.raises(type(parsed)) as excinfo:
decoded = communicator.decode_command(raw_bytes)
assert str(excinfo.value) == str(parsed)
else:
decoded = communicator.decode_command(raw_bytes)
assert decoded == parsed
encoded = communicator.encode(parsed).physical_bytes
assert encoded.hex(' ') == payload

@pytest.mark.parametrize(
("payload", "parsed"),
[
pytest.param(
'40 3f 10 03 e9 33 04 00 00 00 00 00 63 38 0d',
messages.GetRegisterResponse(data_raw=bytes.fromhex('03 e9 33 04 00 00 00 00 00'),
registers={1001:
messages.RegisterData(id_=1001, unit=51,
value=bytes.fromhex('04 00 00 00 00 00'))}),
),
pytest.param(
'06',
NotImplementedError(),
),
pytest.param(
'40 ff ff 1d 0f 0d',
UnknownCidError(cid=0xff, raw_data=b''),
),
]
)
def test_blind_response_decoding(payload, parsed) -> None:
communicator = ClientCodec(
destination_address=SOME_DESTINATION_ADDRESS,
)
if isinstance(parsed, Exception):
with pytest.raises(type(parsed)) as excinfo:
decoded = communicator.decode_response(bytes.fromhex(payload))
assert str(excinfo.value) == str(parsed)
else:
decoded = communicator.decode_response(bytes.fromhex(payload))
assert decoded == parsed