Skip to content

Commit 5d64182

Browse files
committed
test: add phase 8 sarif and witness export matrix suite
1 parent 49606c6 commit 5d64182

2 files changed

Lines changed: 199 additions & 0 deletions

File tree

src/clawzero/cli.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@
7878
"expected": 50,
7979
"paths": ("tests/session/test_cross_session_isolation_generated.py",),
8080
},
81+
{
82+
"name": "SARIF + Witness Export",
83+
"expected": 300,
84+
"paths": ("tests/exports/test_sarif_witness_export_generated.py",),
85+
},
8186
)
8287

8388

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
"""Generated SARIF + witness export matrix suite (Phase 8).
2+
3+
Adds 300 scenarios:
4+
- 150 SARIF report construction/validation cases
5+
- 150 witness verification/export integrity cases
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import hashlib
11+
import json
12+
import os
13+
import sys
14+
from dataclasses import dataclass
15+
from itertools import islice, product
16+
from typing import Any
17+
18+
import pytest
19+
20+
sys.path.insert(
21+
0,
22+
os.path.join(
23+
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
24+
"src",
25+
),
26+
)
27+
28+
from clawzero.sarif import build_sarif_report, validate_sarif_report
29+
from clawzero.witnesses.verify import verify_witness_object
30+
31+
DECISIONS: tuple[str, ...] = ("allow", "annotate", "block")
32+
SINKS: tuple[str, ...] = (
33+
"tool.custom",
34+
"http.request",
35+
"filesystem.read",
36+
"filesystem.write",
37+
"credentials.access",
38+
"shell.exec",
39+
)
40+
REASON_CODES: tuple[str, ...] = (
41+
"POLICY_ALLOW",
42+
"STEP_UP_REQUIRED",
43+
"UNTRUSTED_TO_CRITICAL_SINK",
44+
"DOMAIN_BLOCKED",
45+
"PATH_BLOCKED",
46+
)
47+
ENGINES: tuple[str, ...] = ("embedded-policy-v0.1", "mvar-security")
48+
SOURCES: tuple[str, ...] = ("user_request", "external_document", "api_response")
49+
TAINTS: tuple[str, ...] = ("trusted", "untrusted")
50+
51+
52+
@dataclass(frozen=True)
53+
class SarifCase:
54+
index: int
55+
decision: str
56+
sink_type: str
57+
reason_code: str
58+
engine: str
59+
source: str
60+
61+
@property
62+
def case_id(self) -> str:
63+
return (
64+
f"sarif_{self.index:03d}__{self.decision}__"
65+
f"{self.sink_type.replace('.', '_')}__{self.reason_code.lower()}"
66+
)
67+
68+
69+
@dataclass(frozen=True)
70+
class WitnessCase:
71+
index: int
72+
decision: str
73+
sink_type: str
74+
reason_code: str
75+
source: str
76+
taint_level: str
77+
78+
@property
79+
def case_id(self) -> str:
80+
return (
81+
f"witness_{self.index:03d}__{self.decision}__"
82+
f"{self.sink_type.replace('.', '_')}__{self.taint_level}"
83+
)
84+
85+
86+
def _sarif_cases() -> list[SarifCase]:
87+
combos = product(DECISIONS, SINKS, REASON_CODES, ENGINES, SOURCES)
88+
return [
89+
SarifCase(
90+
index=i,
91+
decision=decision,
92+
sink_type=sink_type,
93+
reason_code=reason_code,
94+
engine=engine,
95+
source=source,
96+
)
97+
for i, (decision, sink_type, reason_code, engine, source) in enumerate(islice(combos, 150), start=1)
98+
]
99+
100+
101+
def _witness_cases() -> list[WitnessCase]:
102+
combos = product(DECISIONS, SINKS, REASON_CODES, SOURCES, TAINTS)
103+
return [
104+
WitnessCase(
105+
index=i,
106+
decision=decision,
107+
sink_type=sink_type,
108+
reason_code=reason_code,
109+
source=source,
110+
taint_level=taint_level,
111+
)
112+
for i, (decision, sink_type, reason_code, source, taint_level) in enumerate(
113+
islice(combos, 150), start=1
114+
)
115+
]
116+
117+
118+
def _sarif_level(decision: str) -> str:
119+
if decision == "block":
120+
return "error"
121+
if decision == "annotate":
122+
return "warning"
123+
return "note"
124+
125+
126+
def _with_content_hash(payload: dict[str, Any]) -> dict[str, Any]:
127+
witness = dict(payload)
128+
canonical = json.dumps(witness, sort_keys=True, separators=(",", ":"), ensure_ascii=True)
129+
witness["content_hash"] = f"sha256:{hashlib.sha256(canonical.encode('utf-8')).hexdigest()}"
130+
return witness
131+
132+
133+
@pytest.mark.parametrize("case", [pytest.param(case, id=case.case_id) for case in _sarif_cases()])
134+
def test_sarif_export_generated(case: SarifCase) -> None:
135+
witness = {
136+
"witness_id": f"w-{case.index:03d}",
137+
"decision": case.decision,
138+
"reason_code": case.reason_code,
139+
"sink_type": case.sink_type,
140+
"target": f"target://{case.source}/{case.index:03d}",
141+
"policy_id": "mvar-embedded.v0.1",
142+
"engine": case.engine,
143+
"chain_index": 1,
144+
"_source_file": f"witness_{case.index:03d}.json",
145+
}
146+
147+
report = build_sarif_report([witness], tool_version="0.3.0")
148+
assert validate_sarif_report(report) == []
149+
150+
result = report["runs"][0]["results"][0]
151+
assert result["properties"]["decision"] == case.decision
152+
assert result["properties"]["reason_code"] == case.reason_code
153+
assert result["properties"]["engine"] == case.engine
154+
assert result["level"] == _sarif_level(case.decision)
155+
156+
157+
@pytest.mark.parametrize("case", [pytest.param(case, id=case.case_id) for case in _witness_cases()])
158+
def test_witness_export_generated(case: WitnessCase) -> None:
159+
base = {
160+
"timestamp": f"2026-04-12T20:{case.index % 60:02d}:00+00:00",
161+
"agent_runtime": "clawzero",
162+
"sink_type": case.sink_type,
163+
"target": f"target://{case.source}/{case.index:03d}",
164+
"decision": case.decision,
165+
"reason_code": case.reason_code,
166+
"policy_id": "mvar-embedded.v0.1",
167+
"engine": "embedded-policy-v0.1",
168+
"provenance": {
169+
"source": case.source,
170+
"taint_level": case.taint_level,
171+
"source_chain": [case.source, "tool_call"],
172+
"taint_markers": [] if case.taint_level == "trusted" else ["external_content"],
173+
},
174+
"adapter": {
175+
"name": "generated",
176+
"framework": "matrix",
177+
"mode": "test",
178+
},
179+
"witness_signature": "ed25519_stub:0123456789abcdef",
180+
"schema_version": "1.1",
181+
"chain_index": 1,
182+
"previous_hash": "genesis",
183+
}
184+
witness = _with_content_hash(base)
185+
186+
valid = verify_witness_object(witness, require_chain=True)
187+
assert valid.valid is True
188+
assert valid.reasons == []
189+
190+
tampered = dict(witness)
191+
tampered["reason_code"] = "TAMPERED_REASON"
192+
invalid = verify_witness_object(tampered, require_chain=True)
193+
assert invalid.valid is False
194+
assert any("content_hash mismatch" in reason for reason in invalid.reasons)

0 commit comments

Comments
 (0)