Skip to content

Commit ce07c1c

Browse files
authored
feat: add parse_advertisement_data_bytes method (#121)
1 parent c51e7ab commit ce07c1c

9 files changed

Lines changed: 307 additions & 85 deletions

File tree

bench/test_parse_gap_tuple.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,5 @@ def test_parse_advertisement_data_tuple(benchmark):
8787

8888

8989
def test_parse_advertisement_data_tuple_uncached(benchmark):
90-
benchmark(lambda: _uncached_parse_advertisement_data(advs))
90+
joined_advs = b"".join(advs)
91+
benchmark(lambda: _uncached_parse_advertisement_data(joined_advs))

src/bluetooth_data_tools/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
BLEGAPAdvertisement,
88
BLEGAPType,
99
parse_advertisement_data,
10+
parse_advertisement_data_bytes,
1011
parse_advertisement_data_tuple,
1112
)
1213
from .privacy import get_cipher_for_irk, resolve_private_address
@@ -30,6 +31,7 @@
3031
"newest_manufacturer_data",
3132
"human_readable_name",
3233
"int_to_bluetooth_address",
34+
"parse_advertisement_data_bytes",
3335
"short_address",
3436
"BLEGAPType",
3537
"BLEGAPAdvertisement",

src/bluetooth_data_tools/gap.pxd

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ import cython
33

44
cdef str BLE_UUID
55

6+
cdef dict _EMPTY_MANUFACTURER_DATA
7+
cdef dict _EMPTY_SERVICE_DATA
8+
cdef list _EMPTY_SERVICE_UUIDS
9+
610
cdef object from_bytes
711
cdef object from_bytes_little
812
cdef object from_bytes_signed
@@ -39,7 +43,6 @@ cdef cython.uint TYPE_TX_POWER_LEVEL
3943
cpdef parse_advertisement_data(object data)
4044

4145
@cython.locals(
42-
gap_bytes="bytes",
4346
gap_data="const unsigned char *",
4447
gap_value=cython.bytes,
4548
gap_type_num="unsigned char",
@@ -48,6 +51,9 @@ cpdef parse_advertisement_data(object data)
4851
offset=cython.uint,
4952
start=cython.uint,
5053
end=cython.uint,
51-
splice_pos=cython.uint,
5254
)
53-
cpdef _uncached_parse_advertisement_data(tuple data)
55+
cpdef _uncached_parse_advertisement_bytes(bytes gap_bytes)
56+
57+
cpdef _uncached_parse_advertisement_data(bytes gap_bytes)
58+
59+
cpdef _uncached_parse_advertisement_tuple(tuple data)

src/bluetooth_data_tools/gap.py

Lines changed: 132 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,17 @@ def _uint32_bytes_as_uuid(uuid32_bytes: bytes_) -> str:
134134
_cached_uint32_bytes_as_uuid = _uint32_bytes_as_uuid
135135

136136

137+
_EMPTY_MANUFACTURER_DATA: dict[int, bytes] = {}
138+
_EMPTY_SERVICE_DATA: dict[str, bytes] = {}
139+
_EMPTY_SERVICE_UUIDS: list[str] = []
140+
141+
137142
@lru_cache(maxsize=256)
138143
def _parse_advertisement_data(
139-
data: tuple[bytes, ...],
144+
data: bytes,
140145
) -> BLEGAPAdvertisement:
141146
"""Parse advertisement data and return a BLEGAPAdvertisement."""
142-
return BLEGAPAdvertisement(*_uncached_parse_advertisement_data(data))
147+
return _uncached_parse_advertisement_data(data)
143148

144149

145150
_cached_parse_advertisement_data = _parse_advertisement_data
@@ -150,97 +155,142 @@ def parse_advertisement_data(
150155
) -> BLEGAPAdvertisement:
151156
"""Parse advertisement data and return a BLEGAPAdvertisement."""
152157
if type(data) is tuple:
153-
return _cached_parse_advertisement_data(data)
154-
return _cached_parse_advertisement_data(tuple(data))
158+
return _cached_parse_advertisement_data(
159+
b"".join(data) if len(data) > 1 else data[0]
160+
)
161+
return _cached_parse_advertisement_data(b"".join(data))
162+
155163

