22
33from __future__ import annotations
44
5+ from collections import defaultdict
56import os
67import sys
78import uuid
1415
1516from clawzero .contracts import ActionRequest , InputClass
1617from clawzero .runtime import MVARRuntime
17- from policy_matrix_data import generate_policy_matrix_cases
18+ from policy_matrix_data import TARGET_BY_SINK , generate_policy_matrix_cases
1819
1920
2021def _runtime (tmp_path : Path , profile : str ) -> MVARRuntime :
@@ -37,6 +38,13 @@ def _input_class_for_taint(taint_level: str) -> str | None:
3738 return None
3839
3940
41+ def _expected_witness_taint_level (taint_level : str ) -> str :
42+ # Engine contract: unknown and untrusted inputs normalize to untrusted.
43+ if taint_level in {"unknown" , "untrusted" }:
44+ return "untrusted"
45+ return taint_level
46+
47+
4048MATRIX_CASES = generate_policy_matrix_cases ()
4149
4250
@@ -65,5 +73,54 @@ def test_policy_matrix(case, tmp_path: Path) -> None:
6573
6674 assert decision .decision == case .expected_decision
6775 assert decision .reason_code == case .expected_reason_code
76+ assert decision .sink_type == case .sink_type
77+ assert decision .target == case .target
6878 assert decision .policy_profile == case .expected_profile
6979 assert decision .annotations ["effective_policy_profile" ] == case .expected_profile
80+
81+ witness = runtime .last_witness
82+ assert isinstance (witness , dict )
83+ assert witness .get ("request_id" ) == decision .request_id
84+ assert witness .get ("decision" ) == case .expected_decision
85+ assert witness .get ("reason_code" ) == case .expected_reason_code
86+ assert witness .get ("sink_type" ) == case .sink_type
87+ assert witness .get ("target" ) == case .target
88+ assert witness .get ("policy_id" ) == decision .policy_id
89+
90+ provenance = witness .get ("provenance" )
91+ assert isinstance (provenance , dict )
92+ assert provenance .get ("source" ) == case .source
93+ assert provenance .get ("taint_level" ) == _expected_witness_taint_level (case .taint_level )
94+ assert provenance .get ("source_chain" ) == [case .source , "policy_matrix" ]
95+ expected_markers = [] if case .taint_level == "trusted" else ["matrix_untrusted" ]
96+ assert provenance .get ("taint_markers" ) == expected_markers
97+
98+
99+ def test_policy_matrix_contract_source_dimension_is_explicitly_invariant () -> None :
100+ """Source values are coverage labels; enforcement is driven by taint/sink/profile."""
101+ by_contract_key : dict [tuple [str , str , str ], set [tuple [str , str , str ]]] = defaultdict (set )
102+ for case in MATRIX_CASES :
103+ by_contract_key [(case .taint_level , case .sink_type , case .profile )].add (
104+ (case .expected_decision , case .expected_reason_code , case .expected_profile )
105+ )
106+
107+ for key , outcomes in by_contract_key .items ():
108+ assert len (outcomes ) == 1 , f"Source-invariance violated for { key } : { outcomes } "
109+
110+
111+ def test_policy_matrix_contract_fixed_targets_define_scope () -> None :
112+ """Matrix target choices are explicit and define current coverage boundaries."""
113+ assert TARGET_BY_SINK ["filesystem.read" ] == "/etc/passwd"
114+ filesystem_read_cases = [case for case in MATRIX_CASES if case .sink_type == "filesystem.read" ]
115+ assert filesystem_read_cases
116+ for case in filesystem_read_cases :
117+ assert case .target == "/etc/passwd"
118+ assert case .expected_decision == "block"
119+ assert case .expected_reason_code == "PATH_BLOCKED"
120+
121+
122+ def test_policy_matrix_gap_filesystem_read_allow_paths_not_covered () -> None :
123+ pytest .skip (
124+ "Gap (explicit): this matrix intentionally pins filesystem.read to /etc/passwd, "
125+ "so allowlist read paths (for example /workspace/project/*) are out of scope here."
126+ )
0 commit comments