Skip to content

Commit 6bbe90a

Browse files
Extend subscribe_target_values to use v2 API
1 parent 6419ef1 commit 6bbe90a

1 file changed

Lines changed: 175 additions & 69 deletions

File tree

kuksa-client/kuksa_client/grpc/__init__.py

Lines changed: 175 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
########################################################################
2-
# Copyright (c) 2022 Robert Bosch GmbH
2+
# Copyright (c) 2022-2025 Robert Bosch GmbH
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
@@ -693,10 +693,13 @@ def from_tuple(cls, path: str, dp: types_v2.Datapoint):
693693
# we assume here that only one field of Value is set -> we use the first entry.
694694
# This should always be the case.
695695
data = dp.value.ListFields()
696-
field_descriptor, value = data[0]
697-
field_name = field_descriptor.name
698-
value = getattr(dp.value, field_name)
699-
if dp.timestamp.seconds == 0 and dp.timestamp.nanos == 0:
696+
if data:
697+
field_descriptor, value = data[0]
698+
field_name = field_descriptor.name
699+
value = getattr(dp.value, field_name)
700+
else:
701+
value = None
702+
if dp.timestamp is None or (dp.timestamp.seconds == 0 and dp.timestamp.nanos == 0):
700703
timestamp = None
701704
else:
702705
timestamp = dp.timestamp.ToDatetime(
@@ -750,7 +753,6 @@ def __init__(
750753
connected: bool = False,
751754
tls_server_name: Optional[str] = None,
752755
):
753-
754756
self.authorization_header = self.get_authorization_header(token)
755757
self.target_host = f"{host}:{port}"
756758
self.root_certificates = root_certificates
@@ -868,26 +870,29 @@ def _prepare_subscribe_request(
868870
logger.debug("%s: %s", type(req).__name__, req)
869871
return req
870872

871-
def _prepare_subscribev2_request(
872-
self,
873-
entries: Iterable[SubscribeEntry],
873+
def _prepare_v2_subscribe_request(
874+
self, paths: Iterable[str]
874875
) -> val_v2.SubscribeRequest:
875-
paths = []
876-
for entry in entries:
877-
paths.append(entry.path)
876+
req = val_v2.SubscribeRequest(signal_paths=paths)
877+
logger.debug("%s: %s", type(req).__name__, req)
878+
return req
878879

879-
for field in entry.fields:
880-
if field != Field.VALUE:
881-
raise VSSClientError(
882-
error={
883-
"code": grpc.StatusCode.INVALID_ARGUMENT.value[0],
884-
"reason": grpc.StatusCode.INVALID_ARGUMENT.value[1],
885-
"message": "Cannot use v2 if specifiying fields other than value",
886-
},
887-
errors=[],
888-
)
880+
def _prepare_v2_provide_actuation_request(
881+
self,
882+
paths: Iterable[str],
883+
) -> List[val_v2.OpenProviderStreamRequest]:
884+
signals = []
885+
for path in paths:
886+
signals.append(types_v2.SignalID(path=path))
887+
provide_req = val_v2.ProvideActuationRequest(actuator_identifiers=signals)
888+
req = val_v2.OpenProviderStreamRequest(provide_actuation_request=provide_req)
889+
logger.debug("%s: %s", type(req).__name__, req)
890+
return [req]
889891

890-
req = val_v2.SubscribeRequest(signal_paths=paths)
892+
def _prepare_v2_list_metadata_request(
893+
self, path: str
894+
) -> val_v2.ListMetadataRequest:
895+
req = val_v2.ListMetadataRequest(root=path)
891896
logger.debug("%s: %s", type(req).__name__, req)
892897
return req
893898

@@ -947,6 +952,8 @@ def __init__(self, *args, **kwargs):
947952
super().__init__(*args, **kwargs)
948953
self.channel = None
949954
self.exit_stack = contextlib.ExitStack()
955+
self.path_to_id_mapping: Dict[str, int] = dict()
956+
self.id_to_path_mapping: Dict[int, str] = dict()
950957

951958
def __enter__(self):
952959
self.connect()
@@ -969,6 +976,9 @@ def wrapper(self, *args, **kwargs):
969976
return wrapper
970977

971978
def connect(self, target_host=None):
979+
self.path_to_id_mapping.clear()
980+
self.id_to_path_mapping.clear()
981+
972982
creds = self._load_creds()
973983
if target_host is None:
974984
target_host = self.target_host
@@ -1161,15 +1171,25 @@ def subscribe_current_values(
11611171
for path, dp in updates.items():
11621172
print(f"Current value for {path} is now: {dp.value}")
11631173
"""
1164-
for updates in self.subscribe(
1165-
entries=(
1166-
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
1167-
for path in paths
1168-
),
1169-
try_v2=True,
1170-
**rpc_kwargs,
1171-
):
1172-
yield {update.entry.path: update.entry.value for update in updates}
1174+
try:
1175+
logger.debug("Try to subscribe current values via v2")
1176+
for updates in self.v2_subscribe(paths, **rpc_kwargs):
1177+
yield {
1178+
update.entry.path: update.entry.value for update in updates
1179+
}
1180+
except RpcError as exc:
1181+
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
1182+
logger.debug("v2 not available - falling back to v1 subscribe current values")
1183+
for updates in self.subscribe(
1184+
entries=(
1185+
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
1186+
for path in paths
1187+
),
1188+
**rpc_kwargs,
1189+
):
1190+
yield {update.entry.path: update.entry.value for update in updates}
1191+
else:
1192+
raise VSSClientError.from_grpc_error(exc) from exc
11731193

11741194
@check_connected
11751195
def subscribe_target_values(
@@ -1186,16 +1206,27 @@ def subscribe_target_values(
11861206
for path, dp in updates.items():
11871207
print(f"Target value for {path} is now: {dp.value}")
11881208
"""
1189-
for updates in self.subscribe(
1190-
entries=(
1191-
SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,))
1192-
for path in paths
1193-
),
1194-
**rpc_kwargs,
1195-
):
1196-
yield {
1197-
update.entry.path: update.entry.actuator_target for update in updates
1198-
}
1209+
try:
1210+
logger.debug("Try to subscribe actuation requests via v2")
1211+
for updates in self.v2_subscribe_batch_actuation(paths, **rpc_kwargs):
1212+
yield {
1213+
update.entry.path: update.entry.value for update in updates
1214+
}
1215+
except RpcError as exc:
1216+
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
1217+
logger.debug("v2 not available - falling back to v1 subscribe target values")
1218+
for updates in self.subscribe(
1219+
entries=(
1220+
SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,))
1221+
for path in paths
1222+
),
1223+
**rpc_kwargs,
1224+
):
1225+
yield {
1226+
update.entry.path: update.entry.actuator_target for update in updates
1227+
}
1228+
else:
1229+
raise VSSClientError.from_grpc_error(exc) from exc
11991230

