Skip to content

Commit 0a274a5

Browse files
committed
feat: add importable enterprise scenario runner
1 parent c1c4b08 commit 0a274a5

1 file changed

Lines changed: 138 additions & 0 deletions

File tree

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
"""
2+
Scenario runners for bounded, synthetic execution-boundary evidence.
3+
4+
Scope:
5+
NON_EXEC / REVIEW_ONLY.
6+
7+
Claim boundary:
8+
These runners demonstrate synthetic refusal behaviour on demonstrated paths only.
9+
They do not prove production enforcement, external system control, compliance,
10+
certification, deployment, adoption, or path-universal coverage.
11+
"""
12+
13+
from __future__ import annotations
14+
15+
import copy
16+
import hashlib
17+
import json
18+
from datetime import datetime, timezone
19+
from pathlib import Path
20+
from typing import Any, Dict
21+
22+
REQUIRED_FIELDS = [
23+
"actor",
24+
"action_type",
25+
"recipient_scope",
26+
"payload_hash",
27+
"authority_token",
28+
"expiry",
29+
"nonce",
30+
]
31+
32+
SCENARIO_ID = "ESP-001"
33+
FIXTURE_PATH = (
34+
Path(__file__).resolve().parents[2]
35+
/ "docs"
36+
/ "enterprise-shaped-scenarios"
37+
/ "invalid_attempt_missing_authority.json"
38+
)
39+
40+
INITIAL_STATE = {
41+
"sent_messages": [],
42+
"audit_receipts": [],
43+
}
44+
45+
46+
def stable_hash(value: Any) -> str:
47+
encoded = json.dumps(value, sort_keys=True, separators=(",", ":")).encode("utf-8")
48+
return "sha256:" + hashlib.sha256(encoded).hexdigest()
49+
50+
51+
def load_attempt(path: Path = FIXTURE_PATH) -> Dict[str, Any]:
52+
return json.loads(path.read_text(encoding="utf-8"))
53+
54+
55+
def initial_state() -> Dict[str, Any]:
56+
return copy.deepcopy(INITIAL_STATE)
57+
58+
59+
def write_receipt(attempt: Dict[str, Any], missing_field: str, state_hash: str) -> Dict[str, Any]:
60+
issued_at = datetime.now(timezone.utc).isoformat()
61+
receipt = {
62+
"receipt_id": f"RCP-{SCENARIO_ID}-RUN",
63+
"scenario_id": attempt.get("scenario_id", SCENARIO_ID),
64+
"attempt_id": attempt.get("attempt_id"),
65+
"attempted_action": attempt.get("action_type", "UNKNOWN"),
66+
"actor": attempt.get("actor", "UNKNOWN"),
67+
"action_type": attempt.get("action_type", "UNKNOWN"),
68+
"recipient_scope": attempt.get("recipient_scope", "UNKNOWN"),
69+
"payload_hash": attempt.get("payload_hash", "UNKNOWN"),
70+
"missing_field": missing_field,
71+
"decision": "DENY",
72+
"verdict": "DENY",
73+
"refusal_reason": (
74+
f"{missing_field} absent — no valid DecisionRecord for this actor, "
75+
"action_type, recipient_scope, and payload at gate time"
76+
),
77+
"issued_at": issued_at,
78+
"refused_at": issued_at,
79+
"timestamp": issued_at,
80+
"downstream_send": False,
81+
"state_mutated": False,
82+
"before_state_hash": state_hash,
83+
"after_state_hash": state_hash,
84+
"evidence": [
85+
"missing authority_token",
86+
"downstream_send=false",
87+
"before_state_hash == after_state_hash",
88+
],
89+
}
90+
receipt["receipt_hash"] = stable_hash(receipt)
91+
return receipt
92+
93+
94+
def commit_gate(attempt: Dict[str, Any], state: Dict[str, Any]) -> Dict[str, Any]:
95+
"""STRUCTURE_FIRST, FIRST_FAIL synthetic CommitGate."""
96+
state_before = copy.deepcopy(state)
97+
before_state_hash = stable_hash(state_before)
98+
99+
for field in REQUIRED_FIELDS:
100+
if field not in attempt or attempt[field] in (None, ""):
101+
receipt = write_receipt(attempt, field, before_state_hash)
102+
state["audit_receipts"].append(receipt)
103+
return {
104+
"decision": "DENY",
105+
"verdict": "DENY",
106+
"missing_field": field,
107+
"downstream_send": False,
108+
"receipt_written": True,
109+
"receipt": receipt,
110+
"receipt_hash": receipt["receipt_hash"],
111+
"sent_messages": list(state["sent_messages"]),
112+
"before_state_hash": before_state_hash,
113+
"after_state_hash": before_state_hash,
114+
"state_mutated": False,
115+
}
116+
117+
# ALLOW branch is intentionally not reached by ESP-001.
118+
state["sent_messages"].append(copy.deepcopy(attempt.get("payload")))
119+
after_state_hash = stable_hash(state)
120+
return {
121+
"decision": "ALLOW",
122+
"verdict": "ALLOW",
123+
"missing_field": None,
124+
"downstream_send": True,
125+
"receipt_written": False,
126+
"receipt": None,
127+
"receipt_hash": None,
128+
"sent_messages": list(state["sent_messages"]),
129+
"before_state_hash": before_state_hash,
130+
"after_state_hash": after_state_hash,
131+
"state_mutated": before_state_hash != after_state_hash,
132+
}
133+
134+
135+
def run_scenario_001() -> Dict[str, Any]:
136+
attempt = load_attempt()
137+
state = initial_state()
138+
return commit_gate(attempt, state)

0 commit comments

Comments
 (0)