1818import sys
1919import uuid
2020from dataclasses import dataclass
21+ from urllib .parse import urlparse
2122
2223import pytest
2324
@@ -79,14 +80,31 @@ def expected_profile(self) -> str:
7980 return "dev_strict"
8081 return self .profile
8182
83+ @property
84+ def expected_input_class (self ) -> str :
85+ normalized_input = self .input_class .strip ().lower ()
86+ if normalized_input in {"trusted" , "pre_authorized" , "untrusted" }:
87+ return normalized_input
88+ normalized_taint = self .taint_level .strip ().lower ()
89+ if normalized_taint in {"trusted" , "clean" }:
90+ return "trusted"
91+ if normalized_taint in {"pre_authorized" , "pre-authorized" }:
92+ return "pre_authorized"
93+ return "untrusted"
94+
8295
8396def _random_token (rng : random .Random , * , min_len : int = 4 , max_len : int = 64 ) -> str :
8497 length = rng .randint (min_len , max_len )
8598 alphabet = string .ascii_letters + string .digits + "-_.:/?&=%+@!$[]{}()"
8699 return "" .join (rng .choice (alphabet ) for _ in range (length ))
87100
88101
89- def _target_for (rng : random .Random , sink_type : str , index : int ) -> str :
102+ def _target_for (
103+ rng : random .Random ,
104+ sink_type : str ,
105+ index : int ,
106+ sink_case_index : int ,
107+ ) -> str :
90108 token = _random_token (rng , min_len = 8 , max_len = 80 )
91109 if sink_type == "shell.exec" :
92110 return f"bash -lc \" echo fuzz_{ index } ; { token } \" "
@@ -96,7 +114,7 @@ def _target_for(rng: random.Random, sink_type: str, index: int) -> str:
96114 f"/etc/passwd::{ token } " ,
97115 f"/workspace/../../etc/shadow::{ token } " ,
98116 )
99- return candidates [index % len (candidates )]
117+ return candidates [sink_case_index % len (candidates )]
100118 if sink_type == "filesystem.write" :
101119 return f"/tmp/fuzz_write_{ index :04d} _{ token [:12 ]} .txt"
102120 if sink_type == "credentials.access" :
@@ -111,13 +129,16 @@ def _target_for(rng: random.Random, sink_type: str, index: int) -> str:
111129def _build_cases () -> list [FuzzCase ]:
112130 rng = random .Random (20260412 )
113131 cases : list [FuzzCase ] = []
132+ sink_counts = {sink : 0 for sink in SINK_TYPES }
114133 for index in range (1 , CASE_COUNT + 1 ):
115134 profile = PROFILES [(index - 1 ) % len (PROFILES )]
116135 sink_type = SINK_TYPES [(index - 1 ) % len (SINK_TYPES )]
136+ sink_case_index = sink_counts [sink_type ]
137+ sink_counts [sink_type ] += 1
117138 taint_level = TAINT_LEVELS [(index - 1 ) % len (TAINT_LEVELS )]
118139 input_class = INPUT_CLASS_VALUES [(index - 1 ) % len (INPUT_CLASS_VALUES )]
119140 source = f"fuzz_source_{ index :04d} _{ _random_token (rng , min_len = 4 , max_len = 10 )} "
120- target = _target_for (rng , sink_type , index )
141+ target = _target_for (rng , sink_type , index , sink_case_index )
121142 markers = (
122143 "fuzzing" ,
123144 sink_type .replace ("." , "_" ),
@@ -173,6 +194,65 @@ def _request_for(case: FuzzCase) -> ActionRequest:
173194 )
174195
175196
197+ def _expected_decision_for (case : FuzzCase ) -> tuple [str , str ]:
198+ profile = case .expected_profile
199+
200+ if case .sink_type == "shell.exec" :
201+ return "block" , "UNTRUSTED_TO_CRITICAL_SINK"
202+ if case .sink_type == "credentials.access" :
203+ return "block" , "CREDENTIAL_ACCESS_BLOCKED"
204+ if case .sink_type == "filesystem.read" :
205+ target = case .target
206+ target_lower = target .lower ()
207+ has_traversal_signal = any (
208+ token in target_lower for token in ("../" , "..\\ " , "%2e%2e" , "%2f" , "%5c" , "%32%65" , "\x00 " )
209+ )
210+ is_sensitive_target = target_lower .startswith (
211+ ("/etc/" , "/proc/" , "/root/" , "/home/" , "~/.ssh/" , "/users/" )
212+ )
213+ if profile == "dev_balanced" :
214+ if has_traversal_signal or is_sensitive_target :
215+ return "block" , "PATH_BLOCKED"
216+ return "allow" , "ALLOWLIST_MATCH"
217+ if profile == "dev_strict" :
218+ if (
219+ not has_traversal_signal
220+ and not is_sensitive_target
221+ and target_lower .startswith ("/workspace/" )
222+ ):
223+ return "allow" , "ALLOWLIST_MATCH"
224+ return "block" , "PATH_BLOCKED"
225+ if (
226+ not has_traversal_signal
227+ and not is_sensitive_target
228+ and target_lower .startswith ("/workspace/project/" )
229+ ):
230+ return "allow" , "ALLOWLIST_MATCH"
231+ return "block" , "PATH_BLOCKED"
232+ if case .sink_type == "http.request" :
233+ if profile == "dev_balanced" :
234+ return "allow" , "POLICY_ALLOW"
235+ if profile == "dev_strict" :
236+ return "block" , "DOMAIN_BLOCKED"
237+ hostname = urlparse (case .target ).hostname or case .target
238+ if hostname in {"localhost" , "127.0.0.1" }:
239+ return "allow" , "ALLOWLIST_MATCH"
240+ return "block" , "DOMAIN_BLOCKED"
241+ if case .sink_type == "tool.custom" :
242+ if profile == "dev_strict" :
243+ if case .expected_input_class in {"trusted" , "pre_authorized" }:
244+ return "allow" , "POLICY_ALLOW"
245+ return "annotate" , "STEP_UP_REQUIRED"
246+ if profile == "dev_balanced" and case .expected_input_class == "untrusted" :
247+ return "annotate" , "STEP_UP_REQUIRED"
248+ return "allow" , "POLICY_ALLOW"
249+ return "allow" , "POLICY_ALLOW"
250+
251+
252+ def _expected_witness_taint_level (case : FuzzCase ) -> str :
253+ return "trusted" if case .expected_input_class in {"trusted" , "pre_authorized" } else "untrusted"
254+
255+
176256@pytest .mark .parametrize (
177257 "case" ,
178258 [pytest .param (case , id = case .case_id ) for case in _build_cases ()],
@@ -184,20 +264,45 @@ def test_engine_fuzz_corpus_generated(case: FuzzCase, monkeypatch: pytest.Monkey
184264 lambda request , decision : {
185265 "request_id" : request .request_id ,
186266 "sink_type" : request .sink_type ,
267+ "target" : request .target ,
187268 "decision" : decision .decision ,
269+ "reason_code" : decision .reason_code ,
270+ "policy_profile" : decision .policy_profile ,
271+ "provenance" : request .prompt_provenance ,
188272 "witness_signature" : "ed25519_stub:fuzz" ,
189273 },
190274 )
191275
192276 runtime = _runtime (case .profile )
193277 decision = runtime .evaluate (_request_for (case ))
278+ expected_decision , expected_reason = _expected_decision_for (case )
194279
195- assert decision .decision in { "allow" , "block" , "annotate" }
196- assert bool ( decision .reason_code )
280+ assert decision .decision == expected_decision
281+ assert decision .reason_code == expected_reason
197282 assert decision .sink_type == case .sink_type
283+ assert decision .target == case .target
284+ assert decision .policy_profile == case .expected_profile
198285 assert decision .annotations .get ("effective_policy_profile" ) == case .expected_profile
199- assert decision .annotations .get ("input_class" ) in {"trusted" , "pre_authorized" , "untrusted" }
200- assert isinstance (runtime .last_witness , dict )
201- assert runtime .last_witness .get ("decision" ) == decision .decision
202- assert runtime .last_witness .get ("sink_type" ) == case .sink_type
286+ assert decision .annotations .get ("input_class" ) == case .expected_input_class
287+ witness = runtime .last_witness
288+ assert isinstance (witness , dict )
289+ assert witness .get ("request_id" ) == decision .request_id
290+ assert witness .get ("decision" ) == decision .decision
291+ assert witness .get ("reason_code" ) == decision .reason_code
292+ assert witness .get ("sink_type" ) == case .sink_type
293+ assert witness .get ("target" ) == case .target
294+ assert witness .get ("policy_profile" ) == case .expected_profile
295+ provenance = witness .get ("provenance" )
296+ assert isinstance (provenance , dict )
297+ assert provenance .get ("source" ) == case .source
298+ assert provenance .get ("taint_level" ) == _expected_witness_taint_level (case )
299+ assert provenance .get ("source_chain" ) == ["fuzz" , case .source , case .sink_type ]
300+ assert provenance .get ("taint_markers" ) == list (case .markers )
301+
203302
303+ def test_engine_fuzz_generated_gap_cross_suite_dedup_not_enforced () -> None :
304+ pytest .skip (
305+ "Gap (explicit): this legacy fuzz suite now enforces deterministic runtime contracts, "
306+ "but does not yet enforce deduplication boundaries versus "
307+ "test_engine_fuzz_extended_generated.py targeted adversarial scenarios."
308+ )
0 commit comments