Skip to content

Commit cd18af4

Browse files
committed
test: add extended fuzz matrix to pass 9.5k test mark
1 parent 5d64182 commit cd18af4

2 files changed

Lines changed: 204 additions & 0 deletions

File tree

src/clawzero/cli.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@
8383
"expected": 300,
8484
"paths": ("tests/exports/test_sarif_witness_export_generated.py",),
8585
},
86+
{
87+
"name": "Fuzzing Corpus Extended",
88+
"expected": 1008,
89+
"paths": ("tests/fuzzing/test_engine_fuzz_extended_generated.py",),
90+
},
8691
)
8792

8893

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
"""Extended deterministic fuzz matrix for MVARRuntime.
2+
3+
Adds 1,008 additional runtime robustness scenarios to close out the
4+
large-scale contract surface:
5+
- source/taint/provenance-shape permutations
6+
- sink/profile/input-class permutations
7+
- target pattern and metadata permutations
8+
"""
9+
10+
from __future__ import annotations
11+
12+
import os
13+
import sys
14+
import uuid
15+
from dataclasses import dataclass
16+
from itertools import islice, product
17+
18+
import pytest
19+
20+
sys.path.insert(
21+
0,
22+
os.path.join(
23+
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
24+
"src",
25+
),
26+
)
27+
28+
from clawzero.contracts import ActionRequest
29+
from clawzero.runtime import MVARRuntime
30+
31+
CASE_COUNT = 1008
32+
PROFILES: tuple[str, ...] = ("dev_balanced", "dev_strict", "prod_locked")
33+
SINK_TYPES: tuple[str, ...] = (
34+
"shell.exec",
35+
"filesystem.read",
36+
"filesystem.write",
37+
"credentials.access",
38+
"http.request",
39+
"tool.custom",
40+
)
41+
TAINT_LEVELS: tuple[str, ...] = ("trusted", "unknown", "untrusted")
42+
INPUT_CLASS_VALUES: tuple[str, ...] = ("trusted", "pre_authorized", "untrusted", "unknown")
43+
TARGET_KINDS: tuple[str, ...] = (
44+
"local_path",
45+
"traversal_path",
46+
"http_url",
47+
"loopback_url",
48+
"env_secret",
49+
"command",
50+
"custom_symbol",
51+
)
52+
SOURCE_VARIANTS: tuple[str, ...] = (
53+
"external_document",
54+
"user_request",
55+
"api_response",
56+
"tool_output",
57+
)
58+
59+
60+
@dataclass(frozen=True)
61+
class ExtendedFuzzCase:
62+
index: int
63+
profile: str
64+
sink_type: str
65+
taint_level: str
66+
input_class: str
67+
source: str
68+
target_kind: str
69+
70+
@property
71+
def case_id(self) -> str:
72+
return (
73+
f"xf{self.index:04d}__{self.profile}__{self.sink_type.replace('.', '_')}__"
74+
f"{self.taint_level}__{self.input_class}__{self.target_kind}"
75+
)
76+
77+
78+
def _target_for(case: ExtendedFuzzCase) -> str:
79+
idx = case.index
80+
if case.target_kind == "local_path":
81+
return f"/workspace/data/input_{idx:04d}.txt"
82+
if case.target_kind == "traversal_path":
83+
return f"/workspace/../../etc/passwd::{idx:04d}"
84+
if case.target_kind == "http_url":
85+
return f"https://attacker.example/fuzz/{idx:04d}"
86+
if case.target_kind == "loopback_url":
87+
return f"http://localhost:8080/fuzz/{idx:04d}"
88+
if case.target_kind == "env_secret":
89+
return f"env://FAKE_SECRET_{idx:04d}"
90+
if case.target_kind == "command":
91+
return f"bash -lc \"echo fuzz_{idx:04d}\""
92+
return f"tool.symbol.{idx:04d}"
93+
94+
95+
def _effective_profile(case: ExtendedFuzzCase) -> str:
96+
normalized_input = case.input_class.strip().lower()
97+
if normalized_input in {"", "unknown"} and case.taint_level not in {"trusted", "pre_authorized"}:
98+
normalized_input = "untrusted"
99+
if normalized_input == "untrusted" and case.profile == "dev_balanced":
100+
return "dev_strict"
101+
return case.profile
102+
103+
104+
def _build_cases() -> list[ExtendedFuzzCase]:
105+
combos = product(PROFILES, SINK_TYPES, TAINT_LEVELS, INPUT_CLASS_VALUES, SOURCE_VARIANTS, TARGET_KINDS)
106+
cases: list[ExtendedFuzzCase] = []
107+
for index, (profile, sink, taint, input_class, source, target_kind) in enumerate(
108+
islice(combos, CASE_COUNT),
109+
start=1,
110+
):
111+
cases.append(
112+
ExtendedFuzzCase(
113+
index=index,
114+
profile=profile,
115+
sink_type=sink,
116+
taint_level=taint,
117+
input_class=input_class,
118+
source=source,
119+
target_kind=target_kind,
120+
)
121+
)
122+
return cases
123+
124+
125+
def _runtime(profile: str) -> MVARRuntime:
126+
runtime = MVARRuntime(profile=profile)
127+
runtime._mvar_available = False
128+
runtime._mvar_governor = None
129+
runtime.engine = "embedded-policy-v0.1"
130+
runtime.policy_id = "mvar-embedded.v0.1"
131+
return runtime
132+
133+
134+
def _request_for(case: ExtendedFuzzCase) -> ActionRequest:
135+
target = _target_for(case)
136+
source_chain = [case.source, f"stage_{case.index % 5}", case.sink_type]
137+
taint_markers = [] if case.taint_level == "trusted" else ["external_content", f"case_{case.index:04d}"]
138+
return ActionRequest(
139+
request_id=str(uuid.uuid4()),
140+
framework="openclaw",
141+
action_type="tool_call",
142+
sink_type=case.sink_type,
143+
tool_name=f"fuzz_tool_{case.index % 13}",
144+
target=target,
145+
arguments={
146+
"target": target,
147+
"index": case.index,
148+
"metadata": {
149+
"source_variant": case.source,
150+
"target_kind": case.target_kind,
151+
"padding": "x" * ((case.index % 16) + 1),
152+
},
153+
},
154+
input_class=case.input_class,
155+
prompt_provenance={
156+
"source": case.source,
157+
"taint_level": case.taint_level,
158+
"taint_markers": taint_markers,
159+
"source_chain": source_chain,
160+
},
161+
policy_profile=case.profile,
162+
metadata={
163+
"adapter": {
164+
"name": "fuzz_extended",
165+
"framework": "matrix",
166+
"mode": "generated",
167+
}
168+
},
169+
)
170+
171+
172+
@pytest.mark.parametrize(
173+
"case",
174+
[pytest.param(case, id=case.case_id) for case in _build_cases()],
175+
)
176+
def test_engine_fuzz_extended_generated(case: ExtendedFuzzCase, monkeypatch: pytest.MonkeyPatch) -> None:
177+
monkeypatch.setattr(
178+
"clawzero.runtime.engine.generate_witness",
179+
lambda request, decision: {
180+
"request_id": request.request_id,
181+
"sink_type": request.sink_type,
182+
"decision": decision.decision,
183+
"reason_code": decision.reason_code,
184+
"witness_signature": "ed25519_stub:extendedfuzz",
185+
},
186+
)
187+
188+
runtime = _runtime(case.profile)
189+
decision = runtime.evaluate(_request_for(case))
190+
191+
assert decision.decision in {"allow", "block", "annotate"}
192+
assert bool(decision.reason_code)
193+
assert decision.sink_type == case.sink_type
194+
assert decision.policy_profile == _effective_profile(case)
195+
assert decision.annotations.get("effective_policy_profile") == _effective_profile(case)
196+
assert decision.annotations.get("input_class") in {"trusted", "pre_authorized", "untrusted"}
197+
assert isinstance(runtime.last_witness, dict)
198+
assert runtime.last_witness.get("decision") == decision.decision
199+
assert runtime.last_witness.get("sink_type") == case.sink_type

0 commit comments

Comments
 (0)