Skip to content

Commit a873acc

Browse files
committed
test: add runnable scenario 001 refusal harness
1 parent 54babd8 commit a873acc

1 file changed

Lines changed: 133 additions & 0 deletions

File tree

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Scenario 001 — AI-generated external email refusal.
4+
5+
Synthetic, review-only harness.
6+
7+
Claim boundary:
8+
This script demonstrates one bounded scenario path. It does not prove enterprise
9+
readiness, production deployment, compliance, certification, adoption, or
10+
path-universal governance.
11+
"""
12+
13+
from __future__ import annotations
14+
15+
import copy
16+
import hashlib
17+
import json
18+
from pathlib import Path
19+
from typing import Any, Dict
20+
21+
ROOT = Path(__file__).resolve().parent
22+
ATTEMPT_PATH = ROOT / "invalid_attempt_missing_authority.json"
23+
24+
REQUIRED_FIELDS = [
25+
"actor",
26+
"action_type",
27+
"recipient_scope",
28+
"payload_hash",
29+
"authority_token",
30+
"expiry",
31+
"nonce",
32+
]
33+
34+
35+
def stable_hash(value: Any) -> str:
36+
encoded = json.dumps(value, sort_keys=True, separators=(",", ":")).encode("utf-8")
37+
return "sha256:" + hashlib.sha256(encoded).hexdigest()
38+
39+
40+
def load_attempt() -> Dict[str, Any]:
41+
return json.loads(ATTEMPT_PATH.read_text(encoding="utf-8"))
42+
43+
44+
def initial_state() -> Dict[str, Any]:
45+
return {
46+
"sent_messages": [],
47+
"audit_receipts": [],
48+
}
49+
50+
51+
def commit_gate(attempt: Dict[str, Any], state: Dict[str, Any]) -> Dict[str, Any]:
52+
before_state_hash = stable_hash(state)
53+
54+
for field in REQUIRED_FIELDS:
55+
if field not in attempt or attempt[field] in (None, ""):
56+
receipt = {
57+
"receipt_id": "RCP-ESP-001-RUN",
58+
"scenario_id": attempt.get("scenario_id"),
59+
"attempt_id": attempt.get("attempt_id"),
60+
"attempted_action": attempt.get("action_type"),
61+
"actor": attempt.get("actor"),
62+
"action_type": attempt.get("action_type"),
63+
"recipient_scope": attempt.get("recipient_scope"),
64+
"payload_hash": attempt.get("payload_hash"),
65+
"missing_field": field,
66+
"decision": "DENY",
67+
"refusal_reason": (
68+
f"{field} absent — no valid DecisionRecord for this actor, "
69+
"action_type, recipient_scope, and payload at gate time"
70+
),
71+
"downstream_send": False,
72+
"receipt_written": True,
73+
"state_mutated": False,
74+
"before_state_hash": before_state_hash,
75+
"after_state_hash": before_state_hash,
76+
}
77+
state["audit_receipts"].append(receipt)
78+
return receipt
79+
80+
# This branch is deliberately unreachable for the invalid fixture.
81+
state["sent_messages"].append(copy.deepcopy(attempt["payload"]))
82+
after_state_hash = stable_hash(state)
83+
return {
84+
"scenario_id": attempt.get("scenario_id"),
85+
"attempt_id": attempt.get("attempt_id"),
86+
"decision": "ALLOW",
87+
"downstream_send": True,
88+
"receipt_written": False,
89+
"state_mutated": True,
90+
"before_state_hash": before_state_hash,
91+
"after_state_hash": after_state_hash,
92+
}
93+
94+
95+
def run_once() -> Dict[str, Any]:
96+
attempt = load_attempt()
97+
state = initial_state()
98+
receipt = commit_gate(attempt, state)
99+
100+
assert receipt["decision"] == "DENY"
101+
assert receipt["missing_field"] == "authority_token"
102+
assert receipt["downstream_send"] is False
103+
assert receipt["receipt_written"] is True
104+
assert receipt["state_mutated"] is False
105+
assert receipt["before_state_hash"] == receipt["after_state_hash"]
106+
assert state["sent_messages"] == []
107+
108+
return receipt
109+
110+
111+
def main() -> None:
112+
first = run_once()
113+
replay = run_once()
114+
115+
replay_stable = (
116+
first["decision"] == replay["decision"]
117+
and first["missing_field"] == replay["missing_field"]
118+
and first["downstream_send"] is False
119+
and replay["downstream_send"] is False
120+
)
121+
122+
assert replay_stable is True
123+
124+
print(f"Scenario: {first['scenario_id']}")
125+
print(f"Decision: {first['decision']}")
126+
print(f"Missing field: {first['missing_field']}")
127+
print(f"Downstream send: {str(first['downstream_send']).lower()}")
128+
print(f"Receipt written: {str(first['receipt_written']).lower()}")
129+
print(f"Replay stable: {str(replay_stable).lower()}")
130+
131+
132+
if __name__ == "__main__":
133+
main()

0 commit comments

Comments
 (0)