Skip to content

Commit 776d131

Browse files
committed
Fix wildcard regression in subscribe_current_values on v2 (closes #53)
The v2 Subscribe RPC only accepts fully-qualified leaf paths, so passing a branch path like `['Vehicle']` — which v1 Subscribe accepted as a wildcard — now fails with NOT_FOUND since 0.5.1. This breaks existing user code that worked on 0.4.x. Restore the prior behavior by catching NOT_FOUND from v2 Subscribe and retrying with paths expanded via ListMetadata(root=path). Leaf paths continue on the fast path (no extra RPC). The UNIMPLEMENTED fallback to v1 Subscribe is preserved for old databrokers. - kuksa-client/kuksa_client/grpc/aio.py: async path, new _expand_v2_branch_paths helper. - kuksa-client/kuksa_client/grpc/__init__.py: sync mirror of the same change. - kuksa-client/tests/test_grpc.py: new test asserting branch-path expansion on NOT_FOUND. Trailing `.*` in paths (e.g. `Vehicle.Cabin.*`) is stripped before lookup so both the idiomatic v1 form and the explicit wildcard form work. Signed-off-by: Komada (aki1770-del) <aki1770@gmail.com>
1 parent 54b46fd commit 776d131

3 files changed

Lines changed: 182 additions & 8 deletions

File tree

kuksa-client/kuksa_client/grpc/__init__.py

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,13 +1188,32 @@ def subscribe_current_values(
11881188
]):
11891189
for path, dp in updates.items():
11901190
print(f"Current value for {path} is now: {dp.value}")
1191+
1192+
Branch paths (e.g. ``['Vehicle']`` or ``['Vehicle.Cabin.*']``) are
1193+
accepted: if the v2 Subscribe RPC rejects a path with NOT_FOUND, the
1194+
paths are expanded via ListMetadata and the subscription is retried
1195+
with the resulting leaf signals. This restores the wildcard semantics
1196+
that v1 provided natively.
11911197
"""
1198+
paths = list(paths)
11921199
try:
11931200
logger.debug("Try to subscribe current values via v2")
1194-
for updates in self.v2_subscribe(paths, **rpc_kwargs):
1195-
yield {
1196-
update.entry.path: update.entry.value for update in updates
1197-
}
1201+
try:
1202+
for updates in self.v2_subscribe(paths, **rpc_kwargs):
1203+
yield {
1204+
update.entry.path: update.entry.value for update in updates
1205+
}
1206+
except VSSClientError as exc:
1207+
if exc.error["code"] != grpc.StatusCode.NOT_FOUND.value[0]:
1208+
raise
1209+
logger.debug(
1210+
"v2 Subscribe returned NOT_FOUND; expanding branch paths via ListMetadata"
1211+
)
1212+
expanded = self._expand_v2_branch_paths(paths, **rpc_kwargs)
1213+
for updates in self.v2_subscribe(expanded, **rpc_kwargs):
1214+
yield {
1215+
update.entry.path: update.entry.value for update in updates
1216+
}
11981217
except VSSClientError as exc:
11991218
if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED.value[0]:
12001219
raise
@@ -1351,6 +1370,43 @@ def get_path(self, signal_id: types_v2.SignalID) -> str:
13511370
return self.id_to_path_mapping[signal_id.id]
13521371
return "<unknown signal>"
13531372

1373+
def _expand_v2_branch_paths(
1374+
self, paths: Iterable[str], **rpc_kwargs
1375+
) -> List[str]:
1376+
"""Expand branch / wildcard paths into concrete leaf signals.
1377+
1378+
For each input path, calls ``ListMetadata(root=path)`` and returns
1379+
every signal path beneath that branch. Leaf paths pass through
1380+
(ListMetadata returns a single entry). Non-existent paths surface
1381+
as NOT_FOUND. Trailing ``.*`` suffixes are stripped before lookup.
1382+
1383+
Order is preserved and duplicates (from overlapping branches) are
1384+
removed. Used to restore v1-style wildcard semantics on top of the
1385+
v2 Subscribe RPC, which only accepts fully-qualified leaf paths.
1386+
"""
1387+
rpc_kwargs["metadata"] = self.generate_metadata_header(
1388+
rpc_kwargs.get("metadata")
1389+
)
1390+
expanded: List[str] = []
1391+
for path in paths:
1392+
lookup = path[:-2] if path.endswith(".*") else path
1393+
req = self._prepare_v2_list_metadata_request(lookup)
1394+
try:
1395+
resp = self.client_stub_v2.ListMetadata(req, **rpc_kwargs)
1396+
except RpcError as exc:
1397+
raise VSSClientError.from_grpc_error(exc) from exc
1398+
if not resp.metadata:
1399+
raise VSSClientError(
1400+
error={
1401+
"code": grpc.StatusCode.NOT_FOUND.value[0],
1402+
"reason": grpc.StatusCode.NOT_FOUND.value[1],
1403+
"message": f"Path {path} not found on server",
1404+
},
1405+
errors=[],
1406+
)
1407+
expanded.extend(m.path for m in resp.metadata)
1408+
return list(dict.fromkeys(expanded))
1409+
13541410
def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs):
13551411
for path in paths:
13561412
if path not in self.path_to_id_mapping:

kuksa-client/kuksa_client/grpc/aio.py

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,13 +303,32 @@ async def subscribe_current_values(
303303
]):
304304
for path, dp in updates.items():
305305
print(f"Current value for {path} is now: {dp.value}")
306+
307+
Branch paths (e.g. ``['Vehicle']`` or ``['Vehicle.Cabin.*']``) are
308+
accepted: if the v2 Subscribe RPC rejects a path with NOT_FOUND, the
309+
paths are expanded via ListMetadata and the subscription is retried
310+
with the resulting leaf signals. This restores the wildcard semantics
311+
that v1 provided natively.
306312
"""
313+
paths = list(paths)
307314
try:
308315
logger.debug("Try to subscribe current values via v2")
309-
async for updates in self.v2_subscribe(paths=paths, **rpc_kwargs):
310-
yield {
311-
update.entry.path: update.entry.value for update in updates
312-
}
316+
try:
317+
async for updates in self.v2_subscribe(paths=paths, **rpc_kwargs):
318+
yield {
319+
update.entry.path: update.entry.value for update in updates
320+
}
321+
except VSSClientError as exc:
322+
if exc.error["code"] != grpc.StatusCode.NOT_FOUND.value[0]:
323+
raise
324+
logger.debug(
325+
"v2 Subscribe returned NOT_FOUND; expanding branch paths via ListMetadata"
326+
)
327+
expanded = await self._expand_v2_branch_paths(paths, **rpc_kwargs)
328+
async for updates in self.v2_subscribe(paths=expanded, **rpc_kwargs):
329+
yield {
330+
update.entry.path: update.entry.value for update in updates
331+
}
313332
except VSSClientError as exc:
314333
if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED.value[0]:
315334
raise
@@ -468,6 +487,43 @@ def get_path(self, signal_id: types_v2.SignalID) -> str:
468487
return self.id_to_path_mapping[signal_id.id]
469488
return "<unknown signal>"
470489

