Skip to content

Commit 9b46c5b

Browse files
authored
fix: enforce admin guard for sandbox file transfer tools (AstrBotDevs#5402)
* fix: enforce admin guard for sandbox file transfer tools * refactor: deduplicate computer tools admin permission checks * fix: add missing space in permission error message
1 parent e7808a3 commit 9b46c5b

4 files changed

Lines changed: 29 additions & 33 deletions

File tree

astrbot/core/computer/tools/fs.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from astrbot.core.utils.astrbot_path import get_astrbot_temp_path
1212

1313
from ..computer_client import get_booter
14+
from .permissions import check_admin_permission
1415

1516
# @dataclass
1617
# class CreateFileTool(FunctionTool):
@@ -102,6 +103,8 @@ async def call(
102103
context: ContextWrapper[AstrAgentContext],
103104
local_path: str,
104105
) -> str | None:
106+
if permission_error := check_admin_permission(context, "File upload/download"):
107+
return permission_error
105108
sb = await get_booter(
106109
context.context.context,
107110
context.context.event.unified_msg_origin,
@@ -161,6 +164,8 @@ async def call(
161164
remote_path: str,
162165
also_send_to_user: bool = True,
163166
) -> ToolExecResult:
167+
if permission_error := check_admin_permission(context, "File upload/download"):
168+
return permission_error
164169
sb = await get_booter(
165170
context.context.context,
166171
context.context.event.unified_msg_origin,
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from astrbot.core.agent.run_context import ContextWrapper
2+
from astrbot.core.astr_agent_context import AstrAgentContext
3+
4+
5+
def check_admin_permission(
6+
context: ContextWrapper[AstrAgentContext], operation_name: str
7+
) -> str | None:
8+
cfg = context.context.context.get_config(
9+
umo=context.context.event.unified_msg_origin
10+
)
11+
provider_settings = cfg.get("provider_settings", {})
12+
require_admin = provider_settings.get("computer_use_require_admin", True)
13+
if require_admin and context.context.event.role != "admin":
14+
return (
15+
f"error: Permission denied. {operation_name} is only allowed for admin users. "
16+
"Tell user to set admins in `AstrBot WebUI -> Config -> General Config` by adding their user ID to the admins list if they need this feature. "
17+
f"User's ID is: {context.context.event.get_sender_id()}. User's ID can be found by using /sid command."
18+
)
19+
return None

astrbot/core/computer/tools/python.py

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from astrbot.core.agent.tool import ToolExecResult
88
from astrbot.core.astr_agent_context import AstrAgentContext, AstrMessageEvent
99
from astrbot.core.computer.computer_client import get_booter, get_local_booter
10+
from astrbot.core.computer.tools.permissions import check_admin_permission
1011
from astrbot.core.message.message_event_result import MessageChain
1112

1213
param_schema = {
@@ -26,21 +27,6 @@
2627
}
2728

2829

29-
def _check_admin_permission(context: ContextWrapper[AstrAgentContext]) -> str | None:
30-
cfg = context.context.context.get_config(
31-
umo=context.context.event.unified_msg_origin
32-
)
33-
provider_settings = cfg.get("provider_settings", {})
34-
require_admin = provider_settings.get("computer_use_require_admin", True)
35-
if require_admin and context.context.event.role != "admin":
36-
return (
37-
"error: Permission denied. Python execution is only allowed for admin users. "
38-
"Tell user to set admins in `AstrBot WebUI -> Config -> General Config` by adding their user ID to the admins list if they need this feature."
39-
f"User's ID is: {context.context.event.get_sender_id()}. User's ID can be found by using /sid command."
40-
)
41-
return None
42-
43-
4430
async def handle_result(result: dict, event: AstrMessageEvent) -> ToolExecResult:
4531
data = result.get("data", {})
4632
output = data.get("output", {})
@@ -81,7 +67,7 @@ class PythonTool(FunctionTool):
8167
async def call(
8268
self, context: ContextWrapper[AstrAgentContext], code: str, silent: bool = False
8369
) -> ToolExecResult:
84-
if permission_error := _check_admin_permission(context):
70+
if permission_error := check_admin_permission(context, "Python execution"):
8571
return permission_error
8672
sb = await get_booter(
8773
context.context.context,
@@ -104,7 +90,7 @@ class LocalPythonTool(FunctionTool):
10490
async def call(
10591
self, context: ContextWrapper[AstrAgentContext], code: str, silent: bool = False
10692
) -> ToolExecResult:
107-
if permission_error := _check_admin_permission(context):
93+
if permission_error := check_admin_permission(context, "Python execution"):
10894
return permission_error
10995
sb = get_local_booter()
11096
try:

astrbot/core/computer/tools/shell.py

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,7 @@
77
from astrbot.core.astr_agent_context import AstrAgentContext
88

99
from ..computer_client import get_booter, get_local_booter
10-
11-
12-
def _check_admin_permission(context: ContextWrapper[AstrAgentContext]) -> str | None:
13-
cfg = context.context.context.get_config(
14-
umo=context.context.event.unified_msg_origin
15-
)
16-
provider_settings = cfg.get("provider_settings", {})
17-
require_admin = provider_settings.get("computer_use_require_admin", True)
18-
if require_admin and context.context.event.role != "admin":
19-
return (
20-
"error: Permission denied. Shell execution is only allowed for admin users. "
21-
"Tell user to set admins in `AstrBot WebUI -> Config -> General Config` by adding their user ID to the admins list if they need this feature."
22-
f"User's ID is: {context.context.event.get_sender_id()}. User's ID can be found by using /sid command."
23-
)
24-
return None
10+
from .permissions import check_admin_permission
2511

2612

2713
@dataclass
@@ -61,7 +47,7 @@ async def call(
6147
background: bool = False,
6248
env: dict = {},
6349
) -> ToolExecResult:
64-
if permission_error := _check_admin_permission(context):
50+
if permission_error := check_admin_permission(context, "Shell execution"):
6551
return permission_error
6652

6753
if self.is_local:

0 commit comments

Comments
 (0)