Skip to content

Commit 49606c6

Browse files
committed
test: add phase 7 cross-session isolation suite
1 parent 9fcc325 commit 49606c6

2 files changed

Lines changed: 186 additions & 0 deletions

File tree

src/clawzero/cli.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@
7373
"expected": 250,
7474
"paths": ("tests/adapters/test_framework_adapter_matrix_generated.py",),
7575
},
76+
{
77+
"name": "Cross-Session Isolation",
78+
"expected": 50,
79+
"paths": ("tests/session/test_cross_session_isolation_generated.py",),
80+
},
7681
)
7782

7883

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
"""Generated cross-session isolation suite (Phase 7).
2+
3+
This suite adds 50 scenarios to prove session state isolation:
4+
- escalation score does not leak across sessions
5+
- chain detector event history stays session-local
6+
- append-only JSONL logs remain isolated per session id
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import os
12+
import sys
13+
import uuid
14+
from dataclasses import dataclass
15+
from itertools import islice, product
16+
from pathlib import Path
17+
18+
import pytest
19+
20+
# Make the in-repo ``src`` directory importable when the suite is run without
# the package being installed: walk three directory levels up from this file
# and put ``<that directory>/src`` at the front of the import path.
sys.path.insert(
    0,
    os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
        "src",
    ),
)
27+
28+
from clawzero.contracts import ActionDecision
29+
from clawzero.runtime import AgentSession
30+
31+
# Axes of the scenario matrix. ``generate_isolation_cases`` walks their
# cartesian product in this order, so TAINTS is the fastest-varying axis.
PROFILES: tuple[str, ...] = ("dev_balanced", "dev_strict", "prod_locked")
SINKS: tuple[str, ...] = (
    "tool.custom",
    "http.request",
    "filesystem.read",
    "filesystem.write",
    "credentials.access",
    "shell.exec",
)
TAINTS: tuple[str, ...] = ("trusted", "untrusted")
41+
42+
43+
@dataclass(frozen=True)
class IsolationCase:
    """One immutable scenario of the cross-session isolation matrix.

    Session A is driven with ``sink_type`` / ``taint_level`` / ``decision_a``;
    session B is then checked for a clean state.
    """

    # 1-based position of the scenario in the generated list.
    index: int
    # Policy profile names used to construct session A and session B.
    profile_a: str
    profile_b: str
    # Sink and taint level of the decision that is fed to session A.
    sink_type: str
    taint_level: str
    # Decision string ("allow" or "block") recorded for session A.
    decision_a: str
    # When true, session A is closed before session B is inspected.
    close_a_before_b: bool

    @property
    def case_id(self) -> str:
        """Return a readable identifier built from every field of the case.

        Dots in the sink name are replaced by underscores; the fields are
        separated by double underscores.
        """
        close_token = "closeA" if self.close_a_before_b else "keepA"
        parts = (
            f"s{self.index:02d}",
            self.profile_a,
            self.profile_b,
            self.sink_type.replace(".", "_"),
            self.taint_level,
            self.decision_a,
            close_token,
        )
        return "__".join(parts)
61+
62+
63+
def generate_isolation_cases() -> list[IsolationCase]:
    """Build the 50 scenarios used to parametrize the isolation test.

    The scenarios are the first 50 entries of
    ``product(PROFILES, PROFILES, SINKS, TAINTS)``, numbered from 1.
    Session A's decision is "block" only for an untrusted input reaching
    ``shell.exec`` or ``credentials.access``; otherwise it is "allow".

    ``close_a_before_b`` alternates with the taint level inside one
    profile-pair sweep and flips phase on every new sweep. Deriving it from
    ``index % 2`` alone would tie it to the taint level (TAINTS is the
    fastest-varying axis and has two entries), so "untrusted" would always
    mean "A closed" and a blocked-but-still-open session A would never be
    exercised.

    NOTE(review): the 50-case prefix of the 108-entry product only reaches
    "dev_balanced" and "dev_strict" as ``profile_a``; "prod_locked" is
    covered as ``profile_b`` only. Confirm that this is intended.
    """
    critical_sinks = {"shell.exec", "credentials.access"}
    # One sweep = every (sink, taint) combination for a single profile pair.
    sweep_size = len(SINKS) * len(TAINTS)
    combos = islice(product(PROFILES, PROFILES, SINKS, TAINTS), 50)

    cases: list[IsolationCase] = []
    for index, (profile_a, profile_b, sink_type, taint_level) in enumerate(combos, start=1):
        is_blocked = taint_level == "untrusted" and sink_type in critical_sinks
        sweep = (index - 1) // sweep_size
        # Same sink/taint pair gets the opposite close flag on the next sweep.
        close_first = (TAINTS.index(taint_level) + sweep) % 2 == 1
        cases.append(
            IsolationCase(
                index=index,
                profile_a=profile_a,
                profile_b=profile_b,
                sink_type=sink_type,
                taint_level=taint_level,
                decision_a="block" if is_blocked else "allow",
                close_a_before_b=close_first,
            )
        )
    return cases
80+
81+
82+
def _decision(
    *,
    request_id: str,
    sink_type: str,
    decision: str,
    taint_level: str,
    source: str,
    profile: str,
) -> ActionDecision:
    """Build an ``ActionDecision`` fixture for a session to evaluate.

    All arguments are keyword-only. ``decision`` is used both as the decision
    value and as the human-readable reason; the reason code is
    "POLICY_ALLOW" for "allow" and "UNTRUSTED_TO_CRITICAL_SINK" for anything
    else. ``taint_level`` is recorded as the trust level, the input class and
    the provenance taint level; ``source`` is recorded as provenance source.
    """
    reason = "POLICY_ALLOW" if decision == "allow" else "UNTRUSTED_TO_CRITICAL_SINK"
    return ActionDecision(
        request_id=request_id,
        decision=decision,
        reason_code=reason,
        human_reason=decision,
        sink_type=sink_type,
        target=f"target:{sink_type}",
        policy_profile=profile,
        annotations={
            "provenance": {
                "source": source,
                "taint_level": taint_level,
            },
            "input_class": taint_level,
        },
        trust_level=taint_level,
    )
109+
110+
111+
@pytest.mark.parametrize(
    "case",
    [pytest.param(case, id=case.case_id) for case in generate_isolation_cases()],
)
def test_cross_session_isolation_generated(case: IsolationCase, tmp_path: Path) -> None:
    """Check that activity in session A leaves session B untouched.

    Two sessions share one persistence root. Session A evaluates the case's
    decision (and is optionally closed); session B must still report a zero
    escalation score, no decisions and no chain-detector events, must number
    its own first call as 1, and the two JSONL logs must not contain each
    other's request ids.
    """
    root = tmp_path / case.case_id
    session_a = AgentSession(
        session_id=f"A_{case.index:02d}",
        profile=case.profile_a,
        persistence_root=root,
    )
    session_b = AgentSession(
        session_id=f"B_{case.index:02d}",
        profile=case.profile_b,
        persistence_root=root,
    )

    # Fresh sessions: distinct log files and an empty state for B.
    assert session_a.log_path != session_b.log_path
    assert session_b.escalation_score == 0.0
    assert session_b.chain_detector.summary()["events"] == 0

    # Drive session A with the scenario's decision.
    req_a = f"req-A-{uuid.uuid4().hex[:8]}"
    source_a = f"source_A_{case.index:02d}"
    enriched_a = session_a.evaluate(
        _decision(
            request_id=req_a,
            sink_type=case.sink_type,
            decision=case.decision_a,
            taint_level=case.taint_level,
            source=source_a,
            profile=case.profile_a,
        )
    )
    assert enriched_a.annotations["session"]["session_id"] == session_a.session_id
    assert enriched_a.annotations["session"]["source_id"] == source_a

    if case.close_a_before_b:
        session_a.close()

    # Session B must start from a clean state regardless of session A history.
    assert session_b.escalation_score == 0.0
    assert len(session_b.decisions) == 0
    assert session_b.chain_detector.summary()["events"] == 0

    # Drive session B with a benign decision and check its own bookkeeping.
    req_b = f"req-B-{uuid.uuid4().hex[:8]}"
    source_b = f"source_B_{case.index:02d}"
    enriched_b = session_b.evaluate(
        _decision(
            request_id=req_b,
            sink_type="tool.custom",
            decision="allow",
            taint_level="trusted",
            source=source_b,
            profile=case.profile_b,
        )
    )

    session_meta_b = enriched_b.annotations["session"]
    assert session_meta_b["session_id"] == session_b.session_id
    assert session_meta_b["call_index"] == 1
    assert session_meta_b["source_id"] == source_b
    assert session_meta_b["chain_detections"] == []
    assert session_b.chain_detector.summary()["events"] == 1

    # JSONL logs stay isolated per session and never mix request ids.
    a_lines = session_a.log_path.read_text(encoding="utf-8").splitlines()
    b_lines = session_b.log_path.read_text(encoding="utf-8").splitlines()
    assert any(req_a in line for line in a_lines)
    assert all(req_b not in line for line in a_lines)
    assert any(req_b in line for line in b_lines)
    assert all(req_a not in line for line in b_lines)

0 commit comments

Comments
 (0)