|
2 | 2 | import os |
3 | 3 | import shlex |
4 | 4 | import uuid |
| 5 | +from pathlib import Path |
5 | 6 |
|
6 | 7 | from pydantic import Field |
7 | 8 | from pydantic.dataclasses import dataclass |
|
14 | 15 | from astrbot.core.computer.computer_client import get_booter |
15 | 16 | from astrbot.core.message.message_event_result import MessageChain |
16 | 17 | from astrbot.core.platform.message_session import MessageSession |
17 | | -from astrbot.core.tools.computer_tools.util import check_admin_permission |
| 18 | +from astrbot.core.tools.computer_tools.util import ( |
| 19 | + check_admin_permission, |
| 20 | + is_local_runtime, |
| 21 | + workspace_root, |
| 22 | +) |
18 | 23 | from astrbot.core.tools.registry import builtin_tool |
19 | | -from astrbot.core.utils.astrbot_path import get_astrbot_temp_path |
| 24 | +from astrbot.core.utils.astrbot_path import ( |
| 25 | + get_astrbot_system_tmp_path, |
| 26 | + get_astrbot_temp_path, |
| 27 | +) |
| 28 | + |
| 29 | + |
| 30 | +def _file_send_allowed_roots(umo: str | None) -> tuple[Path, ...]: |
| 31 | + roots = [] |
| 32 | + if umo: |
| 33 | + roots.append(workspace_root(umo)) |
| 34 | + roots.extend( |
| 35 | + [ |
| 36 | + Path(get_astrbot_temp_path()).resolve(strict=False), |
| 37 | + Path(get_astrbot_system_tmp_path()).resolve(strict=False), |
| 38 | + ] |
| 39 | + ) |
| 40 | + return tuple(roots) |
| 41 | + |
| 42 | + |
| 43 | +def _is_path_within(path: Path, roots: tuple[Path, ...]) -> bool: |
| 44 | + return any(path == root or path.is_relative_to(root) for root in roots) |
| 45 | + |
| 46 | + |
| 47 | +def _is_restricted_local_env(context: ContextWrapper[AstrAgentContext]) -> bool: |
| 48 | + if not is_local_runtime(context): |
| 49 | + return False |
| 50 | + cfg = context.context.context.get_config( |
| 51 | + umo=context.context.event.unified_msg_origin |
| 52 | + ) |
| 53 | + provider_settings = cfg.get("provider_settings", {}) |
| 54 | + require_admin = provider_settings.get("computer_use_require_admin", True) |
| 55 | + return require_admin and context.context.event.role != "admin" |
| 56 | + |
| 57 | + |
| 58 | +def _can_send_local_file( |
| 59 | + context: ContextWrapper[AstrAgentContext], |
| 60 | + local_path: Path, |
| 61 | +) -> bool: |
| 62 | + umo = context.context.event.unified_msg_origin |
| 63 | + allowed_roots = _file_send_allowed_roots(umo) |
| 64 | + if _is_path_within(local_path, allowed_roots): |
| 65 | + return True |
| 66 | + return is_local_runtime(context) and not _is_restricted_local_env(context) |
20 | 67 |
|
21 | 68 |
|
22 | 69 | @builtin_tool |
@@ -85,23 +132,38 @@ async def _resolve_path_from_sandbox( |
85 | 132 | *, |
86 | 133 | component_type: str = "file", |
87 | 134 | ) -> tuple[str, bool]: |
88 | | - path = str(path) |
89 | | - # if the path is relative, check if the file exists in user's local workspace |
| 135 | + path = str(path).strip() |
| 136 | + if not path: |
| 137 | + raise FileNotFoundError(f"{component_type} path is empty") |
| 138 | + |
| 139 | + # Relative host paths are resolved only inside the user's workspace. |
90 | 140 | if not os.path.isabs(path): |
91 | 141 | unified_msg_origin = context.context.event.unified_msg_origin |
92 | 142 | if unified_msg_origin: |
93 | | - from astrbot.core.tools.computer_tools.util import workspace_root |
94 | | - |
95 | 143 | try: |
96 | 144 | ws_path = workspace_root(unified_msg_origin) |
97 | | - ws_candidate = (ws_path / path).resolve() |
| 145 | + ws_candidate = (ws_path / path).resolve(strict=False) |
98 | 146 | if ws_candidate.is_file() and ws_candidate.is_relative_to(ws_path): |
99 | 147 | return str(ws_candidate), False |
100 | 148 | except Exception: |
101 | 149 | pass |
102 | | - # check if the file exists in local environment (only allow absolute paths to prevent traversal) |
103 | | - elif os.path.isfile(path): |
104 | | - return path, False |
| 150 | + else: |
| 151 | + local_candidate = Path(path).expanduser().resolve(strict=False) |
| 152 | + if local_candidate.is_file(): |
| 153 | + if _can_send_local_file(context, local_candidate): |
| 154 | + return str(local_candidate), False |
| 155 | + if is_local_runtime(context): |
| 156 | + allowed = ", ".join( |
| 157 | + str(root) |
| 158 | + for root in _file_send_allowed_roots( |
| 159 | + context.context.event.unified_msg_origin |
| 160 | + ) |
| 161 | + ) |
| 162 | + raise PermissionError( |
| 163 | + "Local file send is restricted for this user. " |
| 164 | + f"Allowed directories: {allowed}. " |
| 165 | + f"Blocked path: {local_candidate}." |
| 166 | + ) |
105 | 167 |
|
106 | 168 | try: |
107 | 169 | sb = await get_booter( |
@@ -221,6 +283,8 @@ async def call( |
221 | 283 | ) |
222 | 284 | except FileNotFoundError as exc: |
223 | 285 | return f"error: {exc}" |
| 286 | + except PermissionError as exc: |
| 287 | + return f"error: {exc}" |
224 | 288 | except Exception as exc: |
225 | 289 | return f"error: failed to build messages[{idx}] component: {exc}" |
226 | 290 |
|
|
0 commit comments