Skip to content

Commit 525742d

Browse files
committed
Try adapting async stuff
1 parent 210acd0 commit 525742d

3 files changed

Lines changed: 199 additions & 109 deletions

File tree

kuksa-client/kuksa_client/grpc/__init__.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -690,8 +690,8 @@ def from_message(cls, message: val_v1.EntryUpdate):
690690

691691
@classmethod
692692
def from_tuple(cls, path: str, dp: types_v2.Datapoint):
693-
# we assume here that only one field of Value is set -> we use the first entry.
694-
# This should always be the case.
693+
# we assume here that at max one field of Value is set -> we use the first entry.
694+
# If no field is set the value is currently unknown/not avaialable -> set to None.
695695
data = dp.value.ListFields()
696696
if data:
697697
field_descriptor, value = data[0]
@@ -712,6 +712,21 @@ def from_tuple(cls, path: str, dp: types_v2.Datapoint):
712712
fields=[Field(value=types_v1.FIELD_VALUE)],
713713
)
714714

715+
@classmethod
716+
def from_actuate_value(cls, path: str, value: types_v2.Value):
717+
# we assume here that exactly one field of Value is set -> we use the first entry.
718+
# This should always be the case.
719+
data = value.ListFields()
720+
field_descriptor, target_value = data[0]
721+
field_name = field_descriptor.name
722+
target_value = getattr(value, field_name)
723+
return cls(
724+
entry=DataEntry(
725+
path=path, actuator_target=Datapoint(value=target_value)
726+
),
727+
fields=[Field(value=types_v1.FIELD_ACTUATOR_TARGET)],
728+
)
729+
715730
def to_message(self) -> val_v1.EntryUpdate:
716731
message = val_v1.EntryUpdate(entry=self.entry.to_message())
717732
message.fields.extend(field.value for field in self.fields)

kuksa-client/kuksa_client/grpc/aio.py

Lines changed: 151 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
from kuksa.val.v1 import val_pb2 as val_v1
3535
from kuksa.val.v1 import val_pb2_grpc as val_grpc_v1
36+
from kuksa.val.v2 import types_pb2 as types_v2
3637
from kuksa.val.v2 import val_pb2_grpc as val_grpc_v2
3738

3839
from . import BaseVSSClient
@@ -57,6 +58,8 @@ def __init__(self, *args, **kwargs):
5758
super().__init__(*args, **kwargs)
5859
self.channel = None
5960
self.exit_stack = contextlib.AsyncExitStack()
61+
self.path_to_id_mapping: Dict[str, int] = dict()
62+
self.id_to_path_mapping: Dict[int, str] = dict()
6063

6164
async def __aenter__(self):
6265
await self.connect()
@@ -66,9 +69,13 @@ async def __aexit__(self, exc_type, exc_value, traceback):
6669
await self.disconnect()
6770

