|
19 | 19 | import logging |
20 | 20 | import os |
21 | 21 | from pathlib import Path |
| 22 | +import re |
22 | 23 | import sys |
23 | 24 | from typing import Any |
24 | 25 | from typing import Literal |
@@ -187,8 +188,36 @@ def _load_from_yaml_config( |
187 | 188 | ) + e.args[1:] |
188 | 189 | raise e |
189 | 190 |
|
| 191 | + _VALID_AGENT_NAME_RE = re.compile(r"^[a-zA-Z0-9_]+$") |
| 192 | + |
| 193 | + def _validate_agent_name(self, agent_name: str) -> None: |
| 194 | + """Validate agent name to prevent arbitrary module imports.""" |
| 195 | + # Strip the special agent prefix for validation |
| 196 | + if agent_name.startswith("__"): |
| 197 | + name_to_check = agent_name[2:] |
| 198 | + check_dir = os.path.abspath(SPECIAL_AGENTS_DIR) |
| 199 | + else: |
| 200 | + name_to_check = agent_name |
| 201 | + check_dir = self.agents_dir |
| 202 | + |
| 203 | + if not self._VALID_AGENT_NAME_RE.match(name_to_check): |
| 204 | + raise ValueError( |
| 205 | + f"Invalid agent name: {agent_name!r}. Agent names must be valid" |
| 206 | + " Python identifiers (letters, digits, and underscores only)." |
| 207 | + ) |
| 208 | + |
| 209 | + # Verify the agent exists on disk before allowing import |
| 210 | + agent_path = Path(check_dir) / name_to_check |
| 211 | + agent_file = Path(check_dir) / f"{name_to_check}.py" |
| 212 | + if not (agent_path.is_dir() or agent_file.is_file()): |
| 213 | + raise ValueError( |
| 214 | + f"Agent not found: {agent_name!r}. No matching directory or module" |
| 215 | + f" exists in '{os.path.join(check_dir, name_to_check)}'." |
| 216 | + ) |
| 217 | + |
190 | 218 | def _perform_load(self, agent_name: str) -> Union[BaseAgent, App]: |
191 | 219 | """Internal logic to load an agent""" |
| 220 | + self._validate_agent_name(agent_name) |
192 | 221 | # Determine the directory to use for loading |
193 | 222 | if agent_name.startswith("__"): |
194 | 223 | # Special agent: use special agents directory |
@@ -335,18 +364,13 @@ def load_agent(self, agent_name: str) -> Union[BaseAgent, App]: |
335 | 364 | def list_agents(self) -> list[str]: |
336 | 365 | """Lists all agents available in the agent loader (sorted alphabetically).""" |
337 | 366 | base_path = Path.cwd() / self.agents_dir |
338 | | - agent_names = [] |
339 | | - for x in os.listdir(base_path): |
340 | | - if ( |
341 | | - os.path.isdir(os.path.join(base_path, x)) |
342 | | - and not x.startswith(".") |
343 | | - and x != "__pycache__" |
344 | | - ): |
345 | | - try: |
346 | | - self._determine_agent_language(x) |
347 | | - agent_names.append(x) |
348 | | - except ValueError: |
349 | | - continue |
| 367 | + agent_names = [ |
| 368 | + x |
| 369 | + for x in os.listdir(base_path) |
| 370 | + if os.path.isdir(os.path.join(base_path, x)) |
| 371 | + and not x.startswith(".") |
| 372 | + and x != "__pycache__" |
| 373 | + ] |
350 | 374 | agent_names.sort() |
351 | 375 | return agent_names |
352 | 376 |
|
|
0 commit comments