Skip to content

Commit 9c4b56d

Browse files
committed
Try adapting async stuff
1 parent 210acd0 commit 9c4b56d

4 files changed

Lines changed: 249 additions & 152 deletions

File tree

kuksa-client/kuksa_client/grpc/__init__.py

Lines changed: 65 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -690,8 +690,8 @@ def from_message(cls, message: val_v1.EntryUpdate):
690690

691691
@classmethod
692692
def from_tuple(cls, path: str, dp: types_v2.Datapoint):
693-
# we assume here that only one field of Value is set -> we use the first entry.
694-
# This should always be the case.
693+
# we assume here that at max one field of Value is set -> we use the first entry.
694+
# If no field is set the value is currently unknown/not avaialable -> set to None.
695695
data = dp.value.ListFields()
696696
if data:
697697
field_descriptor, value = data[0]
@@ -712,6 +712,21 @@ def from_tuple(cls, path: str, dp: types_v2.Datapoint):
712712
fields=[Field(value=types_v1.FIELD_VALUE)],
713713
)
714714

715+
@classmethod
716+
def from_actuate_value(cls, path: str, value: types_v2.Value):
717+
# we assume here that exactly one field of Value is set -> we use the first entry.
718+
# This should always be the case.
719+
data = value.ListFields()
720+
field_descriptor, target_value = data[0]
721+
field_name = field_descriptor.name
722+
target_value = getattr(value, field_name)
723+
return cls(
724+
entry=DataEntry(
725+
path=path, actuator_target=Datapoint(value=target_value)
726+
),
727+
fields=[Field(value=types_v1.FIELD_ACTUATOR_TARGET)],
728+
)
729+
715730
def to_message(self) -> val_v1.EntryUpdate:
716731
message = val_v1.EntryUpdate(entry=self.entry.to_message())
717732
message.fields.extend(field.value for field in self.fields)
@@ -1177,19 +1192,19 @@ def subscribe_current_values(
11771192
yield {
11781193
update.entry.path: update.entry.value for update in updates
11791194
}
1180-
except RpcError as exc:
1181-
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
1182-
logger.debug("v2 not available - falling back to v1 subscribe current values")
1183-
for updates in self.subscribe(
1184-
entries=(
1185-
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
1186-
for path in paths
1187-
),
1188-
**rpc_kwargs,
1189-
):
1190-
yield {update.entry.path: update.entry.value for update in updates}
1191-
else:
1192-
raise VSSClientError.from_grpc_error(exc) from exc
1195+
except VSSClientError as exc:
1196+
if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED:
1197+
raise
1198+
1199+
logger.debug("v2 not available - falling back to v1 subscribe current values")
1200+
for updates in self.subscribe(
1201+
entries=(
1202+
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
1203+
for path in paths
1204+
),
1205+
**rpc_kwargs,
1206+
):
1207+
yield {update.entry.path: update.entry.value for update in updates}
11931208

11941209
@check_connected
11951210
def subscribe_target_values(
@@ -1210,23 +1225,23 @@ def subscribe_target_values(
12101225
logger.debug("Try to subscribe actuation requests via v2")
12111226
for updates in self.v2_subscribe_batch_actuation(paths, **rpc_kwargs):
12121227
yield {
1213-
update.entry.path: update.entry.value for update in updates
1228+
update.entry.path: update.entry.actuator_target for update in updates
1229+
}
1230+
except VSSClientError as exc:
1231+
if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED:
1232+
raise
1233+
1234+
logger.debug("v2 not available - falling back to v1 subscribe target values")
1235+
for updates in self.subscribe(
1236+
entries=(
1237+
SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,))
1238+
for path in paths
1239+
),
1240+
**rpc_kwargs,
1241+
):
1242+
yield {
1243+
update.entry.path: update.entry.actuator_target for update in updates
12141244
}
1215-
except RpcError as exc:
1216-
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
1217-
logger.debug("v2 not available - falling back to v1 subscribe target values")
1218-
for updates in self.subscribe(
1219-
entries=(
1220-
SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,))
1221-
for path in paths
1222-
),
1223-
**rpc_kwargs,
1224-
):
1225-
yield {
1226-
update.entry.path: update.entry.actuator_target for update in updates
1227-
}
1228-
else:
1229-
raise VSSClientError.from_grpc_error(exc) from exc
12301245

12311246
@check_connected
12321247
def subscribe_metadata(
@@ -1411,12 +1426,15 @@ def v2_subscribe(
14111426
)
14121427
req = self._prepare_v2_subscribe_request(paths)
14131428
resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs)
1414-
for resp in resp_stream:
1415-
logger.debug("%s: %s", type(resp).__name__, resp)
1416-
yield [
1417-
EntryUpdate.from_tuple(path, dp)
1418-
for path, dp in resp.entries.items()
1419-
]
1429+
try:
1430+
for resp in resp_stream:
1431+
logger.debug("%s: %s", type(resp).__name__, resp)
1432+
yield [
1433+
EntryUpdate.from_tuple(path, dp)
1434+
for path, dp in resp.entries.items()
1435+
]
1436+
except RpcError as exc:
1437+
raise VSSClientError.from_grpc_error(exc) from exc
14201438

14211439
@check_connected
14221440
def v2_subscribe_batch_actuation(
@@ -1435,13 +1453,16 @@ def v2_subscribe_batch_actuation(
14351453
self.ensure_id_mapping(paths, **rpc_kwargs)
14361454
req = self._prepare_v2_provide_actuation_request(paths)
14371455
resp_stream = self.client_stub_v2.OpenProviderStream(iter(req), **rpc_kwargs)
1438-
for resp in resp_stream:
1439-
logger.debug("batch %s: %s", type(resp).__name__, resp)
1440-
if resp.HasField("batch_actuate_stream_request"):
1441-
yield [
1442-
EntryUpdate.from_tuple(self.get_path(actuate_req.signal_id), Datapoint(value=actuate_req.value))
1443-
for actuate_req in resp.batch_actuate_stream_request.actuate_requests
1444-
]
1456+
try:
1457+
for resp in resp_stream:
1458+
logger.debug("batch %s: %s", type(resp).__name__, resp)
1459+
if resp.HasField("batch_actuate_stream_request"):
1460+
yield [
1461+
EntryUpdate.from_actuate_value(self.get_path(actuate_req.signal_id), actuate_req.value)
1462+
for actuate_req in resp.batch_actuate_stream_request.actuate_requests
1463+
]
1464+
except RpcError as exc:
1465+
raise VSSClientError.from_grpc_error(exc) from exc
14451466

14461467
@check_connected
14471468
def authorize(self, token: str, **rpc_kwargs) -> str:

0 commit comments

Comments
 (0)