|
1 | 1 | import platform |
2 | 2 | import subprocess |
| 3 | +from typing import Dict, Any |
3 | 4 |
|
| 5 | +from agentmesh.tools.base_tool import BaseTool, ToolResult |
4 | 6 |
|
5 | | -class Terminal: |
6 | | - def open_terminal(self, command): |
7 | | - system = platform.system() |
8 | 7 |
|
9 | | - try: |
10 | | - if system == "Windows": |
11 | | - subprocess.Popen(f'start cmd /k "{command}"', shell=True) |
12 | | - elif system == "Darwin": |
13 | | - applescript = f''' |
14 | | - tell application "Terminal" |
15 | | - activate |
16 | | - do script "{command}" |
17 | | - end tell |
18 | | - ''' |
19 | | - subprocess.run(["osascript", "-e", applescript]) |
20 | | - elif system == "Linux": |
21 | | - subprocess.Popen(f'gnome-terminal -- bash -c "{command}; exec bash"', shell=True) |
22 | | - else: |
23 | | - raise OSError("Unsupported operating system") |
24 | | - return True |
25 | | - except Exception as e: |
26 | | - print(f"Error opening terminal: {str(e)}") |
27 | | - return False |
| 8 | +class Terminal(BaseTool): |
| 9 | + name: str = "terminal" |
| 10 | + description: str = "A tool to run terminal commands on the local system" |
| 11 | + params: dict = { |
| 12 | + "type": "object", |
| 13 | + "properties": { |
| 14 | + "command": { |
| 15 | + "type": "string", |
| 16 | + "description": f"The terminal command to execute which should be valid in {platform.system()} platform" |
| 17 | + } |
| 18 | + }, |
| 19 | + "required": ["command"] |
| 20 | + } |
| 21 | + config: dict = {} |
| 22 | + |
| 23 | + def __init__(self, config=None): |
| 24 | + self.config = config or {} |
| 25 | + # Set of dangerous commands that should be blocked |
| 26 | + self.command_ban_set = {"halt", "poweroff", "shutdown", "reboot", "rm", "kill", |
| 27 | + "exit", "sudo", "su", "userdel", "groupdel", "logout", "alias"} |
| 28 | + |
| 29 | + def execute(self, args: Dict[str, Any]) -> ToolResult: |
| 30 | + """ |
| 31 | + Execute a terminal command safely. |
| 32 | +
|
| 33 | + :param args: Dictionary containing the command to execute |
| 34 | + :return: Result of the command execution |
| 35 | + """ |
| 36 | + command = args.get("command", "").strip() |
| 37 | + |
| 38 | + # Check if the command is safe to execute |
| 39 | + if not self._is_safe_command(command): |
| 40 | + return ToolResult.fail(result=f"Command '{command}' is not allowed for security reasons.") |
28 | 41 |
|
29 | | - def run_command(self, command): |
30 | 42 | try: |
31 | 43 | result = subprocess.run( |
32 | 44 | command, |
33 | 45 | shell=True, |
34 | | - check=True, |
| 46 | + check=True, # Raise exception on non-zero return code |
35 | 47 | stdout=subprocess.PIPE, |
36 | 48 | stderr=subprocess.PIPE, |
37 | | - text=True |
| 49 | + text=True, |
| 50 | + timeout=self.config.get("timeout", 30) |
38 | 51 | ) |
39 | | - return { |
40 | | - "status": "success", |
| 52 | + |
| 53 | + return ToolResult.success({ |
41 | 54 | "stdout": result.stdout, |
42 | 55 | "stderr": result.stderr, |
43 | | - "returncode": result.returncode |
44 | | - } |
| 56 | + "return_code": result.returncode, |
| 57 | + "command": command |
| 58 | + }) |
45 | 59 | except subprocess.CalledProcessError as e: |
46 | | - return { |
47 | | - "status": "error", |
| 60 | + # Preserve the original error handling for CalledProcessError |
| 61 | + return ToolResult.fail({ |
48 | 62 | "stdout": e.stdout, |
49 | 63 | "stderr": e.stderr, |
50 | | - "returncode": e.returncode |
51 | | - } |
| 64 | + "return_code": e.returncode, |
| 65 | + "command": command |
| 66 | + }) |
| 67 | + except subprocess.TimeoutExpired: |
| 68 | + return ToolResult.fail(result=f"Command timed out after {self.config.get('timeout', 20)} seconds.") |
| 69 | + except Exception as e: |
| 70 | + return ToolResult.fail(result=f"Error executing command: {str(e)}") |
| 71 | + |
| 72 | + def _is_safe_command(self, command: str) -> bool: |
| 73 | + """ |
| 74 | + Check if a command is safe to execute. |
| 75 | +
|
| 76 | + :param command: The command to check |
| 77 | + :return: True if the command is safe, False otherwise |
| 78 | + """ |
| 79 | + # Split the command to get the base command |
| 80 | + cmd_parts = command.split() |
| 81 | + if not cmd_parts: |
| 82 | + return False |
| 83 | + |
| 84 | + base_cmd = cmd_parts[0].lower() |
| 85 | + |
| 86 | + # Check if the base command is in the ban list |
| 87 | + if base_cmd in self.command_ban_set: |
| 88 | + return False |
| 89 | + |
| 90 | + # Check for sudo/su commands |
| 91 | + if any(banned in command.lower() for banned in ["sudo ", "su -"]): |
| 92 | + return False |
| 93 | + |
| 94 | + # Check for rm -rf or similar dangerous patterns |
| 95 | + if "rm" in base_cmd and ("-rf" in command or "-r" in command or "-f" in command): |
| 96 | + return False |
| 97 | + |
| 98 | + # Additional security checks can be added here |
| 99 | + |
| 100 | + return True |
0 commit comments