490+
async def _expand_v2_branch_paths(
491+
self, paths: Iterable[str], **rpc_kwargs
492+
) -> List[str]:
493+
"""Expand branch / wildcard paths into concrete leaf signals.
494+
495+
For each input path, calls ``ListMetadata(root=path)`` and returns
496+
every signal path beneath that branch. Leaf paths pass through
497+
(ListMetadata returns a single entry). Non-existent paths surface
498+
as NOT_FOUND. Trailing ``.*`` suffixes are stripped before lookup.
499+
500+
Order is preserved and duplicates (from overlapping branches) are
501+
removed. Used to restore v1-style wildcard semantics on top of the
502+
v2 Subscribe RPC, which only accepts fully-qualified leaf paths.
503+
"""
504+
rpc_kwargs["metadata"] = self.generate_metadata_header(
505+
rpc_kwargs.get("metadata")
506+
)
507+
expanded: List[str] = []
508+
for path in paths:
509+
lookup = path[:-2] if path.endswith(".*") else path
510+
req = self._prepare_v2_list_metadata_request(lookup)
511+
try:
512+
resp = await self.client_stub_v2.ListMetadata(req, **rpc_kwargs)
513+
except AioRpcError as exc:
514+
raise VSSClientError.from_grpc_error(exc) from exc
515+
if not resp.metadata:
516+
raise VSSClientError(
517+
error={
518+
"code": grpc.StatusCode.NOT_FOUND.value[0],
519+
"reason": grpc.StatusCode.NOT_FOUND.value[1],
520+
"message": f"Path {path} not found on server",
521+
},
522+
errors=[],
523+
)
524+
expanded.extend(m.path for m in resp.metadata)
525+
return list(dict.fromkeys(expanded))
526+
471527
async def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs):
472528
for path in paths:
473529
if path not in self.path_to_id_mapping:

