1010
1111"""
1212
13+ import logging
1314from enum import IntEnum
1415from struct import pack , unpack , unpack_from
15- from typing import Literal , Tuple
16+ from typing import Literal , Tuple , cast
1617
17- from .constants import PybricksBroadcast , PybricksBroadcastValue
18+ from .constants import PybricksBroadcast , PybricksBroadcastData , PybricksBroadcastValue
19+
20+ logger = logging .getLogger (__name__ )
1821
1922
2023def decode_message (
@@ -30,24 +33,32 @@ def decode_message(
3033 or a tuple.
3134 """
3235
36+ logger .debug (f"decoding[{ len (data )} ]: { data !r} " )
37+
3338 # idx 0 is the channel
3439 channel : int = unpack_from ("<B" , data )[0 ] # uint8
40+ logger .debug (f"channel: { channel } " )
3541 # idx 1 is the message start
3642 idx = 1
37- decoded_data = []
43+ decoded_data : list [ PybricksBroadcastValue ] = []
3844 single_object = False
3945
4046 while idx < len (data ):
4147 idx , val = _decode_next_value (idx , data )
4248 if val is None :
49+ logger .debug (f"data[{ len (decoded_data )} ]: SINGLE_OBJECT marker" )
4350 single_object = True
4451 else :
52+ logger .debug (f"data[{ len (decoded_data )} ] of { type (val )!s:<15} : { val !r} " )
4553 decoded_data .append (val )
4654
4755 if single_object :
48- return PybricksBroadcast (channel , decoded_data [0 ])
56+ decoded_value : PybricksBroadcastValue = decoded_data [0 ]
57+ return PybricksBroadcast (channel , decoded_value )
4958 else :
50- return PybricksBroadcast (channel , tuple (decoded_data )) # type: ignore # https://github.com/python/mypy/issues/7509
59+ return PybricksBroadcast (
60+ channel , cast (PybricksBroadcastData , tuple (decoded_data ))
61+ )
5162
5263
5364def encode_message (channel : int = 0 , * values : PybricksBroadcastValue ) -> bytes :
@@ -63,17 +74,22 @@ def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes:
6374
6475 # idx 0 is the channel
6576 encoded_channel = pack ("<B" , channel )
77+ logger .debug (f"channel: { channel } -> { encoded_channel !r} " )
6678
6779 # idx 1 is the message start
6880 encoded_data = bytearray (encoded_channel )
6981
7082 if len (values ) == 1 :
7183 # set SINGLE_OBJECT marker
7284 header = PybricksBleBroadcastDataType .SINGLE_OBJECT << 5
85+ logger .debug (f"data[{ len (encoded_data )} ]: SINGLE_OBJECT marker" )
7386 encoded_data .append (header )
7487
7588 for val in values :
7689 header , encoded_val = _encode_value (val )
90+ logger .debug (
91+ f"data[{ len (encoded_data )} ] of { type (val )!s} : { val !r} -> ({ header !r} , { encoded_val !r} )"
92+ )
7793 encoded_data .append (header )
7894
7995 if encoded_val is not None :
@@ -85,7 +101,9 @@ def encode_message(channel: int = 0, *values: PybricksBroadcastValue) -> bytes:
85101 f"Payload too large: { len (encoded_data )} bytes (maximum is { OBSERVED_DATA_MAX_SIZE } bytes)"
86102 )
87103
88- return bytes (encoded_data )
104+ message = bytes (encoded_data )
105+ logger .debug (f"encoded[{ len (message )} ]: { message !r} " )
106+ return message
89107
90108
91109def unpack_pnp_id (data : bytes ) -> Tuple [Literal ["BT" , "USB" ], int , int , int ]:
@@ -99,9 +117,14 @@ def unpack_pnp_id(data: bytes) -> Tuple[Literal["BT", "USB"], int, int, int]:
99117 :return: Tuple containing the vendor ID type (`BT` or `USB`), the vendor
100118 ID, the product ID and the product revision.
101119 """
120+ vid_type : int
121+ vid : int
122+ pid : int
123+ rev : int
124+
102125 vid_type , vid , pid , rev = unpack ("<BHHH" , data )
103- vid_type = "BT" if vid_type else "USB"
104- return vid_type , vid , pid , rev
126+
127+ return "BT" if vid_type else "USB" , vid , pid , rev
105128
106129
107130def pack_pnp_id (
0 commit comments