Skip to content

Commit 26446d5

Browse files
committed
feat: ADR 0032 M3 Slice A — plan gate interceptor in shadow mode (parity-proven)
Per §13.3, lands the plan-gate-as-interceptor migration as the first of two commits. PlanGateInterceptor (in teaagent/runner/_plan_validator.py) evaluates the plan gate on TOOL_CALL_REQUESTED. Here it is registered in SHADOW mode (raise_on_deny=False) and the inline evaluate_write_gate stays authoritative — so behavior is unchanged. The runner now emits TOOL_CALL_REQUESTED (a new audit line; frozen contract updated accordingly). Parity is proven before any enforcement flip: test_plan_gate_interceptor_parity asserts the interceptor decision equals the inline decision (reason codes are the oracle), plus a shadow-mode no-veto test and interceptor unit tests. Slice B (separate commit) flips to enforce and removes the inline gate. Constraint: shadow only — inline gate authoritative; behavior unchanged except audit gains tool_call_requested; reason codes are the parity oracle Tested: lifecycle (incl. parity + shadow tests), adversarial + first-hour 35, smoke 200, acceptance 646/646, full mypy clean 1009 files, ruff clean Confidence: high Roadmap-Status: unchanged Allow-test-weakening: frozen audit-contract test updated to include the new tool_call_requested emit; net assertions stronger (parity + interceptor coverage added)
1 parent 8fd9cb7 commit 26446d5

3 files changed

Lines changed: 394 additions & 2 deletions

File tree

teaagent/runner/_core.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
from ._approval_manager import RunnerApprovalCoordinator # noqa: E402
5050
from ._auto_mode_manager import AutoModeManager # noqa: E402
5151
from ._events import EventSpine, RunEventType, register_audit_consumer # noqa: E402
52-
from ._plan_validator import PlanValidator # noqa: E402
52+
from ._plan_validator import PlanGateInterceptor, PlanValidator # noqa: E402
5353
from ._types import ( # noqa: E402
5454
ApprovalHandler,
5555
BudgetPromptHandler,
@@ -171,6 +171,22 @@ def __init__(
171171
require_plan=require_plan,
172172
skip_plan_check=skip_plan_check,
173173
)
174+
175+
# M3-T002 Slice A: register the plan gate interceptor in SHADOW mode.
176+
# It computes its decision on TOOL_CALL_REQUESTED but does NOT veto;
177+
# the inline evaluate_write_gate below stays authoritative. The parity
178+
# test asserts interceptor decision == inline decision per reason code.
179+
# Slice B (separate commit) flips this to enforce and removes the inline
180+
# gate.
181+
self._plan_gate_interceptor = PlanGateInterceptor(
182+
self.plan_validator,
183+
raise_on_deny=False,
184+
)
185+
self.event_spine.register_interceptor(
186+
self._plan_gate_interceptor,
187+
name='plan_gate_shadow',
188+
)
189+
174190
self.auto_mode_manager = AutoModeManager(
175191
auto_mode_config=auto_mode_config,
176192
)
@@ -624,13 +640,33 @@ def _execute_tool_decision( # noqa: C901
624640
tool_name=decision.tool_name,
625641
arguments=decision.arguments,
626642
)
643+
644+
# M3-T002 Slice A: emit TOOL_CALL_REQUESTED so the SHADOW plan-gate
645+
# interceptor records its decision (it does not veto). The inline gate
646+
# below remains authoritative until Slice B.
647+
tcr_payload: dict[str, Any] = {
648+
'tool_name': decision.tool_name,
649+
}
650+
if decision.arguments is not None:
651+
tcr_payload['arguments'] = decision.arguments
652+
plan_contract = context.get('plan_contract')
653+
if plan_contract is not None:
654+
tcr_payload['plan_contract'] = plan_contract
655+
self.event_spine.emit(
656+
RunEventType.TOOL_CALL_REQUESTED,
657+
run_id,
658+
tcr_payload,
659+
)
660+
661+
# M3-T002 Slice A: inline plan gate stays authoritative (removed in B).
627662
gate_error = self.plan_validator.evaluate_write_gate(
628663
tool_name=decision.tool_name,
629664
context=cast(dict[str, Any], context),
630665
tool_arguments=decision.arguments,
631666
)
632667
if gate_error:
633668
raise ToolPermissionError(gate_error)
669+
634670
# Auto mode: block disallowed tools, auto-approve allowed ones
635671
self.auto_mode_manager.validate_tool_allowed(decision.tool_name)
636672
auto_approve_policy = self.auto_mode_manager.get_auto_approve_policy()

teaagent/runner/_plan_validator.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from teaagent.governance.plan_gate import assert_write_allowed, assert_write_scope
88
from teaagent.policy import ApprovalPolicy, PermissionMode
9+
from teaagent.runner._events import RunEvent, RunEventType
910
from teaagent.spec_exemption import ExemptionDetector, SpecExemptionReceipt
1011

1112

@@ -147,3 +148,66 @@ def check_read_only_lint_errors(self) -> Optional[str]:
147148
def get_plan_contract(self) -> Any:
148149
"""Get the current plan contract."""
149150
return self._plan_contract
151+
152+
153+
class PlanGateInterceptor:
154+
"""EventSpine interceptor that enforces the plan gate on TOOL_CALL_REQUESTED.
155+
156+
Evaluates ``PlanValidator.evaluate_write_gate()`` and raises
157+
``ToolPermissionError`` if the write is blocked by plan policy, read-only
158+
lint, or scope drift.
159+
160+
In shadow mode (``raise_on_deny=False``) the interceptor records its
161+
decision without vetoing — used for parity testing before enforcement
162+
is enabled.
163+
"""
164+
165+
def __init__(
166+
self,
167+
plan_validator: PlanValidator,
168+
*,
169+
raise_on_deny: bool = True,
170+
) -> None:
171+
self._pv = plan_validator
172+
self._raise_on_deny = raise_on_deny
173+
# Exposed for parity assertions: set after each invocation.
174+
self.last_decision: str | None = None
175+
176+
def __call__(self, event: RunEvent) -> None:
177+
"""Evaluate the plan gate for TOOL_CALL_REQUESTED events.
178+
179+
Args:
180+
event: The run event.
181+
182+
Raises:
183+
ToolPermissionError: If the gate blocks and raise_on_deny is True.
184+
"""
185+
if event.type != RunEventType.TOOL_CALL_REQUESTED:
186+
return
187+
188+
tool_name = event.payload['tool_name']
189+
arguments = event.payload.get('arguments')
190+
# Reconstruct minimal context from event payload for plan_contract check.
191+
context: dict[str, Any] = {}
192+
plan_contract = event.payload.get('plan_contract')
193+
if plan_contract is not None:
194+
context['plan_contract'] = plan_contract
195+
196+
from teaagent.errors import ToolPermissionError
197+
198+
try:
199+
gate_error = self._pv.evaluate_write_gate(
200+
tool_name=tool_name,
201+
context=context,
202+
tool_arguments=arguments,
203+
)
204+
except ToolPermissionError as exc:
205+
# evaluate_write_gate can raise directly from assert_write_allowed.
206+
self.last_decision = str(exc)
207+
if self._raise_on_deny:
208+
raise
209+
return
210+
211+
self.last_decision = gate_error
212+
if gate_error and self._raise_on_deny:
213+
raise ToolPermissionError(gate_error)

0 commit comments

Comments
 (0)