12001231
@check_connected
12011232
def subscribe_metadata(
@@ -1295,6 +1326,41 @@ def set(
12951326
raise VSSClientError.from_grpc_error(exc) from exc
12961327
self._process_set_response(resp)
12971328

1329+
def get_path(self, signal_id: types_v2.SignalID) -> str:
1330+
if signal_id.HasField("path"):
1331+
return signal_id.path
1332+
elif signal_id.HasField("id") and signal_id.id in self.id_to_path_mapping:
1333+
return self.id_to_path_mapping[signal_id.id]
1334+
return "<unknown signal>"
1335+
1336+
def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs):
1337+
for path in paths:
1338+
if path not in self.path_to_id_mapping:
1339+
# Prevent duplicate requests for the same path
1340+
self.path_to_id_mapping[path] = None
1341+
req = self._prepare_v2_list_metadata_request(path)
1342+
try:
1343+
resp = self.client_stub_v2.ListMetadata(req, **rpc_kwargs)
1344+
logger.debug("%s: %s", type(resp).__name__, resp)
1345+
if len(resp.metadata) == 1:
1346+
self.path_to_id_mapping[path] = resp.metadata[0].id
1347+
self.id_to_path_mapping[resp.metadata[0].id] = path
1348+
else:
1349+
del self.path_to_id_mapping[path]
1350+
raise VSSClientError(
1351+
error={
1352+
"code": grpc.StatusCode.NOT_FOUND.value[0],
1353+
"reason": grpc.StatusCode.NOT_FOUND.value[1],
1354+
"message": f"Path {path} not found on server",
1355+
},
1356+
errors=[],
1357+
)
1358+
except RpcError as exc:
1359+
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
1360+
logger.debug("v2 not available - skip querying ids")
1361+
return
1362+
raise VSSClientError.from_grpc_error(exc) from exc
1363+
12981364
@check_connected
12991365
def subscribe(
13001366
self, entries: Iterable[SubscribeEntry], try_v2: bool = False, **rpc_kwargs
@@ -1305,36 +1371,76 @@ def subscribe(
13051371
grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials.
13061372
"""
13071373

1374+
if try_v2:
1375+
raise VSSClientError(
1376+
error={
1377+
"code": grpc.StatusCode.INVALID_ARGUMENT.value[0],
1378+
"reason": grpc.StatusCode.INVALID_ARGUMENT.value[1],
1379+
"message": "Method subscribe supports v1, only. Use v2_subscribe or v2_subscribe_batch_actuation instead.",
1380+
},
1381+
errors=[],
1382+
)
1383+
1384+
logger.debug("Try subscribing via v1")
13081385
rpc_kwargs["metadata"] = self.generate_metadata_header(
13091386
rpc_kwargs.get("metadata")
13101387
)
1311-
if try_v2:
1312-
logger.debug("Trying v2")
1313-
req = self._prepare_subscribev2_request(entries)
1314-
resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs)
1315-
try:
1316-
for resp in resp_stream:
1317-
logger.debug("%s: %s", type(resp).__name__, resp)
1318-
yield [
1319-
EntryUpdate.from_tuple(path, dp)
1320-
for path, dp in resp.entries.items()
1321-
]
1322-
except RpcError as exc:
1323-
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
1324-
logger.debug("v2 not available fall back to v1 instead")
1325-
self.subscribe(entries)
1326-
else:
1327-
raise VSSClientError.from_grpc_error(exc) from exc
1328-
else:
1329-
logger.debug("Trying v1")
1330-
req = self._prepare_subscribe_request(entries)
1331-
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
1332-
try:
1333-
for resp in resp_stream:
1334-
logger.debug("%s: %s", type(resp).__name__, resp)
1335-
yield [EntryUpdate.from_message(update) for update in resp.updates]
1336-
except RpcError as exc:
1337-
raise VSSClientError.from_grpc_error(exc) from exc
1388+
req = self._prepare_subscribe_request(entries)
1389+
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
1390+
try:
1391+
for resp in resp_stream:
1392+
logger.debug("%s: %s", type(resp).__name__, resp)
1393+
yield [EntryUpdate.from_message(update) for update in resp.updates]
1394+
except RpcError as exc:
1395+
raise VSSClientError.from_grpc_error(exc) from exc
1396+
1397+
@check_connected
1398+
def v2_subscribe(
1399+
self, paths: Iterable[str], **rpc_kwargs
1400+
) -> Iterator[List[EntryUpdate]]:
1401+
"""
1402+
Parameters:
1403+
rpc_kwargs
1404+
grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials.
1405+
"""
1406+
1407+
logger.debug("Subscribe current values via v2")
1408+
rpc_kwargs["metadata"] = self.generate_metadata_header(
1409+
rpc_kwargs.get("metadata")
1410+
)
1411+
req = self._prepare_v2_subscribe_request(paths)
1412+
resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs)
1413+
for resp in resp_stream:
1414+
logger.debug("%s: %s", type(resp).__name__, resp)
1415+
yield [
1416+
EntryUpdate.from_tuple(path, dp)
1417+
for path, dp in resp.entries.items()
1418+
]
1419+
1420+
@check_connected
1421+
def v2_subscribe_batch_actuation(
1422+
self, paths: Iterable[str], **rpc_kwargs
1423+
) -> Iterator[List[EntryUpdate]]:
1424+
"""
1425+
Parameters:
1426+
rpc_kwargs
1427+
grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials.
1428+
"""
1429+
1430+
logger.debug("Subscribe actuation requests via v2")
1431+
rpc_kwargs["metadata"] = self.generate_metadata_header(
1432+
rpc_kwargs.get("metadata")
1433+
)
1434+
self.ensure_id_mapping(paths, **rpc_kwargs)
1435+
req = self._prepare_v2_provide_actuation_request(paths)
1436+
resp_stream = self.client_stub_v2.OpenProviderStream(iter(req), **rpc_kwargs)
1437+
for resp in resp_stream:
1438+
logger.debug("batch %s: %s", type(resp).__name__, resp)
1439+
if resp.HasField("batch_actuate_stream_request"):
1440+
yield [
1441+
EntryUpdate.from_tuple(self.get_path(actuate_req.signal_id), Datapoint(value=actuate_req.value))
1442+
for actuate_req in resp.batch_actuate_stream_request.actuate_requests
1443+
]
13381444

13391445
@check_connected
13401446
def authorize(self, token: str, **rpc_kwargs) -> str:

0 commit comments

Comments
 (0)