Skip to content

Commit fe70033

Browse files
committed
Try adapting async stuff
1 parent 210acd0 commit fe70033

2 files changed

Lines changed: 173 additions & 93 deletions

File tree

kuksa-client/kuksa_client/grpc/aio.py

Lines changed: 151 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
from kuksa.val.v1 import val_pb2 as val_v1
3535
from kuksa.val.v1 import val_pb2_grpc as val_grpc_v1
36+
from kuksa.val.v2 import types_pb2 as types_v2
3637
from kuksa.val.v2 import val_pb2_grpc as val_grpc_v2
3738

3839
from . import BaseVSSClient
@@ -57,6 +58,8 @@ def __init__(self, *args, **kwargs):
5758
super().__init__(*args, **kwargs)
5859
self.channel = None
5960
self.exit_stack = contextlib.AsyncExitStack()
61+
self.path_to_id_mapping: Dict[str, int] = dict()
62+
self.id_to_path_mapping: Dict[int, str] = dict()
6063

6164
async def __aenter__(self):
6265
await self.connect()
@@ -66,9 +69,13 @@ async def __aexit__(self, exc_type, exc_value, traceback):
6669
await self.disconnect()
6770

6871
async def connect(self, target_host=None):
72+
self.path_to_id_mapping.clear()
73+
self.id_to_path_mapping.clear()
74+
6975
creds = self._load_creds()
7076
if target_host is None:
7177
target_host = self.target_host
78+
7279
if creds is not None:
7380
logger.info("Establishing secure channel")
7481
if self.tls_server_name:
@@ -297,15 +304,25 @@ async def subscribe_current_values(
297304
for path, dp in updates.items():
298305
print(f"Current value for {path} is now: {dp.value}")
299306
"""
300-
async for updates in self.subscribe(
301-
entries=(
302-
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
303-
for path in paths
304-
),
305-
try_v2=True,
306-
**rpc_kwargs,
307-
):
308-
yield {update.entry.path: update.entry.value for update in updates}
307+
try:
308+
logger.debug("Try to subscribe current values via v2")
309+
async for updates in self.v2_subscribe(paths, **rpc_kwargs):
310+
yield {
311+
update.entry.path: update.entry.value for update in updates
312+
}
313+
except AioRpcError as exc:
314+
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
315+
logger.debug("v2 not available - falling back to v1 subscribe current values")
316+
async for updates in self.subscribe(
317+
entries=(
318+
SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,))
319+
for path in paths
320+
),
321+
**rpc_kwargs,
322+
):
323+
yield {update.entry.path: update.entry.value for update in updates}
324+
else:
325+
raise VSSClientError.from_grpc_error(exc) from exc
309326

310327
@check_connected_async_iter
311328
async def subscribe_target_values(
@@ -322,17 +339,27 @@ async def subscribe_target_values(
322339
for path, dp in updates.items():
323340
print(f"Target value for {path} is now: {dp.value}")
324341
"""
325-
async for updates in self.subscribe(
326-
entries=(
327-
SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,))
328-
for path in paths
329-
),
330-
try_v2=True,
331-
**rpc_kwargs,
332-
):
333-
yield {
334-
update.entry.path: update.entry.actuator_target for update in updates
335-
}
342+
try:
343+
logger.debug("Try to subscribe actuation requests via v2")
344+
async for updates in self.v2_subscribe_batch_actuation(paths, **rpc_kwargs):
345+
yield {
346+
update.entry.path: update.entry.value for update in updates
347+
}
348+
except AioRpcError as exc:
349+
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
350+
logger.debug("v2 not available - falling back to v1 subscribe target values")
351+
async for updates in self.subscribe(
352+
entries=(
353+
SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,))
354+
for path in paths
355+
),
356+
**rpc_kwargs,
357+
):
358+
yield {
359+
update.entry.path: update.entry.actuator_target for update in updates
360+
}
361+
else:
362+
raise VSSClientError.from_grpc_error(exc) from exc
336363