6871
async def connect(self, target_host=None):
72+
self.path_to_id_mapping.clear()
73+
self.id_to_path_mapping.clear()
74+
6975
creds = self._load_creds()
7076
if target_host is None:
7177
target_host = self.target_host
78+
7279
if creds is not None:
7380
logger.info("Establishing secure channel")
7481
if self.tls_server_name:
@@ -297,15 +304,25 @@ async def subscribe_current_values(
297304
for path, dp in updates.items():
298305
print(f"Current value for {path} is now: {dp.value}")
299306
"""
300-
async for updates in self.subscribe(
301-
entries=(
302-
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
303-
for path in paths
304-
),
305-
try_v2=True,
306-
**rpc_kwargs,
307-
):
308-
yield {update.entry.path: update.entry.value for update in updates}
307+
try:
308+
logger.debug("Try to subscribe current values via v2")
309+
async for updates in self.v2_subscribe(paths=paths, **rpc_kwargs):
310+
yield {
311+
update.entry.path: update.entry.value for update in updates
312+
}
313+
except AioRpcError as exc:
314+
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
315+
logger.debug("v2 not available - falling back to v1 subscribe current values")
316+
async for updates in self.subscribe(
317+
entries=(
318+
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
319+
for path in paths
320+
),
321+
**rpc_kwargs,
322+
):
323+
yield {update.entry.path: update.entry.value for update in updates}
324+
else:
325+
raise VSSClientError.from_grpc_error(exc) from exc
309326

310327
@check_connected_async_iter
311328
async def subscribe_target_values(
@@ -322,17 +339,27 @@ async def subscribe_target_values(
322339
for path, dp in updates.items():
323340
print(f"Target value for {path} is now: {dp.value}")
324341
"""
325-
async for updates in self.subscribe(
326-
entries=(
327-
SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,))
328-
for path in paths
329-
),
330-
try_v2=True,
331-
**rpc_kwargs,
332-
):
333-
yield {
334-
update.entry.path: update.entry.actuator_target for update in updates
335-
}
342+
try:
343+
logger.debug("Try to subscribe actuation requests via v2")
344+
async for updates in self.v2_subscribe_batch_actuation(paths=paths, **rpc_kwargs):
345+
yield {
346+
update.entry.path: update.entry.actuator_target for update in updates
347+
}
348+
except AioRpcError as exc:
349+
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
350+
logger.debug("v2 not available - falling back to v1 subscribe target values")
351+
async for updates in self.subscribe(
352+
entries=(
353+
SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,))
354+
for path in paths
355+
),
356+
**rpc_kwargs,
357+
):
358+
yield {
359+
update.entry.path: update.entry.actuator_target for update in updates
360+
}
361+
else:
362+
raise VSSClientError.from_grpc_error(exc) from exc
336363