kuksa-client/tests/test_grpc.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -670,6 +670,68 @@ async def subscribe_response_stream(**kwargs):
670670
'Vehicle.Chassis.Height': Datapoint(666),
671671
}
672672

673+
async def test_subscribe_current_values_branch_path_expansion(
674+
self, mocker, unused_tcp_port,
675+
):
676+
"""Branch path (e.g. 'Vehicle') is rejected by v2 Subscribe with
677+
NOT_FOUND; client falls back to ListMetadata expansion and retries
678+
with the resulting leaf signals (restores pre-0.5.1 semantics for
679+
issue #53)."""
680+
client = VSSClient('127.0.0.1', unused_tcp_port)
681+
client.connected = True # To bypass connection check
682+
683+
not_found = VSSClientError(
684+
error={
685+
"code": grpc.StatusCode.NOT_FOUND.value[0],
686+
"reason": grpc.StatusCode.NOT_FOUND.value[1],
687+
"message": "Path not found",
688+
},
689+
errors=[],
690+
)
691+
692+
call_count = {"v2_subscribe": 0}
693+
694+
async def v2_subscribe_side_effect(paths, **kwargs):
695+
call_count["v2_subscribe"] += 1
696+
if call_count["v2_subscribe"] == 1:
697+
# First call with branch path — server rejects
698+
raise not_found
699+
# Second call with expanded leaves — yield real data
700+
yield [
701+
EntryUpdate(DataEntry(
702+
'Vehicle.Speed', value=Datapoint(42.0),
703+
), (Field.VALUE,)),
704+
EntryUpdate(DataEntry(
705+
'Vehicle.ADAS.ABS.IsActive', value=Datapoint(True),
706+
), (Field.VALUE,)),
707+
]
708+
mocker.patch.object(
709+
client, 'v2_subscribe', side_effect=v2_subscribe_side_effect,
710+
)
711+
712+
async def expand_side_effect(paths, **kwargs):
713+
# Simulate ListMetadata resolving 'Vehicle' to two concrete leaves
714+
assert list(paths) == ['Vehicle']
715+
return ['Vehicle.Speed', 'Vehicle.ADAS.ABS.IsActive']
716+
mocker.patch.object(
717+
client, '_expand_v2_branch_paths', side_effect=expand_side_effect,
718+
)
719+
720+
received_updates: Dict[str, Datapoint] = {}
721+
async for updates in client.subscribe_current_values(['Vehicle']):
722+
received_updates.update(updates)
723+
724+
assert call_count["v2_subscribe"] == 2
725+
# First call used the original branch path; retry used expanded leaves.
726+
first_paths = list(client.v2_subscribe.call_args_list[0][1]['paths'])
727+
second_paths = list(client.v2_subscribe.call_args_list[1][1]['paths'])
728+
assert first_paths == ['Vehicle']
729+
assert second_paths == ['Vehicle.Speed', 'Vehicle.ADAS.ABS.IsActive']
730+
assert received_updates == {
731+
'Vehicle.Speed': Datapoint(42.0),
732+
'Vehicle.ADAS.ABS.IsActive': Datapoint(True),
733+
}
734+
673735
async def test_subscribe_target_values(self, mocker, unused_tcp_port):
674736
client = VSSClient('127.0.0.1', unused_tcp_port)
675737
client.connected = True # To bypass connection check

0 commit comments

Comments
 (0)