|
23 | 23 |
|
24 | 24 | from clawzero.contracts import ActionRequest, InputClass |
25 | 25 | from clawzero.runtime import AgentSession, MVARRuntime |
| 26 | +from clawzero.runtime.chain_patterns import CHAIN_THRESHOLDS |
26 | 27 |
|
27 | 28 | PROFILES: tuple[str, ...] = ("dev_balanced", "dev_strict", "prod_locked") |
28 | 29 | TECHNIQUE_COUNT = 100 |
@@ -219,6 +220,13 @@ def _input_class_for(taint_level: str) -> str: |
219 | 220 | return InputClass.UNTRUSTED.value |
220 | 221 |
|
221 | 222 |
|
| 223 | +def _expected_witness_taint_level(case: EvasionCase) -> str: |
| 224 | + # Engine contract: unknown and untrusted inputs are normalized to untrusted. |
| 225 | + if case.taint_level in {"unknown", "untrusted"}: |
| 226 | + return "untrusted" |
| 227 | + return case.taint_level |
| 228 | + |
| 229 | + |
222 | 230 | def _runtime(profile: str, witness_dir: Path) -> MVARRuntime: |
223 | 231 | runtime = MVARRuntime(profile=profile, witness_dir=witness_dir) |
224 | 232 | runtime._mvar_available = False |
@@ -284,25 +292,59 @@ def test_adversarial_evasion_cases_block_and_detect(case: EvasionCase, tmp_path: |
284 | 292 | decisions.append(decision) |
285 | 293 | assert decision.decision == "block" |
286 | 294 | assert decision.reason_code == case.category.expected_reason_code |
| 295 | + assert decision.sink_type == case.category.sink_type |
287 | 296 | assert decision.annotations.get("effective_policy_profile") == case.expected_profile |
288 | | - assert runtime.last_witness is not None |
| 297 | + witness = runtime.last_witness |
| 298 | + assert isinstance(witness, dict) |
| 299 | + assert witness.get("request_id") == decision.request_id |
| 300 | + assert witness.get("decision") == "block" |
| 301 | + assert witness.get("reason_code") == case.category.expected_reason_code |
| 302 | + assert witness.get("sink_type") == case.category.sink_type |
| 303 | + provenance = witness.get("provenance") |
| 304 | + assert isinstance(provenance, dict) |
| 305 | + assert provenance.get("taint_level") == _expected_witness_taint_level(case) |
| 306 | + markers = provenance.get("taint_markers") |
| 307 | + assert isinstance(markers, list) |
| 308 | + assert case.technique.label in markers |
| 309 | + assert f"stage_{stage}" in markers |
289 | 310 |
|
290 | 311 | final_session = decisions[-1].annotations.get("session", {}) |
291 | | - chain_patterns = { |
292 | | - detection.get("pattern") |
293 | | - for detection in final_session.get("chain_detections", []) |
294 | | - if isinstance(detection, dict) |
295 | | - } |
| 312 | + detections = [d for d in final_session.get("chain_detections", []) if isinstance(d, dict)] |
| 313 | + chain_patterns = {detection.get("pattern") for detection in detections} |
296 | 314 | assert "taint_continuity" in chain_patterns |
297 | 315 |
|
| 316 | + taint_detections = [d for d in detections if d.get("pattern") == "taint_continuity"] |
| 317 | + assert taint_detections |
| 318 | + request_ids = {decision.request_id for decision in decisions} |
| 319 | + min_untrusted_count = int(CHAIN_THRESHOLDS[case.expected_profile]["min_untrusted_count"]) |
| 320 | + for detection in taint_detections: |
| 321 | + evidence = detection.get("evidence") |
| 322 | + assert isinstance(evidence, list) |
| 323 | + assert len(evidence) >= min_untrusted_count |
| 324 | + assert set(evidence).issubset(request_ids) |
| 325 | + |
298 | 326 | # Explicit evasion contract: source ID fragmentation must still trigger continuity detection. |
299 | 327 | if case.technique.family == "source_id_fragmentation": |
300 | 328 | stage_sources = [decision.annotations["session"]["source_id"] for decision in decisions] |
301 | 329 | assert len(set(stage_sources)) == CHAIN_DEPTH |
| 330 | + assert any( |
| 331 | + "source-independent" in str(detection.get("primary_signal", "")) |
| 332 | + for detection in taint_detections |
| 333 | + ) |
302 | 334 |
|
303 | 335 | report = session.get_session_report() |
304 | 336 | assert report["total_calls"] == CHAIN_DEPTH |
305 | 337 | assert report["blocked_calls"] == CHAIN_DEPTH |
306 | 338 | assert report["witness_chain_length"] == CHAIN_DEPTH |
307 | | - assert report["chain_detector"]["detections"] |
| 339 | + chain_detector_report = report["chain_detector"] |
| 340 | + assert chain_detector_report["events"] == CHAIN_DEPTH |
| 341 | + assert chain_detector_report["detections"] |
| 342 | + assert any( |
| 343 | + detection.get("pattern") == "taint_continuity" |
| 344 | + for detection in chain_detector_report["detections"] |
| 345 | + ) |
308 | 346 |
|
| 347 | + log_path = Path(report["log_path"]) |
| 348 | + assert log_path.exists() |
| 349 | + with log_path.open("r", encoding="utf-8") as handle: |
| 350 | + assert sum(1 for _ in handle) == CHAIN_DEPTH |
0 commit comments