Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/command_system/security_review.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,18 @@ def _security_review_private_prompt(
allowed_tools = parse_slash_command_tools_from_frontmatter(
parsed.frontmatter.get("allowed-tools")
)
# Bridge CommandContext -> ToolContext (only workspace_root/cwd are needed; the
# default permission_context is bypassPermissions, matching the in-process path).
# Bridge CommandContext -> ToolContext. Explicit bypass: this executor only
# runs the frontmatter-allowlisted shell preprocessing for the slash
# command, matching the in-process path (the ToolContext default is no
# longer bypassPermissions — #274).
from src.permissions.types import ToolPermissionContext
from src.tool_system.context import ToolContext # lazy: keep tool_system off import chain

tool_context = ToolContext(workspace_root=context.workspace_root, cwd=context.cwd)
tool_context = ToolContext(
workspace_root=context.workspace_root,
cwd=context.cwd,
permission_context=ToolPermissionContext(mode="bypassPermissions"),
)
executor = make_bash_shell_executor(
tool_context, allowed_tools, slash_command_name="security-review"
)
Expand Down
45 changes: 43 additions & 2 deletions src/permissions/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def check_permissions(
self, tool_input: dict[str, Any], context: Any
) -> PermissionResult: ...

def is_read_only(self, tool_input: dict[str, Any]) -> bool: ...


@runtime_checkable
class RequiresInteractionTool(Protocol):
Expand Down Expand Up @@ -236,7 +238,16 @@ def has_permissions_to_use_tool_inner(
)
try:
tool_permission_result = tool.check_permissions(tool_input, tool_use_context)
except Exception:
except Exception as exc:
# Workspace-allowlist violations are hard structural failures, not
# ask-able prompts: propagate so dispatch callers surface the error
# instead of soft-denying through the ask path (#274). Lazy import:
# permissions/ must not import tool_system/ at module level (the
# DAG direction is tool_system -> permissions).
from src.tool_system.errors import ToolPermissionError

if isinstance(exc, ToolPermissionError):
raise
log.exception("Error in tool.check_permissions for %s", tool.name)

if tool_permission_result.behavior == "deny":
Expand Down Expand Up @@ -308,6 +319,30 @@ def has_permissions_to_use_tool_inner(
decision_reason=RuleDecisionReason(rule=rule),
)

# Read-only tools don't require permission prompts (TS parity:
# read-only tools resolve via checkReadPermissionForTool → allow).
# Runs after deny/ask rules so explicit rules still win, and ONLY when
# the tool's own check_permissions expressed no opinion (passthrough) —
# a read-only tool that returns its own ask (e.g. WebFetch domain
# gating) must keep that ask, exactly as in TS where the read-only
# allow lives inside each tool's checkPermissions.
# Documented divergence: TS asks for out-of-workspace read paths (the
# user can approve a one-off); this port allows here and hard-fails in
# the tool's call via ensure_allowed_path, with
# additional_working_directories as the escape hatch.
if tool_permission_result.behavior == "passthrough":
try:
is_read_only = bool(tool.is_read_only(tool_input))
except Exception:
log.debug("tool.is_read_only failed for %s", tool.name, exc_info=True)
is_read_only = False
if is_read_only:
return PermissionAllowDecision(
behavior="allow",
updated_input=_get_updated_input_or_fallback(tool_permission_result, tool_input),
decision_reason=OtherDecisionReason(reason="Tool is read-only"),
)

if tool_permission_result.behavior == "passthrough":
return _with_default_suggestions(
PermissionAskDecision(
Expand Down Expand Up @@ -428,7 +463,13 @@ def check_rule_based_permissions(
)
try:
tool_permission_result = tool.check_permissions(tool_input, tool_use_context)
except Exception:
except Exception as exc:
# Same contract as the main inner flow: workspace-allowlist
# violations propagate instead of soft-resolving (#274).
from src.tool_system.errors import ToolPermissionError

if isinstance(exc, ToolPermissionError):
raise
log.exception("Error in tool.check_permissions for %s", tool.name)

if tool_permission_result.behavior == "deny":
Expand Down
5 changes: 4 additions & 1 deletion src/tool_system/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ class GlobLimits:
@dataclass
class ToolContext:
workspace_root: Path
# "default" so the workspace-root allowlist is on unless a caller
# explicitly opts into bypass — an implicit bypassPermissions default
# silently disabled the sandbox for every SDK/embedder caller (#274).
permission_context: ToolPermissionContext = field(
default_factory=lambda: ToolPermissionContext(mode="bypassPermissions")
default_factory=lambda: ToolPermissionContext(mode="default")
)
cwd: Path | None = None
read_file_fingerprints: dict[Path, tuple[int, int] | tuple[int, int, bool]] = field(default_factory=dict)
Expand Down
9 changes: 4 additions & 5 deletions src/tool_system/tools/edit.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ..build_tool import Tool, build_tool
from ..context import ToolContext
from ..diff_utils import unified_diff_hunks
from ..errors import ToolInputError, ToolPermissionError
from ..errors import ToolInputError
from ..protocol import ToolResult
from src.permissions.types import (
PermissionAskDecision,
Expand Down Expand Up @@ -187,10 +187,9 @@ def _check_permissions(tool_input: dict[str, Any], context: ToolContext) -> Perm
file_path = tool_input.get("file_path")
if not isinstance(file_path, str):
return PermissionPassthroughResult()
try:
path = context.ensure_allowed_path(file_path)
except ToolPermissionError:
return PermissionPassthroughResult()
# A path outside the workspace allowlist raises ToolPermissionError and
# propagates (hard structural failure, not an ask-able prompt) — see #274.
path = context.ensure_allowed_path(file_path)
if path.suffix.lower() in {".md", ".markdown"} and not context.allow_docs:
return PermissionAskDecision(
message="Editing documentation files is blocked unless allow_docs is enabled",
Expand Down
15 changes: 13 additions & 2 deletions src/tool_system/tools/web_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from ..errors import ToolInputError, ToolPermissionError
from ..protocol import ToolResult
from src.permissions.types import (
PermissionAllowDecision,
PermissionAskDecision,
PermissionPassthroughResult,
PermissionResult,
)
Expand Down Expand Up @@ -416,18 +418,27 @@ def _is_preapproved(hostname: str, pathname: str) -> bool:
# -- Permission Check ----------------------------------------------------------

def _check_permissions(tool_input: dict[str, Any], context: ToolContext) -> PermissionResult:
"""Domain gating (TS parity): preapproved hosts auto-allow; everything
else asks. Returning a real ask (not passthrough) keeps WebFetch out of
the central read-only auto-allow — an arbitrary-URL fetch is promptable,
unlike a local read (#274)."""
url = tool_input.get("url", "")
if not isinstance(url, str) or not url:
# Passthrough resolves to read-only auto-allow, which is inert here:
# _web_fetch_call raises ToolInputError on an empty url before any
# network I/O (and dispatch schema validation rejects a missing one).
return PermissionPassthroughResult()
try:
parsed = urllib.parse.urlparse(url)
hostname = parsed.hostname or ""
pathname = parsed.path or "/"
if _is_preapproved(hostname, pathname):
return PermissionPassthroughResult()
return PermissionAllowDecision(behavior="allow", updated_input=tool_input)
except Exception:
pass
return PermissionPassthroughResult()
return PermissionAskDecision(
message=f"Claude wants to fetch {url}. Allow?",
)


# -- Result Mapping ------------------------------------------------------------
Expand Down
9 changes: 4 additions & 5 deletions src/tool_system/tools/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,10 @@ def _check_permissions(tool_input: dict[str, Any], context: ToolContext) -> Perm
if _is_auto_memory_write(file_path):
return PermissionPassthroughResult()

# Path is already expanded by backfill_observable_input
try:
path = context.ensure_allowed_path(file_path)
except ToolPermissionError:
return PermissionPassthroughResult()
# Path is already expanded by backfill_observable_input.
# A path outside the workspace allowlist raises ToolPermissionError and
# propagates (hard structural failure, not an ask-able prompt) — see #274.
path = context.ensure_allowed_path(file_path)

if path.suffix.lower() in {".md", ".markdown"} and not context.allow_docs:
return PermissionAskDecision(
Expand Down
11 changes: 9 additions & 2 deletions tests/integration/test_query_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from unittest.mock import MagicMock

from src.providers.base import ChatResponse
from src.permissions.types import ToolPermissionContext
from src.tool_system.context import ToolContext
from src.tool_system.defaults import build_default_registry
from src.types.content_blocks import TextBlock, ToolResultBlock, ToolUseBlock
Expand All @@ -24,7 +25,10 @@ def setUp(self):
self.temp_dir = tempfile.TemporaryDirectory()
self.workspace = Path(self.temp_dir.name)
self.registry = build_default_registry()
self.context = ToolContext(workspace_root=self.workspace)
self.context = ToolContext(
workspace_root=self.workspace,
permission_context=ToolPermissionContext(mode="bypassPermissions"),
)

def tearDown(self):
self.temp_dir.cleanup()
Expand Down Expand Up @@ -224,7 +228,10 @@ def setUp(self):
self.temp_dir = tempfile.TemporaryDirectory()
self.workspace = Path(self.temp_dir.name)
self.registry = build_default_registry()
self.context = ToolContext(workspace_root=self.workspace)
self.context = ToolContext(
workspace_root=self.workspace,
permission_context=ToolPermissionContext(mode="bypassPermissions"),
)

def tearDown(self):
self.temp_dir.cleanup()
Expand Down
10 changes: 9 additions & 1 deletion tests/parity/test_e2e_edit_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@
import unittest
from pathlib import Path

from src.permissions.types import ToolPermissionContext
from src.permissions.types import PermissionAskReply, ToolPermissionContext
from src.tool_system.context import ToolContext
from src.tool_system.defaults import build_default_registry
from src.tool_system.errors import ToolPermissionError
from src.tool_system.protocol import ToolCall


def _allow_all_handler(_request):
"""Auto-approve permission prompts — the default-mode workspace
allowlist stays live (outside-root paths still raise before any ask)."""
return PermissionAskReply(behavior="allow")


class TestE2EEditFlow(unittest.TestCase):
"""End-to-end edit tool flow."""

Expand All @@ -24,6 +30,7 @@ def setUp(self) -> None:
self.root = Path(self.tmp.name).resolve()
self.registry = build_default_registry(include_user_tools=False)
self.ctx = ToolContext(workspace_root=self.root)
self.ctx.permission_handler = _allow_all_handler

# Create a file to edit
self.test_file = self.root / "target.py"
Expand Down Expand Up @@ -119,6 +126,7 @@ def setUp(self) -> None:
self.root = Path(self.tmp.name).resolve()
self.registry = build_default_registry(include_user_tools=False)
self.ctx = ToolContext(workspace_root=self.root)
self.ctx.permission_handler = _allow_all_handler

def tearDown(self) -> None:
self.tmp.cleanup()
Expand Down
3 changes: 3 additions & 0 deletions tests/parity/test_permission_flow_parity.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def _make_mock_tool(name: str, is_mcp: bool = False):
message=f"Claude wants to use {name}. Allow?",
)
)
# The TS vectors model a default (non-read-only) tool; a bare MagicMock
# would return a truthy Mock and trip the read-only auto-allow (#274).
tool.is_read_only = MagicMock(return_value=False)
# Default: not user-interactive
if hasattr(tool, "requires_user_interaction"):
del tool.requires_user_interaction
Expand Down
26 changes: 21 additions & 5 deletions tests/tasks/test_agent_name_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pytest

from src.permissions.types import ToolPermissionContext
from src.tool_system.context import ToolContext
from src.tool_system.defaults import build_default_registry
from src.tool_system.protocol import ToolCall
Expand Down Expand Up @@ -33,15 +34,21 @@ def test_agent_input_schema_declares_name_field() -> None:


def test_registry_field_default_empty(tmp_path: Path) -> None:
ctx = ToolContext(workspace_root=tmp_path)
ctx = ToolContext(
workspace_root=tmp_path,
permission_context=ToolPermissionContext(mode="bypassPermissions"),
)
assert len(ctx.agent_name_registry) == 0


def test_named_async_agent_registers_name_to_id(tmp_path: Path) -> None:
"""Spawn with ``name="researcher"`` populates
``ctx.agent_name_registry["researcher"]``."""
registry = build_default_registry(provider=object())
ctx = ToolContext(workspace_root=tmp_path)
ctx = ToolContext(
workspace_root=tmp_path,
permission_context=ToolPermissionContext(mode="bypassPermissions"),
)

async def _fake(_params):
yield AssistantMessage(content=[TextBlock(text="ok")])
Expand All @@ -67,7 +74,10 @@ async def _fake(_params):
def test_unnamed_spawn_does_not_register(tmp_path: Path) -> None:
"""No ``name`` → registry stays empty."""
registry = build_default_registry(provider=object())
ctx = ToolContext(workspace_root=tmp_path)
ctx = ToolContext(
workspace_root=tmp_path,
permission_context=ToolPermissionContext(mode="bypassPermissions"),
)

async def _fake(_params):
yield AssistantMessage(content=[TextBlock(text="ok")])
Expand Down Expand Up @@ -105,7 +115,10 @@ def test_collision_with_running_agent_raises(tmp_path: Path) -> None:
from src.tool_system.errors import ToolInputError

registry = build_default_registry(provider=object())
ctx = ToolContext(workspace_root=tmp_path)
ctx = ToolContext(
workspace_root=tmp_path,
permission_context=ToolPermissionContext(mode="bypassPermissions"),
)

# Pre-populate the registry as if a running agent was already
# registered.
Expand Down Expand Up @@ -213,7 +226,10 @@ def test_collision_with_terminal_agent_overwrites(tmp_path: Path) -> None:
the new spawn overwrites the name → agent_id mapping. Older
terminal holders remain reachable via raw task_id."""
registry = build_default_registry(provider=object())
ctx = ToolContext(workspace_root=tmp_path)
ctx = ToolContext(
workspace_root=tmp_path,
permission_context=ToolPermissionContext(mode="bypassPermissions"),
)

from src.tasks.local_agent import (
complete_agent_task,
Expand Down
11 changes: 9 additions & 2 deletions tests/tasks/test_local_agent_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from src.tasks.progress import AgentProgress
from src.tasks_core import generate_task_id
from src.permissions.types import ToolPermissionContext
from src.tool_system.context import ToolContext
from src.tool_system.defaults import build_default_registry
from src.tool_system.protocol import ToolCall
Expand Down Expand Up @@ -270,7 +271,10 @@ def test_async_agent_writes_jsonl_transcript_on_disk(tmp_path: Path) -> None:
yielded message. This is the prerequisite for Phase 3 / WI-3.1
notification XML and Phase 7 / WI-7.4 auto-resume."""
registry = build_default_registry(provider=object())
ctx = ToolContext(workspace_root=tmp_path)
ctx = ToolContext(
workspace_root=tmp_path,
permission_context=ToolPermissionContext(mode="bypassPermissions"),
)

async def _fake(_params):
yield AssistantMessage(
Expand Down Expand Up @@ -320,7 +324,10 @@ def test_async_agent_finalize_total_tokens_is_no_longer_zero(tmp_path: Path) ->
the chapter-correct latest_input + cumulative_output instead of the
pre-WI-2.4 hard-coded ``0``."""
registry = build_default_registry(provider=object())
ctx = ToolContext(workspace_root=tmp_path)
ctx = ToolContext(
workspace_root=tmp_path,
permission_context=ToolPermissionContext(mode="bypassPermissions"),
)

async def _fake(_params):
yield AssistantMessage(
Expand Down
Loading