Skip to content

Commit 2a9d15f

Browse files
committed
kraken: zenoh: add handlers for the extension endpoints
1 parent 2b9f3f0 commit 2a9d15f

2 files changed

Lines changed: 107 additions & 1 deletion

File tree

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
from commonwealth.utils.zenoh_helper import ZenohRouter, ZenohSession
22
from config import SERVICE_NAME
33
from zenoh_handlers.container_handler import ContainerHandlers
4+
from zenoh_handlers.extension_handler import ExtensionHandlers
45

56
session = ZenohSession(SERVICE_NAME)
67
router = ZenohRouter(SERVICE_NAME)
78

9+
# Extension
10+
extension_handlers = ExtensionHandlers(router)
11+
extension_handlers.register_queryables()
12+
813
# Container
914
container_handlers = ContainerHandlers(router)
10-
container_handlers.register_queryables()
15+
container_handlers.register_queryables()
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import asyncio
2+
import json
3+
from typing import Any, AsyncGenerator, List, cast
4+
5+
from commonwealth.utils.zenoh_helper import ZenohRouter
6+
from extension.extension import Extension
7+
from loguru import logger
8+
9+
10+
class ExtensionHandlers:
11+
INSTALL_PROGRESS_TOPIC = "extension/install/progress"
12+
13+
def __init__(self, router: ZenohRouter) -> None:
14+
self.router = router
15+
self.router.ensure_publisher(self.INSTALL_PROGRESS_TOPIC)
16+
17+
@staticmethod
18+
async def _install_progress_stream(identifier: str, extension: Extension) -> AsyncGenerator[str, None]:
19+
try:
20+
async for chunk in extension.install():
21+
try:
22+
payload = json.loads(chunk)
23+
except (TypeError, ValueError) as e:
24+
logger.debug(f"Failed to parse install progress chunk: {e}")
25+
continue
26+
payload["identifier"] = identifier
27+
yield json.dumps(payload, default=str)
28+
except Exception as error:
29+
logger.exception(f"Install of {identifier} failed")
30+
yield json.dumps({"identifier": identifier, "error": str(error)}, default=str)
31+
32+
async def install_handler(self, identifier: str, tag: str = "", stable: str = "true") -> dict[str, str]:
33+
"""
34+
Install an extension by its identifier and tag, if tag is not provided it will install the latest stable version.
35+
"""
36+
if tag:
37+
extension = cast(Extension, await Extension.from_manifest(identifier, tag))
38+
else:
39+
extension = await Extension.from_latest(identifier, stable.lower() == "true")
40+
41+
on_complete = json.dumps({"identifier": identifier, "status": "complete"})
42+
self.router.publish_from_generator(
43+
self.INSTALL_PROGRESS_TOPIC,
44+
self._install_progress_stream(identifier, extension),
45+
on_complete=on_complete,
46+
)
47+
return {"status": "started", "identifier": identifier}
48+
49+
async def uninstall_handler(self, identifier: str, tag: str = "") -> None:
50+
"""
51+
Uninstall all versions of an extension by its identifier or just a specific version if a tag is provided.
52+
"""
53+
if tag:
54+
extension = cast(Extension, await Extension.from_settings(identifier, tag))
55+
await extension.uninstall()
56+
else:
57+
extensions = cast(List[Extension], await Extension.from_settings(identifier))
58+
await asyncio.gather(*[ext.uninstall() for ext in extensions])
59+
60+
async def enable_handler(self, identifier: str, tag: str) -> None:
61+
"""
62+
Enables an extension by its identifier and tag.
63+
"""
64+
extension = cast(Extension, await Extension.from_settings(identifier, tag))
65+
await extension.enable()
66+
67+
async def disable_handler(self, identifier: str) -> None:
68+
"""
69+
Disables current running extension by its identifier.
70+
"""
71+
extension = await Extension.from_running(identifier)
72+
await extension.disable()
73+
74+
async def restart_handler(self, identifier: str) -> None:
75+
"""
76+
Restart current running extension by its identifier.
77+
"""
78+
extension = await Extension.from_running(identifier)
79+
await extension.restart()
80+
81+
async def fetch_handler(self) -> list[dict[str, Any]]:
82+
"""
83+
List details of all installed extensions.
84+
"""
85+
extensions = cast(List[Extension], await Extension.from_settings())
86+
return [ext.source.model_dump() for ext in extensions if ext.source.identifier != ""]
87+
88+
async def keep_uploaded_extension_alive_handler(self, temp_tag: str) -> None:
89+
"""
90+
Refresh the keep-alive timestamp for a temporary extension while the user is editing metadata.
91+
"""
92+
Extension.keep_temporary_extension_alive(temp_tag)
93+
94+
def register_queryables(self) -> None:
95+
self.router.add_queryable("extension/fetch", self.fetch_handler)
96+
self.router.add_queryable("extension/install", self.install_handler)
97+
self.router.add_queryable("extension/uninstall", self.uninstall_handler)
98+
self.router.add_queryable("extension/enable", self.enable_handler)
99+
self.router.add_queryable("extension/disable", self.disable_handler)
100+
self.router.add_queryable("extension/restart", self.restart_handler)
101+
self.router.add_queryable("extension/upload/keep-alive", self.keep_uploaded_extension_alive_handler)

0 commit comments

Comments
 (0)