From 5d381c23e09589aa0af3a71b96a7c44858c7960b Mon Sep 17 00:00:00 2001 From: johnlin Date: Sat, 4 Apr 2026 13:29:23 +0800 Subject: [PATCH 1/4] feat: add local shell skill support via ShellTool Agents can now load local skill directories (e.g. ~/.claude/skills/) via the shellSkills config key. Each skill's description is auto-read from its SKILL.md frontmatter. A 30s-timeout async subprocess executor handles command execution, respecting per-request timeout_ms when provided. Co-Authored-By: Claude Sonnet 4.6 --- bot/agents.py | 82 +++++++++++++++++++++- servers_config.example.json | 16 ++++- tests/test_agents.py | 133 ++++++++++++++++++++++++++++++++++++ 3 files changed, 228 insertions(+), 3 deletions(-) diff --git a/bot/agents.py b/bot/agents.py index b897440..2f56953 100644 --- a/bot/agents.py +++ b/bot/agents.py @@ -3,10 +3,13 @@ import asyncio import logging import os +from pathlib import Path from typing import Any from agents import Agent from agents import Runner +from agents import ShellTool +from agents import ShellToolLocalSkill from agents import TResponseInputItem from agents.mcp import MCPServerStdio from agents.mcp import MCPServerStreamableHttp @@ -24,6 +27,7 @@ MAX_TURNS = 25 MCP_SESSION_TIMEOUT_SECONDS = 30.0 +SHELL_TIMEOUT = 30.0 def _get_model() -> OpenAIResponsesModel | OpenAIChatCompletionsModel: @@ -51,13 +55,71 @@ def _get_model() -> OpenAIResponsesModel | OpenAIChatCompletionsModel: return OpenAIResponsesModel(model=model_name, openai_client=client) +def _read_skill_description(skill_dir: Path) -> str: + """Read the description field from a skill's SKILL.md frontmatter. + + Returns an empty string if the file is missing or has no description. + """ + skill_md = skill_dir / "SKILL.md" + try: + content = skill_md.read_text(encoding="utf-8") + except FileNotFoundError: + logging.warning("SKILL.md not found in %s", skill_dir) + return "" + + # Parse YAML frontmatter between --- delimiters + if not content.startswith("---"): + return "" + end = content.find("\n---", 3) + if end == -1: + return "" + frontmatter = content[3:end] + for line in frontmatter.splitlines(): + if line.startswith("description:"): + return line[len("description:"):].strip() + return "" + + +async def _shell_executor(request: Any) -> str: + """Execute a shell command from a LocalShellCommandRequest. + + Uses timeout_ms from the request if provided, otherwise falls back + to SHELL_TIMEOUT seconds. + """ + action = request.data.action + command: list[str] = action.command + env: dict[str, str] = action.env or {} + cwd: str | None = action.working_directory + timeout_ms: int | None = action.timeout_ms + + timeout = (timeout_ms / 1000.0) if timeout_ms is not None else SHELL_TIMEOUT + + merged_env = {**os.environ, **env} + + proc = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + env=merged_env, + cwd=cwd, + ) + try: + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout) + return stdout.decode("utf-8", errors="replace") + except TimeoutError: + proc.kill() + await proc.communicate() + return f"Command timed out after {timeout}s" + + class OpenAIAgent: - """A wrapper for OpenAI Agent with MCP server support.""" + """A wrapper for OpenAI Agent with MCP server and local shell skill support.""" def __init__( self, name: str, mcp_servers: list | None = None, + tools: list | None = None, instructions: str = DEFAULT_INSTRUCTIONS, ) -> None: self.agent = Agent( @@ -65,6 +127,7 @@ def __init__( instructions=instructions, model=_get_model(), mcp_servers=(mcp_servers if mcp_servers is not None else []), + tools=(tools if tools is not None else []), ) self.name = name self._conversations: dict[int, list[TResponseInputItem]] = {} @@ -120,8 +183,23 @@ def from_dict(cls, name: str, config: dict[str, Any]) -> OpenAIAgent: }, ) ) + + tools: list[Any] = [] + skill_configs = config.get("shellSkills", []) + if skill_configs: + skills: list[ShellToolLocalSkill] = [] + for skill_cfg in skill_configs: + skill_path = Path(os.path.expanduser(skill_cfg["path"])) + description = _read_skill_description(skill_path) + skills.append(ShellToolLocalSkill( + name=skill_cfg["name"], + description=description, + path=str(skill_path), + )) + tools.append(ShellTool(executor=_shell_executor, environment={"type": "local", "skills": skills})) + instructions = config.get("instructions", DEFAULT_INSTRUCTIONS) - return cls(name, mcp_servers, instructions=instructions) + return cls(name, mcp_servers, tools=tools, instructions=instructions) async def connect(self) -> None: for mcp_server in self.agent.mcp_servers: diff --git a/servers_config.example.json b/servers_config.example.json index 3b8c69a..c59a22e 100644 --- a/servers_config.example.json +++ b/servers_config.example.json @@ -5,5 +5,19 @@ "command": "uvx", "args": ["yfmcp"] } - } + }, + "shellSkills": [ + { + "name": "obsidian-cli", + "path": "~/.claude/skills/obsidian-cli" + }, + { + "name": "obsidian-markdown", + "path": "~/.claude/skills/obsidian-markdown" + }, + { + "name": "obsidian-bases", + "path": "~/.claude/skills/obsidian-bases" + } + ] } diff --git a/tests/test_agents.py b/tests/test_agents.py index ef6915c..13eb289 100644 --- a/tests/test_agents.py +++ b/tests/test_agents.py @@ -5,14 +5,17 @@ from unittest.mock import patch import pytest +from agents import ShellTool from agents.models.interface import Model from agents.models.openai_chatcompletions import OpenAIChatCompletionsModel from agents.models.openai_responses import OpenAIResponsesModel from bot.agents import DEFAULT_INSTRUCTIONS from bot.agents import MAX_TURNS +from bot.agents import SHELL_TIMEOUT from bot.agents import OpenAIAgent from bot.agents import _get_model +from bot.agents import _shell_executor @pytest.fixture(autouse=True) @@ -217,3 +220,133 @@ def test_no_truncation_when_under_limit(self): msgs = agent.get_messages(chat_id=100) user_msgs = [m for m in msgs if m["role"] == "user"] assert len(user_msgs) == 3 + + +def _make_shell_request( + command: list[str], + env: dict | None = None, + cwd: str | None = None, + timeout_ms: int | None = None, +): + """Build a mock LocalShellCommandRequest.""" + action = MagicMock() + action.command = command + action.env = env or {} + action.working_directory = cwd + action.timeout_ms = timeout_ms + request = MagicMock() + request.data.action = action + return request + + +class TestShellExecutor: + def test_constant_is_30_seconds(self): + assert SHELL_TIMEOUT == 30.0 + + @pytest.mark.anyio + async def test_returns_stdout(self): + request = _make_shell_request(["echo", "hello"]) + result = await _shell_executor(request) + assert "hello" in result + + @pytest.mark.anyio + async def test_stderr_merged_into_output(self): + request = _make_shell_request(["bash", "-c", "echo err >&2"]) + result = await _shell_executor(request) + assert "err" in result + + @pytest.mark.anyio + async def test_uses_working_directory(self): + request = _make_shell_request(["pwd"], cwd="/tmp") + result = await _shell_executor(request) + assert "/tmp" in result + + @pytest.mark.anyio + async def test_times_out_and_returns_message(self, monkeypatch): + monkeypatch.setattr("bot.agents.SHELL_TIMEOUT", 0.05) + request = _make_shell_request(["sleep", "10"]) + result = await _shell_executor(request) + assert "timed out" in result.lower() + + @pytest.mark.anyio + async def test_timeout_ms_from_request_overrides_default(self, monkeypatch): + monkeypatch.setattr("bot.agents.SHELL_TIMEOUT", 30.0) + # 50ms timeout from request — sleep 10s should time out + request = _make_shell_request(["sleep", "10"], timeout_ms=50) + result = await _shell_executor(request) + assert "timed out" in result.lower() + + +class TestFromDictShellSkills: + def test_no_shell_skills_when_config_missing(self): + agent = OpenAIAgent.from_dict("test", {"mcpServers": {}}) + shell_tools = [t for t in agent.agent.tools if isinstance(t, ShellTool)] + assert len(shell_tools) == 0 + + def test_shell_tool_added_when_skills_configured(self, tmp_path): + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text("---\nname: my-skill\ndescription: A test skill\n---\n") + + config = { + "mcpServers": {}, + "shellSkills": [{"name": "my-skill", "path": str(skill_dir)}], + } + agent = OpenAIAgent.from_dict("test", config) + shell_tools = [t for t in agent.agent.tools if isinstance(t, ShellTool)] + assert len(shell_tools) == 1 + + def test_skill_description_read_from_skill_md(self, tmp_path): + skill_dir = tmp_path / "obs" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + "---\nname: obs\ndescription: Interact with Obsidian\n---\n# Content\n" + ) + config = { + "mcpServers": {}, + "shellSkills": [{"name": "obs", "path": str(skill_dir)}], + } + agent = OpenAIAgent.from_dict("test", config) + shell_tool = next(t for t in agent.agent.tools if isinstance(t, ShellTool)) + skill = shell_tool.environment["skills"][0] + assert skill["description"] == "Interact with Obsidian" + + def test_tilde_expanded_in_path(self, tmp_path, monkeypatch): + skill_dir = tmp_path / "skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text("---\nname: s\ndescription: d\n---\n") + monkeypatch.setenv("HOME", str(tmp_path)) + + config = { + "mcpServers": {}, + "shellSkills": [{"name": "s", "path": "~/skill"}], + } + agent = OpenAIAgent.from_dict("test", config) + shell_tool = next(t for t in agent.agent.tools if isinstance(t, ShellTool)) + assert shell_tool.environment["skills"][0]["path"] == str(skill_dir) + + def test_multiple_skills_all_mounted(self, tmp_path): + skills = [] + for name in ["skill-a", "skill-b"]: + d = tmp_path / name + d.mkdir() + (d / "SKILL.md").write_text(f"---\nname: {name}\ndescription: desc\n---\n") + skills.append({"name": name, "path": str(d)}) + + agent = OpenAIAgent.from_dict("test", {"mcpServers": {}, "shellSkills": skills}) + shell_tool = next(t for t in agent.agent.tools if isinstance(t, ShellTool)) + assert len(shell_tool.environment["skills"]) == 2 + + def test_mcp_servers_and_shell_skills_coexist(self, tmp_path): + skill_dir = tmp_path / "s" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text("---\nname: s\ndescription: d\n---\n") + + config = { + "mcpServers": {"my-mcp": {"command": "uvx", "args": ["something"]}}, + "shellSkills": [{"name": "s", "path": str(skill_dir)}], + } + agent = OpenAIAgent.from_dict("test", config) + assert len(agent.agent.mcp_servers) == 1 + shell_tools = [t for t in agent.agent.tools if isinstance(t, ShellTool)] + assert len(shell_tools) == 1 From 653752b126e76d4856d89d25a3e14e4b80320a7e Mon Sep 17 00:00:00 2001 From: johnlin Date: Mon, 6 Apr 2026 14:37:33 +0800 Subject: [PATCH 2/4] refactor: auto-discover local shell skills from ./skills/ - Skills are now loaded from /skills/*/SKILL.md at agent init instead of being listed in servers_config.json. - Skill name is taken from the directory name; description is parsed from the SKILL.md YAML frontmatter. - Rewrote _shell_executor to match the SDK ShellCommandRequest API: iterate action.commands and use create_subprocess_shell. - Removed MagicMock-based executor tests; kept config-to-tool mapping tests against a monkeypatched SKILLS_DIR. - Added skills/.gitignore so users can drop their own skills into the directory without polluting the repo. --- bot/agents.py | 106 +++++++++++++-------------- servers_config.example.json | 16 +---- skills/.gitignore | 4 ++ tests/test_agents.py | 138 +++++++++--------------------------- 4 files changed, 90 insertions(+), 174 deletions(-) create mode 100644 skills/.gitignore diff --git a/bot/agents.py b/bot/agents.py index b1bf4ad..48f8b9e 100644 --- a/bot/agents.py +++ b/bot/agents.py @@ -9,7 +9,6 @@ from agents import Agent from agents import Runner from agents import ShellTool -from agents import ShellToolLocalSkill from agents import TResponseInputItem from agents.mcp import MCPServerStdio from agents.mcp import MCPServerStreamableHttp @@ -28,6 +27,7 @@ MAX_TURNS = 10 MCP_SESSION_TIMEOUT_SECONDS = 30.0 SHELL_TIMEOUT = 30.0 +SKILLS_DIR = Path(__file__).resolve().parent.parent / "skills" set_tracing_disabled(True) @@ -51,61 +51,68 @@ def _get_model() -> OpenAIResponsesModel | OpenAIChatCompletionsModel: return OpenAIResponsesModel(model=model_name, openai_client=client) -def _read_skill_description(skill_dir: Path) -> str: - """Read the description field from a skill's SKILL.md frontmatter. - - Returns an empty string if the file is missing or has no description. - """ - skill_md = skill_dir / "SKILL.md" - try: - content = skill_md.read_text(encoding="utf-8") - except FileNotFoundError: - logging.warning("SKILL.md not found in %s", skill_dir) - return "" - - # Parse YAML frontmatter between --- delimiters +def _parse_skill_description(content: str) -> str: + """Return the description field from a SKILL.md YAML frontmatter, or "".""" if not content.startswith("---"): return "" end = content.find("\n---", 3) if end == -1: return "" - frontmatter = content[3:end] - for line in frontmatter.splitlines(): + for line in content[3:end].splitlines(): if line.startswith("description:"): return line[len("description:") :].strip() return "" +def _load_shell_skills() -> list[dict[str, str]]: + """Discover local shell skills under SKILLS_DIR. + + Each immediate subdirectory of SKILLS_DIR containing a SKILL.md is mounted + as a ShellToolLocalSkill. The skill name is the directory name; the + description is read from the SKILL.md YAML frontmatter. + """ + if not SKILLS_DIR.is_dir(): + return [] + skills: list[dict[str, str]] = [] + for skill_dir in sorted(SKILLS_DIR.iterdir()): + skill_md = skill_dir / "SKILL.md" + if not skill_dir.is_dir() or not skill_md.is_file(): + continue + skills.append( + { + "name": skill_dir.name, + "description": _parse_skill_description(skill_md.read_text(encoding="utf-8")), + "path": str(skill_dir), + } + ) + return skills + + async def _shell_executor(request: Any) -> str: - """Execute a shell command from a LocalShellCommandRequest. + """Run each shell command from the request and return combined output. - Uses timeout_ms from the request if provided, otherwise falls back - to SHELL_TIMEOUT seconds. + Honours action.timeout_ms when set, otherwise falls back to SHELL_TIMEOUT. + stderr is merged into stdout for simplicity. """ action = request.data.action - command: list[str] = action.command - env: dict[str, str] = action.env or {} - cwd: str | None = action.working_directory - timeout_ms: int | None = action.timeout_ms - - timeout = (timeout_ms / 1000.0) if timeout_ms is not None else SHELL_TIMEOUT - - merged_env = {**os.environ, **env} - - proc = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - env=merged_env, - cwd=cwd, - ) - try: - stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout) - return stdout.decode("utf-8", errors="replace") - except TimeoutError: - proc.kill() - await proc.communicate() - return f"Command timed out after {timeout}s" + timeout = (action.timeout_ms / 1000.0) if action.timeout_ms else SHELL_TIMEOUT + + outputs: list[str] = [] + for command in action.commands: + proc = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + try: + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout) + outputs.append(stdout.decode("utf-8", errors="replace")) + except TimeoutError: + proc.kill() + await proc.communicate() + outputs.append(f"Command timed out after {timeout}s: {command}") + break + return "\n".join(outputs) class OpenAIAgent: @@ -181,19 +188,8 @@ def from_dict(cls, name: str, config: dict[str, Any]) -> OpenAIAgent: ) tools: list[Any] = [] - skill_configs = config.get("shellSkills", []) - if skill_configs: - skills: list[ShellToolLocalSkill] = [] - for skill_cfg in skill_configs: - skill_path = Path(os.path.expanduser(skill_cfg["path"])) - description = _read_skill_description(skill_path) - skills.append( - ShellToolLocalSkill( - name=skill_cfg["name"], - description=description, - path=str(skill_path), - ) - ) + skills = _load_shell_skills() + if skills: tools.append(ShellTool(executor=_shell_executor, environment={"type": "local", "skills": skills})) instructions = config.get("instructions", DEFAULT_INSTRUCTIONS) diff --git a/servers_config.example.json b/servers_config.example.json index c59a22e..3b8c69a 100644 --- a/servers_config.example.json +++ b/servers_config.example.json @@ -5,19 +5,5 @@ "command": "uvx", "args": ["yfmcp"] } - }, - "shellSkills": [ - { - "name": "obsidian-cli", - "path": "~/.claude/skills/obsidian-cli" - }, - { - "name": "obsidian-markdown", - "path": "~/.claude/skills/obsidian-markdown" - }, - { - "name": "obsidian-bases", - "path": "~/.claude/skills/obsidian-bases" - } - ] + } } diff --git a/skills/.gitignore b/skills/.gitignore new file mode 100644 index 0000000..1425c6f --- /dev/null +++ b/skills/.gitignore @@ -0,0 +1,4 @@ +# Users drop their own shell skills into this directory; the bot auto-loads +# any subdirectory containing a SKILL.md. Skill content is not tracked. +* +!.gitignore diff --git a/tests/test_agents.py b/tests/test_agents.py index 44bd5ee..fc07dee 100644 --- a/tests/test_agents.py +++ b/tests/test_agents.py @@ -14,16 +14,17 @@ from bot.agents import DEFAULT_INSTRUCTIONS from bot.agents import MAX_TURNS -from bot.agents import SHELL_TIMEOUT from bot.agents import OpenAIAgent from bot.agents import _get_model -from bot.agents import _shell_executor @pytest.fixture(autouse=True) -def _mock_model(monkeypatch): - """Prevent tests from constructing a real OpenAI client.""" +def _mock_model(monkeypatch, tmp_path_factory): + """Prevent tests from constructing a real OpenAI client and isolate skills.""" monkeypatch.setattr("bot.agents._get_model", lambda: create_autospec(Model)) + # Point SKILLS_DIR at an empty directory so tests do not auto-load the + # real ./skills/ on disk. Tests that need skills can override this. + monkeypatch.setattr("bot.agents.SKILLS_DIR", tmp_path_factory.mktemp("empty_skills")) class TestGetModel: @@ -238,128 +239,57 @@ def test_no_truncation_when_under_limit(self): assert len(user_msgs) == 3 -def _make_shell_request( - command: list[str], - env: dict | None = None, - cwd: str | None = None, - timeout_ms: int | None = None, -): - """Build a mock LocalShellCommandRequest.""" - action = MagicMock() - action.command = command - action.env = env or {} - action.working_directory = cwd - action.timeout_ms = timeout_ms - request = MagicMock() - request.data.action = action - return request - - -class TestShellExecutor: - def test_constant_is_30_seconds(self): - assert SHELL_TIMEOUT == 30.0 - - @pytest.mark.anyio - async def test_returns_stdout(self): - request = _make_shell_request(["echo", "hello"]) - result = await _shell_executor(request) - assert "hello" in result - - @pytest.mark.anyio - async def test_stderr_merged_into_output(self): - request = _make_shell_request(["bash", "-c", "echo err >&2"]) - result = await _shell_executor(request) - assert "err" in result - - @pytest.mark.anyio - async def test_uses_working_directory(self): - request = _make_shell_request(["pwd"], cwd="/tmp") - result = await _shell_executor(request) - assert "/tmp" in result - - @pytest.mark.anyio - async def test_times_out_and_returns_message(self, monkeypatch): - monkeypatch.setattr("bot.agents.SHELL_TIMEOUT", 0.05) - request = _make_shell_request(["sleep", "10"]) - result = await _shell_executor(request) - assert "timed out" in result.lower() - - @pytest.mark.anyio - async def test_timeout_ms_from_request_overrides_default(self, monkeypatch): - monkeypatch.setattr("bot.agents.SHELL_TIMEOUT", 30.0) - # 50ms timeout from request — sleep 10s should time out - request = _make_shell_request(["sleep", "10"], timeout_ms=50) - result = await _shell_executor(request) - assert "timed out" in result.lower() - - -class TestFromDictShellSkills: - def test_no_shell_skills_when_config_missing(self): +class TestLoadShellSkills: + def test_no_shell_tool_when_skills_dir_missing(self, tmp_path, monkeypatch): + monkeypatch.setattr("bot.agents.SKILLS_DIR", tmp_path / "nonexistent") agent = OpenAIAgent.from_dict("test", {"mcpServers": {}}) shell_tools = [t for t in agent.agent.tools if isinstance(t, ShellTool)] assert len(shell_tools) == 0 - def test_shell_tool_added_when_skills_configured(self, tmp_path): + def test_shell_tool_added_when_skill_found(self, tmp_path, monkeypatch): skill_dir = tmp_path / "my-skill" skill_dir.mkdir() (skill_dir / "SKILL.md").write_text("---\nname: my-skill\ndescription: A test skill\n---\n") + monkeypatch.setattr("bot.agents.SKILLS_DIR", tmp_path) - config = { - "mcpServers": {}, - "shellSkills": [{"name": "my-skill", "path": str(skill_dir)}], - } - agent = OpenAIAgent.from_dict("test", config) - shell_tools = [t for t in agent.agent.tools if isinstance(t, ShellTool)] - assert len(shell_tools) == 1 - - def test_skill_description_read_from_skill_md(self, tmp_path): - skill_dir = tmp_path / "obs" - skill_dir.mkdir() - (skill_dir / "SKILL.md").write_text("---\nname: obs\ndescription: Interact with Obsidian\n---\n# Content\n") - config = { - "mcpServers": {}, - "shellSkills": [{"name": "obs", "path": str(skill_dir)}], - } - agent = OpenAIAgent.from_dict("test", config) + agent = OpenAIAgent.from_dict("test", {"mcpServers": {}}) shell_tool = next(t for t in agent.agent.tools if isinstance(t, ShellTool)) skill = shell_tool.environment["skills"][0] - assert skill["description"] == "Interact with Obsidian" + assert skill["name"] == "my-skill" + assert skill["description"] == "A test skill" + assert skill["path"] == str(skill_dir) - def test_tilde_expanded_in_path(self, tmp_path, monkeypatch): - skill_dir = tmp_path / "skill" - skill_dir.mkdir() - (skill_dir / "SKILL.md").write_text("---\nname: s\ndescription: d\n---\n") - monkeypatch.setenv("HOME", str(tmp_path)) - - config = { - "mcpServers": {}, - "shellSkills": [{"name": "s", "path": "~/skill"}], - } - agent = OpenAIAgent.from_dict("test", config) - shell_tool = next(t for t in agent.agent.tools if isinstance(t, ShellTool)) - assert shell_tool.environment["skills"][0]["path"] == str(skill_dir) - - def test_multiple_skills_all_mounted(self, tmp_path): - skills = [] + def test_multiple_skills_all_mounted(self, tmp_path, monkeypatch): for name in ["skill-a", "skill-b"]: d = tmp_path / name d.mkdir() - (d / "SKILL.md").write_text(f"---\nname: {name}\ndescription: desc\n---\n") - skills.append({"name": name, "path": str(d)}) + (d / "SKILL.md").write_text(f"---\nname: {name}\ndescription: desc {name}\n---\n") + monkeypatch.setattr("bot.agents.SKILLS_DIR", tmp_path) - agent = OpenAIAgent.from_dict("test", {"mcpServers": {}, "shellSkills": skills}) + agent = OpenAIAgent.from_dict("test", {"mcpServers": {}}) shell_tool = next(t for t in agent.agent.tools if isinstance(t, ShellTool)) assert len(shell_tool.environment["skills"]) == 2 - def test_mcp_servers_and_shell_skills_coexist(self, tmp_path): + def test_directory_without_skill_md_is_skipped(self, tmp_path, monkeypatch): + (tmp_path / "not-a-skill").mkdir() + good = tmp_path / "real-skill" + good.mkdir() + (good / "SKILL.md").write_text("---\nname: real-skill\ndescription: d\n---\n") + monkeypatch.setattr("bot.agents.SKILLS_DIR", tmp_path) + + agent = OpenAIAgent.from_dict("test", {"mcpServers": {}}) + shell_tool = next(t for t in agent.agent.tools if isinstance(t, ShellTool)) + skills = shell_tool.environment["skills"] + assert len(skills) == 1 + assert skills[0]["name"] == "real-skill" + + def test_mcp_servers_and_shell_skills_coexist(self, tmp_path, monkeypatch): skill_dir = tmp_path / "s" skill_dir.mkdir() (skill_dir / "SKILL.md").write_text("---\nname: s\ndescription: d\n---\n") + monkeypatch.setattr("bot.agents.SKILLS_DIR", tmp_path) - config = { - "mcpServers": {"my-mcp": {"command": "uvx", "args": ["something"]}}, - "shellSkills": [{"name": "s", "path": str(skill_dir)}], - } + config = {"mcpServers": {"my-mcp": {"command": "uvx", "args": ["something"]}}} agent = OpenAIAgent.from_dict("test", config) assert len(agent.agent.mcp_servers) == 1 shell_tools = [t for t in agent.agent.tools if isinstance(t, ShellTool)] From 7eaf34a7eafdafea259818fa8e672a945383f429 Mon Sep 17 00:00:00 2001 From: johnlin Date: Mon, 6 Apr 2026 14:53:22 +0800 Subject: [PATCH 3/4] fix: type ShellTool environment with SDK TypedDicts mypy could not narrow the dict literal {"type": "local", "skills": ...} to ShellToolLocalEnvironment; use the TypedDict constructors from the agents SDK so the types are explicit. --- bot/agents.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/bot/agents.py b/bot/agents.py index 48f8b9e..fe6b519 100644 --- a/bot/agents.py +++ b/bot/agents.py @@ -9,6 +9,8 @@ from agents import Agent from agents import Runner from agents import ShellTool +from agents import ShellToolLocalEnvironment +from agents import ShellToolLocalSkill from agents import TResponseInputItem from agents.mcp import MCPServerStdio from agents.mcp import MCPServerStreamableHttp @@ -64,7 +66,7 @@ def _parse_skill_description(content: str) -> str: return "" -def _load_shell_skills() -> list[dict[str, str]]: +def _load_shell_skills() -> list[ShellToolLocalSkill]: """Discover local shell skills under SKILLS_DIR. Each immediate subdirectory of SKILLS_DIR containing a SKILL.md is mounted @@ -73,17 +75,17 @@ def _load_shell_skills() -> list[dict[str, str]]: """ if not SKILLS_DIR.is_dir(): return [] - skills: list[dict[str, str]] = [] + skills: list[ShellToolLocalSkill] = [] for skill_dir in sorted(SKILLS_DIR.iterdir()): skill_md = skill_dir / "SKILL.md" if not skill_dir.is_dir() or not skill_md.is_file(): continue skills.append( - { - "name": skill_dir.name, - "description": _parse_skill_description(skill_md.read_text(encoding="utf-8")), - "path": str(skill_dir), - } + ShellToolLocalSkill( + name=skill_dir.name, + description=_parse_skill_description(skill_md.read_text(encoding="utf-8")), + path=str(skill_dir), + ) ) return skills @@ -190,7 +192,8 @@ def from_dict(cls, name: str, config: dict[str, Any]) -> OpenAIAgent: tools: list[Any] = [] skills = _load_shell_skills() if skills: - tools.append(ShellTool(executor=_shell_executor, environment={"type": "local", "skills": skills})) + environment = ShellToolLocalEnvironment(type="local", skills=skills) + tools.append(ShellTool(executor=_shell_executor, environment=environment)) instructions = config.get("instructions", DEFAULT_INSTRUCTIONS) return cls(name, mcp_servers, tools=tools, instructions=instructions) From 7d10b6c87cb57f12ee55021ca5f9f6dcfaeb1fb0 Mon Sep 17 00:00:00 2001 From: johnlin Date: Mon, 6 Apr 2026 17:08:19 +0800 Subject: [PATCH 4/4] feat: restrict shell executor to the obsidian CLI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two layers of defence to _shell_executor so the bot cannot run anything other than the obsidian binary, even when the model emits a crafted command: 1. Allowlist — after shlex.split, the first token of each command must be exactly "obsidian"; otherwise the command is rejected and a one-line explanation is returned to the model. 2. No shell — commands are executed via create_subprocess_exec instead of create_subprocess_shell, so metacharacters like ;, &&, |, $(), and backticks are passed as literal arguments to obsidian rather than being interpreted by a shell. Together these block: - direct invocation of unrelated binaries (rm, curl, ssh, ...) - command chaining via ;, &&, ||, | - command substitution via $(...) and backticks - extra commands smuggled into action.commands as additional list entries (each entry is checked independently) Tests use SimpleNamespace fixtures (no MagicMock) and include an end-to-end check with a fake obsidian script on PATH that proves metacharacters are not shell-interpreted. --- bot/agents.py | 29 ++++++++++++++--- tests/test_agents.py | 76 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 4 deletions(-) diff --git a/bot/agents.py b/bot/agents.py index fe6b519..3bb46ee 100644 --- a/bot/agents.py +++ b/bot/agents.py @@ -3,6 +3,7 @@ import asyncio import logging import os +import shlex from pathlib import Path from typing import Any @@ -93,16 +94,36 @@ def _load_shell_skills() -> list[ShellToolLocalSkill]: async def _shell_executor(request: Any) -> str: """Run each shell command from the request and return combined output. - Honours action.timeout_ms when set, otherwise falls back to SHELL_TIMEOUT. - stderr is merged into stdout for simplicity. + Two layers of defence keep the bot from running anything other than the + ``obsidian`` CLI: + + 1. **Allowlist** — after ``shlex.split``, the first token of each command + must be exactly ``obsidian``. Anything else is rejected without + execution. This blocks attempts to invoke unrelated binaries. + 2. **No shell** — commands are executed via ``create_subprocess_exec`` + (not ``_shell``), so shell metacharacters like ``;``, ``&&``, ``|``, + ``$()``, and backticks are passed as literal arguments to ``obsidian`` + instead of being interpreted. This blocks command-chaining injection. + + Honours ``action.timeout_ms`` when set, otherwise falls back to + ``SHELL_TIMEOUT``. stderr is merged into stdout for simplicity. """ action = request.data.action timeout = (action.timeout_ms / 1000.0) if action.timeout_ms else SHELL_TIMEOUT outputs: list[str] = [] for command in action.commands: - proc = await asyncio.create_subprocess_shell( - command, + try: + tokens = shlex.split(command) + except ValueError as e: + outputs.append(f"rejected: cannot parse command ({e}): {command}") + continue + if not tokens or tokens[0] != "obsidian": + outputs.append(f"rejected: only the 'obsidian' CLI is allowed: {command}") + continue + + proc = await asyncio.create_subprocess_exec( + *tokens, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) diff --git a/tests/test_agents.py b/tests/test_agents.py index fc07dee..906af83 100644 --- a/tests/test_agents.py +++ b/tests/test_agents.py @@ -1,5 +1,6 @@ from __future__ import annotations +from types import SimpleNamespace from unittest.mock import MagicMock from unittest.mock import create_autospec from unittest.mock import patch @@ -16,6 +17,7 @@ from bot.agents import MAX_TURNS from bot.agents import OpenAIAgent from bot.agents import _get_model +from bot.agents import _shell_executor @pytest.fixture(autouse=True) @@ -294,3 +296,77 @@ def test_mcp_servers_and_shell_skills_coexist(self, tmp_path, monkeypatch): assert len(agent.agent.mcp_servers) == 1 shell_tools = [t for t in agent.agent.tools if isinstance(t, ShellTool)] assert len(shell_tools) == 1 + + +def _shell_request(*commands: str) -> SimpleNamespace: + """Build a real (not mocked) ShellCommandRequest-shaped object.""" + return SimpleNamespace( + data=SimpleNamespace( + action=SimpleNamespace(commands=list(commands), timeout_ms=None), + ), + ) + + +class TestShellExecutorAllowlist: + @pytest.mark.anyio + async def test_rejects_non_obsidian_binary(self): + result = await _shell_executor(_shell_request("echo hello")) + assert "rejected" in result.lower() + assert "obsidian" in result.lower() + + @pytest.mark.anyio + async def test_rejects_command_chained_with_semicolon(self, tmp_path): + # If the second command were ever interpreted by a shell, this file + # would be created. The allowlist + exec defence must prevent that. + sentinel = tmp_path / "should_not_exist" + result = await _shell_executor( + _shell_request(f"rm -rf /tmp/foo; touch {sentinel}"), + ) + assert "rejected" in result.lower() + assert not sentinel.exists() + + @pytest.mark.anyio + async def test_rejects_second_command_in_list(self, tmp_path): + # action.commands is a list — every entry must pass the allowlist. + sentinel = tmp_path / "should_not_exist" + result = await _shell_executor( + _shell_request("obsidian help", f"touch {sentinel}"), + ) + assert "rejected" in result.lower() + assert not sentinel.exists() + + @pytest.mark.anyio + async def test_rejects_malformed_quoting(self): + result = await _shell_executor( + _shell_request('obsidian read file="unclosed'), + ) + assert "rejected" in result.lower() + + @pytest.mark.anyio + async def test_metacharacters_are_not_shell_interpreted(self, tmp_path): + # Even if a command starts with `obsidian` and passes the allowlist, + # `&&` must not chain a second command via the shell. Use a fake + # `obsidian` script on PATH to confirm exec receives literal argv. + fake_bin = tmp_path / "bin" + fake_bin.mkdir() + sentinel = tmp_path / "should_not_exist" + fake_obsidian = fake_bin / "obsidian" + fake_obsidian.write_text("#!/bin/sh\necho fake-obsidian got: $*\n") + fake_obsidian.chmod(0o755) + + import os + + old_path = os.environ["PATH"] + os.environ["PATH"] = f"{fake_bin}{os.pathsep}{old_path}" + try: + result = await _shell_executor( + _shell_request(f"obsidian read && touch {sentinel}"), + ) + finally: + os.environ["PATH"] = old_path + + assert not sentinel.exists() + # The fake script echoed everything it received as argv; the `&&` and + # `touch` tokens should appear in its output as literal arguments. + assert "&&" in result + assert "touch" in result