164+
def _uncached_parse_advertisement_data(data: bytes) -> BLEGAPAdvertisement:
165+
return BLEGAPAdvertisement(*_uncached_parse_advertisement_bytes(data))
156166

157-
def _uncached_parse_advertisement_data(
167+
168+
def _uncached_parse_advertisement_tuple(
158169
data: tuple[bytes, ...],
159170
) -> BLEGAPAdvertisementTupleType:
160-
manufacturer_data: dict[int, bytes] = {}
161-
service_data: dict[str, bytes] = {}
162-
service_uuids: list[str] = []
171+
return _uncached_parse_advertisement_bytes(
172+
b"".join(data) if len(data) > 1 else data[0]
173+
)
174+
175+
176+
def _uncached_parse_advertisement_bytes(
177+
gap_bytes: bytes,
178+
) -> BLEGAPAdvertisementTupleType:
179+
manufacturer_data = _EMPTY_MANUFACTURER_DATA
180+
service_data = _EMPTY_SERVICE_DATA
181+
service_uuids = _EMPTY_SERVICE_UUIDS
163182
local_name: str | None = None
164183
tx_power: int | None = None
165184

166-
for gap_bytes in data:
167-
offset = 0
168-
total_length = len(gap_bytes)
169-
gap_data = gap_bytes
170-
# IMPORTANT: All data must be manually bounds checked
171-
# because the data is untrusted and can be malformed.
172-
while offset + 2 < total_length:
173-
if not (length := gap_data[offset]):
174-
offset += 1 # Handle zero padding
175-
continue
176-
if not (gap_type_num := gap_data[offset + 1]):
177-
offset += 1 + length # Skip empty type
178-
continue
179-
start = offset + 2
180-
end = start + length - 1
181-
offset += 1 + length
182-
if end > total_length or end - start <= 0:
183-
_LOGGER.debug(
184-
"Invalid BLE GAP AD structure at offset %s: %s (%s)",
185-
offset,
186-
gap_bytes,
187-
)
188-
continue
189-
if gap_type_num == TYPE_SHORT_LOCAL_NAME and local_name is None:
190-
local_name = gap_data[start:end].decode("utf-8", "replace")
191-
elif gap_type_num == TYPE_COMPLETE_LOCAL_NAME:
192-
local_name = gap_data[start:end].decode("utf-8", "replace")
193-
elif gap_type_num == TYPE_MANUFACTURER_SPECIFIC_DATA:
194-
splice_pos = start + 2
195-
if splice_pos >= total_length or splice_pos >= end:
196-
break
197-
manufacturer_data[gap_data[start] | (gap_data[start + 1] << 8)] = (
198-
gap_data[splice_pos:end]
199-
)
200-
elif gap_type_num in {
201-
TYPE_16BIT_SERVICE_UUID_COMPLETE,
202-
TYPE_16BIT_SERVICE_UUID_MORE_AVAILABLE,
203-
}:
204-
service_uuids.append(_cached_uint16_bytes_as_uuid(gap_data[start:end]))
205-
elif gap_type_num in {
206-
TYPE_128BIT_SERVICE_UUID_MORE_AVAILABLE,
207-
TYPE_128BIT_SERVICE_UUID_COMPLETE,
208-
}:
209-
service_uuids.append(_cached_uint128_bytes_as_uuid(gap_data[start:end]))
210-
elif gap_type_num == TYPE_SERVICE_DATA:
211-
splice_pos = start + 2
212-
if splice_pos >= total_length or splice_pos >= end:
213-
break
214-
service_data[
215-
_cached_uint16_bytes_as_uuid(gap_data[start:splice_pos])
216-
] = gap_data[splice_pos:end]
217-
elif gap_type_num == TYPE_SERVICE_DATA_32BIT_UUID:
218-
splice_pos = start + 4
219-
if splice_pos >= total_length or splice_pos >= end:
220-
break
221-
service_data[
222-
_cached_uint32_bytes_as_uuid(gap_data[start:splice_pos])
223-
] = gap_data[splice_pos:end]
224-
elif gap_type_num == TYPE_SERVICE_DATA_128BIT_UUID:
225-
splice_pos = start + 16
226-
if splice_pos >= total_length or splice_pos >= end:
227-
break
228-
service_data[
229-
_cached_uint128_bytes_as_uuid(gap_data[start:splice_pos])
230-
] = gap_data[splice_pos:end]
231-
elif gap_type_num == TYPE_TX_POWER_LEVEL:
232-
tx_power = _cached_from_bytes_signed(gap_data[start:end])
185+
offset = 0
186+
total_length = len(gap_bytes)
187+
gap_data = gap_bytes
188+
# IMPORTANT: All data must be manually bounds checked
189+
# because the data is untrusted and can be malformed.
190+
while offset + 2 < total_length:
191+
if not (length := gap_data[offset]):
192+
offset += 1 # Handle zero padding
193+
continue
194+
if not (gap_type_num := gap_data[offset + 1]):
195+
offset += 1 + length # Skip empty type
196+
continue
197+
start = offset + 2
198+
end = start + length - 1
199+
offset += 1 + length
200+
if end > total_length or end - start <= 0:
201+
_LOGGER.debug(
202+
"Invalid BLE GAP AD structure at offset %s: %s (%s)",
203+
offset,
204+
gap_bytes,
205+
)
206+
continue
207+
if gap_type_num == TYPE_SHORT_LOCAL_NAME and local_name is None:
208+
local_name = gap_data[start:end].decode("utf-8", "replace")
209+
elif gap_type_num == TYPE_COMPLETE_LOCAL_NAME:
210+
local_name = gap_data[start:end].decode("utf-8", "replace")
211+
elif gap_type_num == TYPE_MANUFACTURER_SPECIFIC_DATA:
212+
splice_pos = start + 2
213+
if splice_pos >= total_length or splice_pos >= end:
214+
break
215+
if manufacturer_data is _EMPTY_MANUFACTURER_DATA:
216+
manufacturer_data = {}
217+
manufacturer_data[gap_data[start] | (gap_data[start + 1] << 8)] = gap_data[
218+
splice_pos:end
219+
]
220+
elif gap_type_num in {
221+
TYPE_16BIT_SERVICE_UUID_COMPLETE,
222+
TYPE_16BIT_SERVICE_UUID_MORE_AVAILABLE,
223+
}:
224+
if service_uuids is _EMPTY_SERVICE_UUIDS:
225+
service_uuids = []
226+
service_uuids.append(_cached_uint16_bytes_as_uuid(gap_data[start:end]))
227+
elif gap_type_num in {
228+
TYPE_128BIT_SERVICE_UUID_MORE_AVAILABLE,
229+
TYPE_128BIT_SERVICE_UUID_COMPLETE,
230+
}:
231+
if service_uuids is _EMPTY_SERVICE_UUIDS:
232+
service_uuids = []
233+
service_uuids.append(_cached_uint128_bytes_as_uuid(gap_data[start:end]))
234+
elif gap_type_num == TYPE_SERVICE_DATA:
235+
splice_pos = start + 2
236+
if splice_pos >= total_length or splice_pos >= end:
237+
break
238+
if service_data is _EMPTY_SERVICE_DATA:
239+
service_data = {}
240+
service_data[_cached_uint16_bytes_as_uuid(gap_data[start:splice_pos])] = (
241+
gap_data[splice_pos:end]
242+
)
243+
elif gap_type_num == TYPE_SERVICE_DATA_32BIT_UUID:
244+
splice_pos = start + 4
245+
if splice_pos >= total_length or splice_pos >= end:
246+
break
247+
if service_data is _EMPTY_SERVICE_DATA:
248+
service_data = {}
249+
service_data[_cached_uint32_bytes_as_uuid(gap_data[start:splice_pos])] = (
250+
gap_data[splice_pos:end]
251+
)
252+
elif gap_type_num == TYPE_SERVICE_DATA_128BIT_UUID:
253+
splice_pos = start + 16
254+
if splice_pos >= total_length or splice_pos >= end:
255+
break
256+
if service_data is _EMPTY_SERVICE_DATA:
257+
service_data = {}
258+
service_data[_cached_uint128_bytes_as_uuid(gap_data[start:splice_pos])] = (
259+
gap_data[splice_pos:end]
260+
)
261+
elif gap_type_num == TYPE_TX_POWER_LEVEL:
262+
tx_power = _cached_from_bytes_signed(gap_data[start:end])
233263

234264
return (local_name, service_uuids, service_data, manufacturer_data, tx_power)
235265

236266

237267
if TYPE_CHECKING:
238268

269+
@lru_cache(maxsize=256)
270+
def parse_advertisement_data_bytes(
271+
gap_bytes: bytes,
272+
) -> BLEGAPAdvertisementTupleType:
273+
"""Parse a tuple of raw advertisement data and return a tuple of BLEGAPAdvertisementTupleType.
274+
275+
The format of the tuple is:
276+
(local_name, service_uuids, service_data, manufacturer_data, tx_power)
277+
278+
This is tightly coupled to bleak. If you are not using bleak
279+
it is recommended to use parse_advertisement_data instead.
280+
281+
local_name: str | None
282+
service_uuids: list[str]
283+
service_data: dict[str, bytes]
284+
manufacturer_data: dict[int, bytes]
285+
tx_power: int | None
286+
"""
287+
return _uncached_parse_advertisement_bytes(gap_bytes)
288+
239289
@lru_cache(maxsize=256)
240290
def parse_advertisement_data_tuple(
241291
data: tuple[bytes, ...],
242292
) -> BLEGAPAdvertisementTupleType:
243-
"""Parse a tuple of raw advertisement data and return a tuple of BLEGAPAdvertisementTupleType.
293+
"""Parse raw advertisement bytes and return a tuple of BLEGAPAdvertisementTupleType.
244294
245295
The format of the tuple is:
246296
(local_name, service_uuids, service_data, manufacturer_data, tx_power)
@@ -254,8 +304,11 @@ def parse_advertisement_data_tuple(
254304
manufacturer_data: dict[int, bytes]
255305
tx_power: int | None
256306
"""
257-
return _uncached_parse_advertisement_data(data)
307+
return _uncached_parse_advertisement_tuple(data)
258308
else:
259-
parse_advertisement_data_tuple = lru_cache(maxsize=1024)(
260-
_uncached_parse_advertisement_data
309+
parse_advertisement_data_bytes = lru_cache(maxsize=256)(
310+
_uncached_parse_advertisement_bytes
311+
)
312+
parse_advertisement_data_tuple = lru_cache(maxsize=256)(
313+
_uncached_parse_advertisement_tuple
261314
)

tests/benchmarks/test_parse_gap.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,15 @@
8282
]
8383

8484

85+
def test_parse_advertisement_single_tuple(benchmark: BenchmarkFixture) -> None:
86+
advs_as_single_tuple = (b"".join(advs),)
87+
benchmark(lambda: parse_advertisement_data(advs_as_single_tuple))
88+
89+
90+
def test_parse_advertisement_data_tuple(benchmark: BenchmarkFixture) -> None:
91+
advs_as_tuple = tuple(advs)
92+
benchmark(lambda: parse_advertisement_data(advs_as_tuple))
93+
94+
8595
def test_parse_advertisement_data(benchmark: BenchmarkFixture) -> None:
8696
benchmark(lambda: parse_advertisement_data(advs))

0 commit comments

Comments
 (0)