337364
@check_connected_async_iter
338365
async def subscribe_metadata(
@@ -434,6 +461,41 @@ async def set(
434461
raise VSSClientError.from_grpc_error(exc) from exc
435462
self._process_set_response(resp)
436463

464+
def get_path(self, signal_id: types_v2.SignalID) -> str:
465+
if signal_id.HasField("path"):
466+
return signal_id.path
467+
elif signal_id.HasField("id") and signal_id.id in self.id_to_path_mapping:
468+
return self.id_to_path_mapping[signal_id.id]
469+
return "<unknown signal>"
470+
471+
async def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs):
472+
for path in paths:
473+
if path not in self.path_to_id_mapping:
474+
# Prevent duplicate requests for the same path
475+
self.path_to_id_mapping[path] = None
476+
req = self._prepare_v2_list_metadata_request(path)
477+
try:
478+
resp = await self.client_stub_v2.ListMetadata(req, **rpc_kwargs)
479+
logger.debug("%s: %s", type(resp).__name__, resp)
480+
if len(resp.metadata) == 1:
481+
self.path_to_id_mapping[path] = resp.metadata[0].id
482+
self.id_to_path_mapping[resp.metadata[0].id] = path
483+
else:
484+
del self.path_to_id_mapping[path]
485+
raise VSSClientError(
486+
error={
487+
"code": grpc.StatusCode.NOT_FOUND.value[0],
488+
"reason": grpc.StatusCode.NOT_FOUND.value[1],
489+
"message": f"Path {path} not found on server",
490+
},
491+
errors=[],
492+
)
493+
except AioRpcError as exc:
494+
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
495+
logger.debug("v2 not available - skip querying ids")
496+
return
497+
raise VSSClientError.from_grpc_error(exc) from exc
498+
437499
@check_connected_async_iter
438500
async def subscribe(
439501
self,
@@ -446,36 +508,78 @@ async def subscribe(
446508
rpc_kwargs
447509
grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials.
448510
"""
511+
512+
if try_v2:
513+
raise VSSClientError(
514+
error={
515+
"code": grpc.StatusCode.INVALID_ARGUMENT.value[0],
516+
"reason": grpc.StatusCode.INVALID_ARGUMENT.value[1],
517+
"message": ("Method subscribe supports v1, only. "
518+
"Use v2_subscribe or v2_subscribe_batch_actuation instead."),
519+
},
520+
errors=[],
521+
)
522+
523+
logger.debug("Try subscribing via v1")
449524
rpc_kwargs["metadata"] = self.generate_metadata_header(
450525
rpc_kwargs.get("metadata")
451526
)
452-
if try_v2:
453-
logger.debug("Trying v2")
454-
req = self._prepare_subscribev2_request(entries)
455-
resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs)
456-
try:
457-
async for resp in resp_stream:
458-
logger.debug("%s: %s", type(resp).__name__, resp)
459-
yield [
460-
EntryUpdate.from_tuple(path, dp)
461-
for path, dp in resp.entries.items()
462-
]
463-
except AioRpcError as exc:
464-
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
465-
logger.debug("v2 not available fall back to v1 instead")
466-
await self.subscribe(entries)
467-
else:
468-
raise VSSClientError.from_grpc_error(exc) from exc
469-
else:
470-
logger.debug("Trying v1")
471-
req = self._prepare_subscribe_request(entries)
472-
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
473-
try:
474-
async for resp in resp_stream:
475-
logger.debug("%s: %s", type(resp).__name__, resp)
476-
yield [EntryUpdate.from_message(update) for update in resp.updates]
477-
except AioRpcError as exc:
478-
raise VSSClientError.from_grpc_error(exc) from exc
527+
req = self._prepare_subscribe_request(entries)
528+
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
529+
try:
530+
async for resp in resp_stream:
531+
logger.debug("%s: %s", type(resp).__name__, resp)
532+
yield [EntryUpdate.from_message(update) for update in resp.updates]
533+
except AioRpcError as exc:
534+
raise VSSClientError.from_grpc_error(exc) from exc
535+
536+
@check_connected_async_iter
537+
async def v2_subscribe(
538+
self, paths: Iterable[str], **rpc_kwargs
539+
) -> AsyncIterator[List[EntryUpdate]]:
540+
"""
541+
Parameters:
542+
rpc_kwargs
543+
grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials.
544+
"""
545+
546+
logger.debug("Subscribe current values via v2")
547+
rpc_kwargs["metadata"] = self.generate_metadata_header(
548+
rpc_kwargs.get("metadata")
549+
)
550+
req = self._prepare_v2_subscribe_request(paths)
551+
resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs)
552+
async for resp in resp_stream:
553+
logger.debug("%s: %s", type(resp).__name__, resp)
554+
yield [
555+
EntryUpdate.from_tuple(path, dp)
556+
for path, dp in resp.entries.items()
557+
]
558+
559+
@check_connected_async_iter
560+
async def v2_subscribe_batch_actuation(
561+
self, paths: Iterable[str], **rpc_kwargs
562+
) -> AsyncIterator[List[EntryUpdate]]:
563+
"""
564+
Parameters:
565+
rpc_kwargs
566+
grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials.
567+
"""
568+
569+
logger.debug("Subscribe actuation requests via v2")
570+
rpc_kwargs["metadata"] = self.generate_metadata_header(
571+
rpc_kwargs.get("metadata")
572+
)
573+
await self.ensure_id_mapping(paths, **rpc_kwargs)
574+
req = self._prepare_v2_provide_actuation_request(paths)
575+
resp_stream = self.client_stub_v2.OpenProviderStream(iter(req), **rpc_kwargs)
576+
async for resp in resp_stream:
577+
logger.debug("batch %s: %s", type(resp).__name__, resp)
578+
if resp.HasField("batch_actuate_stream_request"):
579+
yield [
580+
EntryUpdate.from_actuate_value(self.get_path(actuate_req.signal_id), actuate_req.value)
581+
for actuate_req in resp.batch_actuate_stream_request.actuate_requests
582+
]
479583

480584
@check_connected_async
481585
async def authorize(self, token: str, **rpc_kwargs) -> str:

0 commit comments

Comments
 (0)