Skip to content

Commit 3ea840a

Browse files
committed
test: use importable scenario runner for ESP-001
1 parent 0a274a5 commit 3ea840a

1 file changed

Lines changed: 44 additions & 181 deletions

File tree

tests/test_enterprise_scenario_001.py

Lines changed: 44 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -13,195 +13,58 @@
1313
enforcement, external system control, compliance, or path-universal coverage.
1414
"""
1515

16-
import copy
17-
import hashlib
18-
import json
19-
from datetime import datetime, timezone
20-
21-
22-
# ---------------------------------------------------------------------------
23-
# CommitGate — minimal synthetic implementation for test surface
24-
# ---------------------------------------------------------------------------
25-
26-
REQUIRED_FIELDS = [
27-
"actor",
28-
"action_type",
29-
"recipient_scope",
30-
"payload_hash",
31-
"authority_token",
32-
"expiry",
33-
"nonce",
34-
]
35-
36-
_sent_messages = []
37-
38-
39-
def _compute_state_hash(state: dict) -> str:
40-
serialised = json.dumps(state, sort_keys=True).encode()
41-
return hashlib.sha256(serialised).hexdigest()
42-
43-
44-
def _write_receipt(attempt: dict, missing_field: str, decision: str, reason: str) -> dict:
45-
return {
46-
"receipt_id": f"RCP-{datetime.now(timezone.utc).strftime('%Y%m%d')}-001",
47-
"scenario_id": "ESP-001",
48-
"attempted_action": attempt.get("action_type", "UNKNOWN"),
49-
"actor": attempt.get("actor", "UNKNOWN"),
50-
"action_type": attempt.get("action_type", "UNKNOWN"),
51-
"recipient_scope": attempt.get("recipient_scope", "UNKNOWN"),
52-
"payload_hash": attempt.get("payload_hash", "UNKNOWN"),
53-
"missing_field": missing_field,
54-
"decision": decision,
55-
"refusal_reason": reason,
56-
"timestamp": datetime.now(timezone.utc).isoformat(),
57-
"downstream_send": False,
58-
}
59-
60-
61-
def commit_gate(attempt: dict, state_before: dict) -> dict:
62-
"""
63-
CommitGate — STRUCTURE_FIRST, FIRST_FAIL evaluation.
64-
Returns a result dict with decision, receipt, and state metadata.
65-
"""
66-
state_snapshot_before = copy.deepcopy(state_before)
67-
hash_before = _compute_state_hash(state_snapshot_before)
68-
69-
for field in REQUIRED_FIELDS:
70-
if field not in attempt or attempt[field] is None or attempt[field] == "":
71-
receipt = _write_receipt(
72-
attempt,
73-
missing_field=field,
74-
decision="DENY",
75-
reason=(
76-
f"{field} absent — no valid DecisionRecord for this actor, "
77-
"action_type, recipient_scope, and payload at gate time"
78-
),
79-
)
80-
hash_after = _compute_state_hash(state_snapshot_before)
81-
return {
82-
"decision": "DENY",
83-
"missing_field": field,
84-
"downstream_send": False,
85-
"receipt_written": True,
86-
"receipt": receipt,
87-
"sent_messages": list(_sent_messages),
88-
"before_state_hash": hash_before,
89-
"after_state_hash": hash_after,
90-
"state_mutated": hash_before != hash_after,
91-
}
92-
93-
# All fields present — ALLOW path. Not triggered in this scenario.
94-
_sent_messages.append(attempt)
95-
receipt = _write_receipt(attempt, missing_field="", decision="ALLOW", reason="all fields valid")
96-
hash_after = _compute_state_hash(state_snapshot_before)
97-
return {
98-
"decision": "ALLOW",
99-
"missing_field": None,
100-
"downstream_send": True,
101-
"receipt_written": True,
102-
"receipt": receipt,
103-
"sent_messages": list(_sent_messages),
104-
"before_state_hash": hash_before,
105-
"after_state_hash": hash_after,
106-
"state_mutated": False,
107-
}
108-
109-
110-
# ---------------------------------------------------------------------------
111-
# Fixtures
112-
# ---------------------------------------------------------------------------
113-
114-
INVALID_ATTEMPT_MISSING_AUTHORITY = {
115-
"actor": "agent://morpheus-draft-bot-v1",
116-
"action_type": "SEND_EXTERNAL_EMAIL",
117-
"recipient_scope": "external:partner-domain.com",
118-
"payload_hash": "sha256:a3f8c2d1e7b904561ac38fd2e190bba47c2d3f8a1e056b7c9d4e2f1a8b3c7d90",
119-
"authority_token": None, # MISSING — triggers refusal
120-
"expiry": "2026-05-12T23:59:59Z",
121-
"nonce": "nonce-abc-001",
122-
}
123-
124-
INITIAL_STATE = {
125-
"sent_count": 0,
126-
"last_action": None,
127-
"pending_receipts": [],
128-
}
129-
130-
131-
# ---------------------------------------------------------------------------
132-
# Tests
133-
# ---------------------------------------------------------------------------
16+
from commit_gate_core.scenario_runner import run_scenario_001
17+
13418

13519
def test_missing_authority_token_denies_send():
136-
"""
137-
ASSERT: A missing authority_token produces decision == DENY
138-
and missing_field == authority_token.
139-
"""
140-
result = commit_gate(INVALID_ATTEMPT_MISSING_AUTHORITY, copy.deepcopy(INITIAL_STATE))
141-
assert result["decision"] == "DENY", (
142-
f"Expected DENY, got {result['decision']}"
143-
)
144-
assert result["missing_field"] == "authority_token", (
145-
f"Expected missing_field='authority_token', got '{result['missing_field']}'"
146-
)
20+
"""Missing authority_token must produce DENY."""
21+
result = run_scenario_001()
22+
23+
assert result["verdict"] == "DENY"
24+
assert result["decision"] == "DENY"
25+
assert result["missing_field"] == "authority_token"
26+
assert result["downstream_send"] is False
27+
assert result["receipt_written"] is True
14728

14829

14930
def test_no_downstream_send_occurs():
150-
"""
151-
ASSERT: downstream_send is False and sent_messages list is empty.
152-
"""
153-
result = commit_gate(INVALID_ATTEMPT_MISSING_AUTHORITY, copy.deepcopy(INITIAL_STATE))
154-
assert result["downstream_send"] is False, (
155-
"downstream_send should be False — no email should leave the system"
156-
)
157-
assert result["sent_messages"] == [], (
158-
f"sent_messages should be empty, got: {result['sent_messages']}"
159-
)
160-
161-
162-
def test_receipt_is_written():
163-
"""
164-
ASSERT: receipt_written is True and receipt contains expected fields.
165-
"""
166-
result = commit_gate(INVALID_ATTEMPT_MISSING_AUTHORITY, copy.deepcopy(INITIAL_STATE))
167-
assert result["receipt_written"] is True, "receipt_written should be True"
168-
receipt = result["receipt"]
169-
required_receipt_fields = [
170-
"receipt_id", "scenario_id", "attempted_action", "actor",
171-
"action_type", "recipient_scope", "payload_hash", "missing_field",
172-
"decision", "refusal_reason", "timestamp", "downstream_send",
173-
]
174-
for field in required_receipt_fields:
175-
assert field in receipt, f"Receipt missing field: {field}"
176-
assert receipt["decision"] == "DENY"
177-
assert receipt["downstream_send"] is False
178-
assert receipt["missing_field"] == "authority_token"
31+
"""No email is sent when authority is missing."""
32+
result = run_scenario_001()
33+
34+
assert result["downstream_send"] is False
35+
assert result["sent_messages"] == []
17936

18037

18138
def test_state_hash_does_not_change():
182-
"""
183-
ASSERT: before_state_hash == after_state_hash and state_mutated is False.
184-
A refusal must not alter system state.
185-
"""
186-
result = commit_gate(INVALID_ATTEMPT_MISSING_AUTHORITY, copy.deepcopy(INITIAL_STATE))
187-
assert result["before_state_hash"] == result["after_state_hash"], (
188-
"State hash changed on refusal — state mutation detected"
189-
)
190-
assert result["state_mutated"] is False, (
191-
"state_mutated should be False after a DENY"
192-
)
39+
"""State snapshot proves no mutation occurred on the tested path."""
40+
result = run_scenario_001()
41+
42+
assert result["before_state_hash"] == result["after_state_hash"]
43+
assert result["state_mutated"] is False
19344

19445

19546
def test_replay_is_stable():
196-
"""
197-
ASSERT: Running the same invalid attempt twice produces the same
198-
decision and the same missing_field — refusal is deterministic.
199-
"""
200-
first = commit_gate(INVALID_ATTEMPT_MISSING_AUTHORITY, copy.deepcopy(INITIAL_STATE))
201-
replay = commit_gate(INVALID_ATTEMPT_MISSING_AUTHORITY, copy.deepcopy(INITIAL_STATE))
202-
assert first["decision"] == replay["decision"], (
203-
f"Replay decision mismatch: {first['decision']} vs {replay['decision']}"
204-
)
205-
assert first["missing_field"] == replay["missing_field"], (
206-
f"Replay missing_field mismatch: {first['missing_field']} vs {replay['missing_field']}"
207-
)
47+
"""Replay of the same invalid attempt produces identical refusal class."""
48+
first = run_scenario_001()
49+
replay = run_scenario_001()
50+
51+
assert first["verdict"] == replay["verdict"]
52+
assert first["decision"] == replay["decision"]
53+
assert first["missing_field"] == replay["missing_field"]
54+
assert first["downstream_send"] is False
55+
assert replay["downstream_send"] is False
56+
57+
58+
def test_receipt_structure_is_valid():
59+
"""Receipt contains required fields and correct refusal metadata."""
60+
result = run_scenario_001()
61+
receipt = result["receipt"]
62+
63+
assert receipt["verdict"] == "DENY"
64+
assert receipt["decision"] == "DENY"
65+
assert receipt["missing_field"] == "authority_token"
66+
assert receipt["payload_hash"] is not None
67+
assert receipt["receipt_hash"] is not None
68+
assert "issued_at" in receipt
69+
assert "refused_at" in receipt
70+
assert receipt["evidence"]

0 commit comments

Comments
 (0)