337364
@check_connected_async_iter
338365
async def subscribe_metadata(
@@ -434,6 +461,41 @@ async def set(
434461
raise VSSClientError.from_grpc_error(exc) from exc
435462
self._process_set_response(resp)
436463

464+
def get_path(self, signal_id: types_v2.SignalID) -> str:
465+
if signal_id.HasField("path"):
466+
return signal_id.path
467+
elif signal_id.HasField("id") and signal_id.id in self.id_to_path_mapping:
468+
return self.id_to_path_mapping[signal_id.id]
469+
return "<unknown signal>"
470+
471+
async def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs):
472+
for path in paths:
473+
if path not in self.path_to_id_mapping:
474+
# Prevent duplicate requests for the same path
475+
self.path_to_id_mapping[path] = None
476+
req = self._prepare_v2_list_metadata_request(path)
477+
try:
478+
resp = await self.client_stub_v2.ListMetadata(req, **rpc_kwargs)
479+
logger.debug("%s: %s", type(resp).__name__, resp)
480+
if len(resp.metadata) == 1:
481+
self.path_to_id_mapping[path] = resp.metadata[0].id
482+
self.id_to_path_mapping[resp.metadata[0].id] = path
483+
else:
484+
del self.path_to_id_mapping[path]
485+
raise VSSClientError(
486+
error={
487+
"code": grpc.StatusCode.NOT_FOUND.value[0],
488+
"reason": grpc.StatusCode.NOT_FOUND.value[1],
489+
"message": f"Path {path} not found on server",
490+
},
491+
errors=[],
492+
)
493+
except AioRpcError as exc:
494+
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
495+
logger.debug("v2 not available - skip querying ids")
496+
return
497+
raise VSSClientError.from_grpc_error(exc) from exc
498+
437499
@check_connected_async_iter
438500
async def subscribe(
439501
self,
@@ -446,36 +508,78 @@ async def subscribe(
446508
rpc_kwargs
447509
grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials.
448510
"""
511+
512+
if try_v2:
513+
raise VSSClientError(
514+
error={
515+
"code": grpc.StatusCode.INVALID_ARGUMENT.value[0],
516+
"reason": grpc.StatusCode.INVALID_ARGUMENT.value[1],
517+
"message": ("Method subscribe supports v1, only. "
518+
"Use v2_subscribe or v2_subscribe_batch_actuation instead."),
519+
},
520+
errors=[],
521+
)
522+
523+
logger.debug("Try subscribing via v1")
449524
rpc_kwargs["metadata"] = self.generate_metadata_header(
450525
rpc_kwargs.get("metadata")
451526
)
452-
if try_v2:
453-
logger.debug("Trying v2")
454-
req = self._prepare_subscribev2_request(entries)
455-
resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs)
456-
try:
457-
async for resp in resp_stream:
458-
logger.debug("%s: %s", type(resp).__name__, resp)
459-
yield [
460-
EntryUpdate.from_tuple(path, dp)
461-
for path, dp in resp.entries.items()
462-
]
463-
except AioRpcError as exc:
464-
if exc.code() == grpc.StatusCode.UNIMPLEMENTED:
465-
logger.debug("v2 not available fall back to v1 instead")
466-
await self.subscribe(entries)
467-
else:
468-
raise VSSClientError.from_grpc_error(exc) from exc
469-
else:
470-
logger.debug("Trying v1")
471-
req = self._prepare_subscribe_request(entries)
472-
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
473-
try:
474-
async for resp in resp_stream:
475-
logger.debug("%s: %s", type(resp).__name__, resp)
476-
yield [EntryUpdate.from_message(update) for update in resp.updates]
477-
except AioRpcError as exc:
478-
raise VSSClientError.from_grpc_error(exc) from exc
527+
req = self._prepare_subscribe_request(entries)
528+
resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs)
529+
try:
530+
async for resp in resp_stream:
531+
logger.debug("%s: %s", type(resp).__name__, resp)
532+
yield [EntryUpdate.from_message(update) for update in resp.updates]
533+
except AioRpcError as exc:
534+
raise VSSClientError.from_grpc_error(exc) from exc
535+
536+
@check_connected_async_iter
537+
async def v2_subscribe(
538+
self, paths: Iterable[str], **rpc_kwargs
539+
) -> AsyncIterator[List[EntryUpdate]]:
540+
"""
541+
Parameters:
542+
rpc_kwargs
543+
grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials.
544+
"""
545+
546+
logger.debug("Subscribe current values via v2")
547+
rpc_kwargs["metadata"] = self.generate_metadata_header(
548+
rpc_kwargs.get("metadata")
549+
)
550+
req = self._prepare_v2_subscribe_request(paths)
551+
resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs)
552+
async for resp in resp_stream:
553+
logger.debug("%s: %s", type(resp).__name__, resp)
554+
yield [
555+
EntryUpdate.from_tuple(path, dp)
556+
for path, dp in resp.entries.items()
557+
]
558+
559+
@check_connected_async_iter
560+
async def v2_subscribe_batch_actuation(
561+
self, paths: Iterable[str], **rpc_kwargs
562+
) -> AsyncIterator[List[EntryUpdate]]:
563+
"""
564+
Parameters:
565+
rpc_kwargs
566+
grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials.
567+
"""
568+
569+
logger.debug("Subscribe actuation requests via v2")
570+
rpc_kwargs["metadata"] = self.generate_metadata_header(
571+
rpc_kwargs.get("metadata")
572+
)
573+
await self.ensure_id_mapping(paths, **rpc_kwargs)
574+
req = self._prepare_v2_provide_actuation_request(paths)
575+
resp_stream = self.client_stub_v2.OpenProviderStream(iter(req), **rpc_kwargs)
576+
async for resp in resp_stream:
577+
logger.debug("batch %s: %s", type(resp).__name__, resp)
578+
if resp.HasField("batch_actuate_stream_request"):
579+
yield [
580+
EntryUpdate.from_tuple(self.get_path(actuate_req.signal_id), Datapoint(value=actuate_req.value))
581+
for actuate_req in resp.batch_actuate_stream_request.actuate_requests
582+
]
479583

480584
@check_connected_async
481585
async def authorize(self, token: str, **rpc_kwargs) -> str:

kuksa-client/tests/test_grpc.py

Lines changed: 22 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1768,22 +1768,14 @@ async def test_subscribe_some_entries_v2(
17681768
)
17691769

17701770
actual_responses = []
1771-
async for updates in client.subscribe(
1772-
entries=(
1773-
entry
1774-
for entry in ( # generator is intentional (Iterable)
1775-
EntryRequest(
1776-
"Vehicle.Speed", View.CURRENT_VALUE, (Field.VALUE,)
1777-
),
1778-
EntryRequest(
1779-
"Vehicle.ADAS.ABS.IsActive",
1780-
# Specified View is ignored so we can use anyone :-)
1781-
View.METADATA,
1782-
(Field.VALUE,),
1783-
),
1771+
async for updates in client.v2_subscribe(
1772+
paths=(
1773+
path
1774+
for path in ( # generator is intentional (Iterable)
1775+
"Vehicle.Speed",
1776+
"Vehicle.ADAS.ABS.IsActive",
17841777
)
17851778
),
1786-
try_v2=True,
17871779
):
17881780
actual_responses.append(updates)
17891781

@@ -1859,21 +1851,14 @@ async def test_subscribe_some_entries_v2_target(
18591851
actual_responses = []
18601852

18611853
with pytest.raises(VSSClientError):
1862-
async for updates in client.subscribe(
1863-
entries=(
1864-
entry
1865-
for entry in ( # generator is intentional (Iterable)
1866-
EntryRequest(
1867-
"Vehicle.Speed", View.TARGET_VALUE, (Field.ACTUATOR_TARGET,)
1868-
),
1869-
EntryRequest(
1870-
"Vehicle.ADAS.ABS.IsActive",
1871-
View.TARGET_VALUE,
1872-
(Field.ACTUATOR_TARGET,),
1873-
),
1854+
async for updates in client.v2_subscribe_batch_actuation(
1855+
paths=(
1856+
path
1857+
for path in ( # generator is intentional (Iterable)
1858+
"Vehicle.Speed",
1859+
"Vehicle.ADAS.ABS.IsActive",
18741860
)
18751861
),
1876-
try_v2=True,
18771862
):
18781863
actual_responses.append(updates)
18791864

@@ -1900,7 +1885,7 @@ async def test_subscribe_no_entries_requested(
19001885
"127.0.0.1", unused_tcp_port, ensure_startup_connection=False
19011886
) as client:
19021887
with pytest.raises(VSSClientError):
1903-
async for _ in client.subscribe(entries=(), try_v2=True):
1888+
async for _ in client.v2_subscribe(paths=()):
19041889
pass
19051890

19061891
@pytest.mark.usefixtures("mocked_databroker")
@@ -1926,16 +1911,13 @@ async def test_subscribe_nonexistent_entries(
19261911
"127.0.0.1", unused_tcp_port, ensure_startup_connection=False
19271912
) as client:
19281913
with pytest.raises(VSSClientError):
1929-
async for _ in client.subscribe(
1930-
entries=(
1931-
entry
1932-
for entry in ( # generator is intentional (Iterable)
1933-
EntryRequest(
1934-
"Does.Not.Exist", View.CURRENT_VALUE, (Field.VALUE,)
1935-
),
1914+
async for _ in client.v2_subscribe(
1915+
paths=(
1916+
path
1917+
for path in ( # generator is intentional (Iterable)
1918+
"Does.Not.Exist",
19361919
)
19371920
),
1938-
try_v2=True,
19391921
):
19401922
pass
19411923

@@ -2124,11 +2106,8 @@ async def test_add_subscriber_v2(self, mocker, unused_tcp_port, val_servicer_v2)
21242106
response for response in responses
21252107
)
21262108

2127-
subscribe_response_stream = client.subscribe(
2128-
entries=(
2129-
EntryRequest("Vehicle.Speed", View.CURRENT_VALUE, (Field.VALUE,)),
2130-
),
2131-
try_v2=True,
2109+
subscribe_response_stream = client.v2_subscribe(
2110+
paths=("Vehicle.Speed"),
21322111
)
21332112
sub_uid = await subscriber_manager.add_subscriber(
21342113
subscribe_response_stream, callback=callback
@@ -2183,11 +2162,8 @@ async def test_remove_subscriber_v2(self, mocker, unused_tcp_port, val_servicer_
21832162
val_servicer_v2.Subscribe.return_value = (
21842163
response for response in responses
21852164
)
2186-
subscribe_response_stream = client.subscribe(
2187-
entries=(
2188-
EntryRequest("Vehicle.Speed", View.CURRENT_VALUE, (Field.VALUE,)),
2189-
),
2190-
try_v2=True,
2165+
subscribe_response_stream = client.v2_subscribe(
2166+
paths=("Vehicle.Speed"),
21912167
)
21922168
sub_uid = await subscriber_manager.add_subscriber(
21932169
subscribe_response_stream, callback=mocker.Mock()

0 commit comments

Comments
 (0)