-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprovider.py
More file actions
110 lines (94 loc) · 3.94 KB
/
provider.py
File metadata and controls
110 lines (94 loc) · 3.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from __future__ import annotations
import uuid
from collections.abc import Awaitable, Callable, Mapping
from typing import Any
from astrbot.core.computer.booters.base import ComputerBooter
from astrbot.core.star.context import Context
from .booters import boxlite as boxlite_booter
from .booters.boxlite import BoxliteBooter, allocate_boxlite_host_port
# Type of the optional boot override accepted by BoxliteSandboxProvider:
# an async callable invoked as (context, session_id, sandbox_id, config)
# whose awaited result is the ComputerBooter to use.
BootHook = Callable[[Context, str, str, dict], Awaitable[ComputerBooter]]
class BoxliteSandboxProvider:
    """Sandbox provider backed by ``BoxliteBooter``.

    Builds and updates the ``connect_info`` dictionaries kept on sandbox
    records, checks whether a named box is known to the Boxlite runtime,
    and creates or destroys booters.
    """

    provider_id = "boxlite"
    capabilities = {"shell", "python", "filesystem"}
    supports_persistent_reconnect = True
    tool_names: set[str] = set()

    def __init__(
        self,
        boot_hook: BootHook | None = None,
        *,
        plugin_config: Mapping[str, Any] | None = None,
    ) -> None:
        """Store a private copy of the plugin config and the optional boot hook.

        Args:
            boot_hook: When given, ``create_booter`` delegates to it instead
                of constructing a ``BoxliteBooter`` itself.
            plugin_config: Optional mapping; copied into a plain ``dict`` so
                later changes to the caller's mapping are not observed.
        """
        self.plugin_config: dict[str, Any] = (
            dict(plugin_config) if plugin_config is not None else {}
        )
        self._boot_hook = boot_hook

    @staticmethod
    def _first_non_blank(*candidates: Any) -> str:
        """Return the first truthy candidate that is non-empty once stripped.

        Falsy candidates are skipped, as are values that consist only of
        whitespace. Returns ``""`` when nothing qualifies.
        """
        for candidate in candidates:
            if not candidate:
                continue
            text = str(candidate).strip()
            if text:
                return text
        return ""

    @staticmethod
    def _persistent_name(config: dict, fallback: str) -> str:
        """Return the stripped ``persistent_name`` from *config*, or *fallback*.

        A missing, falsy, or whitespace-only ``persistent_name`` falls back
        to the stripped *fallback*, so a blank configured name can no longer
        produce an empty box name.
        """
        return BoxliteSandboxProvider._first_non_blank(
            config.get("persistent_name"), fallback
        )

    def build_create_config(self, context: Context, session_id: str) -> dict:
        """Return the creation config: a freshly allocated ``host_port``."""
        return {"host_port": allocate_boxlite_host_port()}

    def build_connect_info(self, sandbox_name: str, config: dict) -> dict:
        """Build the connect-info dictionary for *sandbox_name*.

        The persistent name comes from ``config["persistent_name"]``, then
        ``config["sandbox_id"]``, then *sandbox_name*. The host port comes
        from ``config["host_port"]``; a new port is allocated only when that
        value is missing or falsy.
        """
        return {
            "name": sandbox_name,
            "persistent_name": self._persistent_name(
                config,
                str(config.get("sandbox_id") or sandbox_name),
            ),
            "host_port": int(config.get("host_port") or allocate_boxlite_host_port()),
        }

    def update_connect_info(self, record: dict, *, sandbox_name: str) -> dict:
        """Return a copy of the record's connect info renamed to *sandbox_name*.

        An existing non-blank ``persistent_name`` is kept untouched. A
        missing, falsy, or whitespace-only one is replaced by the record's
        ``sandbox_id`` (or *sandbox_name*), stripped. The record itself is
        not modified.
        """
        connect_info = dict(record.get("connect_info") or {})
        connect_info["name"] = sandbox_name
        # setdefault() would keep a stored "" / None; treat those as missing,
        # matching how the other name lookups in this class behave.
        if not self._first_non_blank(connect_info.get("persistent_name")):
            connect_info["persistent_name"] = str(
                record.get("sandbox_id") or sandbox_name
            ).strip()
        return connect_info

    def update_connect_info_after_boot(
        self, record: dict, booter: ComputerBooter
    ) -> dict | None:
        """Return connect info carrying the booter's ``host_port``.

        Returns ``None`` when the booter has no ``host_port`` attribute or
        its value is falsy; otherwise a copy of the record's connect info
        with ``host_port`` set to that value as an ``int``.
        """
        host_port = getattr(booter, "host_port", None)
        if not host_port:
            return None
        connect_info = dict(record.get("connect_info") or {})
        connect_info["host_port"] = int(host_port)
        return connect_info

    def get_idle_timeout(self, context: Context, session_id: str) -> float:
        """Return the idle timeout for this provider (always ``0.0``)."""
        # NOTE(review): presumably 0.0 means "no idle timeout" to the caller;
        # confirm against the code that consumes this value.
        return 0.0

    async def check_persistent_sandbox_exists(self, record: dict) -> bool:
        """Report whether the Boxlite runtime knows the record's box.

        The name is the first non-blank value among the connect info's
        ``persistent_name``, its ``name``, and the record's ``sandbox_id``.
        Returns ``False`` without consulting the runtime when no name is
        available.
        """
        connect_info = dict(record.get("connect_info") or {})
        box_name = self._first_non_blank(
            connect_info.get("persistent_name"),
            connect_info.get("name"),
            record.get("sandbox_id"),
        )
        if not box_name:
            return False
        runtime = boxlite_booter.boxlite.Boxlite.default()
        # NOTE(review): get_info() is called without await. If the installed
        # boxlite version exposes it as a coroutine function, this comparison
        # would always be True — confirm against the boxlite API.
        return runtime.get_info(box_name) is not None

    async def create_booter(
        self, context: Context, session_id: str, sandbox_id: str, config: dict
    ) -> ComputerBooter:
        """Create and boot the booter for *sandbox_id*.

        Delegates to the boot hook when one was supplied. Otherwise builds a
        persistent ``BoxliteBooter`` and boots it with the hex form of a
        UUIDv5 derived from *session_id*.

        Raises:
            RuntimeError: If ``config["resume"]`` is truthy but no
                ``host_port`` is stored in *config*.
        """
        if self._boot_hook is not None:
            return await self._boot_hook(context, session_id, sandbox_id, config)

        host_port = config.get("host_port")
        resume = bool(config.get("resume", False))
        if resume and not host_port:
            raise RuntimeError(
                "Boxlite persistent sandbox cannot be resumed without a stored host_port"
            )

        client = BoxliteBooter(
            persistent=True,
            persistent_name=self._persistent_name(config, sandbox_id),
            resume=resume,
            sandbox_id=sandbox_id,
            host_port=int(host_port) if host_port else None,
        )
        await client.boot(uuid.uuid5(uuid.NAMESPACE_DNS, session_id).hex)
        return client

    async def destroy_booter(self, booter: ComputerBooter, record: dict) -> None:
        """Tear down *booter*.

        Awaits ``booter.destroy()`` when the booter has a callable
        ``destroy`` attribute; otherwise awaits ``booter.shutdown()``.
        """
        destroy = getattr(booter, "destroy", None)
        if callable(destroy):
            await destroy()
            return
        await booter.shutdown()