diff --git a/artifacts/mcp_trace_corruption_manifest.json b/artifacts/mcp_trace_corruption_manifest.json new file mode 100644 index 0000000..0bccaf2 --- /dev/null +++ b/artifacts/mcp_trace_corruption_manifest.json @@ -0,0 +1,185 @@ +{ + "manifest_id": "mcp_trace_corruption_manifest_v1", + "version": "1.0", + "allowed_operators": [ + "DROP_APPROVAL_GATE", + "SWAP_TOOL_ORDER", + "TRUNCATE_RECOVERY_PATH", + "REMOVE_DEPENDENCY_EDGE", + "INSERT_UNVALIDATED_ACTION", + "COLLAPSE_CAPABILITY_BOUNDARY" + ], + "corruptions": [ + { + "corruption_id": "mcp_trace_replay_degraded_v1::collapse_capability_boundary", + "source_fixture": "fixtures/mcp_trace_replay_degraded_v1", + "operator": "COLLAPSE_CAPABILITY_BOUNDARY", + "expected_failure_label": "CAPABILITY_BOUNDARY_LOSS", + "expected_contract_violation": "capability_boundary_respected", + "deterministic": true, + "notes": "Collapse state capability boundary by removing enforcement link." + }, + { + "corruption_id": "mcp_trace_replay_degraded_v1::drop_approval_gate", + "source_fixture": "fixtures/mcp_trace_replay_degraded_v1", + "operator": "DROP_APPROVAL_GATE", + "expected_failure_label": "APPROVAL_GATE_LOSS", + "expected_contract_violation": "validation_before_unsafe_action", + "deterministic": true, + "notes": "Remove explicit human approval gate from state capability boundaries." + }, + { + "corruption_id": "mcp_trace_replay_degraded_v1::insert_unvalidated_action", + "source_fixture": "fixtures/mcp_trace_replay_degraded_v1", + "operator": "INSERT_UNVALIDATED_ACTION", + "expected_failure_label": "POLICY_ENFORCEMENT_GAP", + "expected_contract_violation": "validation_before_unsafe_action", + "deterministic": true, + "notes": "Insert execute action before validation step in trace ordering." 
+ }, + { + "corruption_id": "mcp_trace_replay_degraded_v1::remove_dependency_edge", + "source_fixture": "fixtures/mcp_trace_replay_degraded_v1", + "operator": "REMOVE_DEPENDENCY_EDGE", + "expected_failure_label": "DEPENDENCY_CHAIN_BREAK", + "expected_contract_violation": "dependency_chain_preserved", + "deterministic": true, + "notes": "Remove a required prerequisite edge in dependency graph." + }, + { + "corruption_id": "mcp_trace_replay_degraded_v1::swap_tool_order", + "source_fixture": "fixtures/mcp_trace_replay_degraded_v1", + "operator": "SWAP_TOOL_ORDER", + "expected_failure_label": "TOOL_ORDER_VIOLATION", + "expected_contract_violation": "tool_call_order_preserved", + "deterministic": true, + "notes": "Swap consecutive tool/validation steps in trace to violate deterministic order." + }, + { + "corruption_id": "mcp_trace_replay_degraded_v1::truncate_recovery_path", + "source_fixture": "fixtures/mcp_trace_replay_degraded_v1", + "operator": "TRUNCATE_RECOVERY_PATH", + "expected_failure_label": "RECOVERY_PATH_INVALID", + "expected_contract_violation": "recovery_path_available", + "deterministic": true, + "notes": "Drop the terminal recovery event from trace path." + }, + { + "corruption_id": "mcp_trace_replay_mild_v1::collapse_capability_boundary", + "source_fixture": "fixtures/mcp_trace_replay_mild_v1", + "operator": "COLLAPSE_CAPABILITY_BOUNDARY", + "expected_failure_label": "CAPABILITY_BOUNDARY_LOSS", + "expected_contract_violation": "capability_boundary_respected", + "deterministic": true, + "notes": "Collapse state capability boundary by removing enforcement link." + }, + { + "corruption_id": "mcp_trace_replay_mild_v1::drop_approval_gate", + "source_fixture": "fixtures/mcp_trace_replay_mild_v1", + "operator": "DROP_APPROVAL_GATE", + "expected_failure_label": "APPROVAL_GATE_LOSS", + "expected_contract_violation": "validation_before_unsafe_action", + "deterministic": true, + "notes": "Remove explicit human approval gate from state capability boundaries." 
+ }, + { + "corruption_id": "mcp_trace_replay_mild_v1::insert_unvalidated_action", + "source_fixture": "fixtures/mcp_trace_replay_mild_v1", + "operator": "INSERT_UNVALIDATED_ACTION", + "expected_failure_label": "POLICY_ENFORCEMENT_GAP", + "expected_contract_violation": "validation_before_unsafe_action", + "deterministic": true, + "notes": "Insert execute action before validation step in trace ordering." + }, + { + "corruption_id": "mcp_trace_replay_mild_v1::remove_dependency_edge", + "source_fixture": "fixtures/mcp_trace_replay_mild_v1", + "operator": "REMOVE_DEPENDENCY_EDGE", + "expected_failure_label": "DEPENDENCY_CHAIN_BREAK", + "expected_contract_violation": "dependency_chain_preserved", + "deterministic": true, + "notes": "Remove a required prerequisite edge in dependency graph." + }, + { + "corruption_id": "mcp_trace_replay_mild_v1::swap_tool_order", + "source_fixture": "fixtures/mcp_trace_replay_mild_v1", + "operator": "SWAP_TOOL_ORDER", + "expected_failure_label": "TOOL_ORDER_VIOLATION", + "expected_contract_violation": "tool_call_order_preserved", + "deterministic": true, + "notes": "Swap consecutive tool/validation steps in trace to violate deterministic order." + }, + { + "corruption_id": "mcp_trace_replay_mild_v1::truncate_recovery_path", + "source_fixture": "fixtures/mcp_trace_replay_mild_v1", + "operator": "TRUNCATE_RECOVERY_PATH", + "expected_failure_label": "RECOVERY_PATH_INVALID", + "expected_contract_violation": "recovery_path_available", + "deterministic": true, + "notes": "Drop the terminal recovery event from trace path." + }, + { + "corruption_id": "mcp_trace_replay_moderate_v1::collapse_capability_boundary", + "source_fixture": "fixtures/mcp_trace_replay_moderate_v1", + "operator": "COLLAPSE_CAPABILITY_BOUNDARY", + "expected_failure_label": "CAPABILITY_BOUNDARY_LOSS", + "expected_contract_violation": "capability_boundary_respected", + "deterministic": true, + "notes": "Collapse state capability boundary by removing enforcement link." 
+ }, + { + "corruption_id": "mcp_trace_replay_moderate_v1::drop_approval_gate", + "source_fixture": "fixtures/mcp_trace_replay_moderate_v1", + "operator": "DROP_APPROVAL_GATE", + "expected_failure_label": "APPROVAL_GATE_LOSS", + "expected_contract_violation": "validation_before_unsafe_action", + "deterministic": true, + "notes": "Remove explicit human approval gate from state capability boundaries." + }, + { + "corruption_id": "mcp_trace_replay_moderate_v1::insert_unvalidated_action", + "source_fixture": "fixtures/mcp_trace_replay_moderate_v1", + "operator": "INSERT_UNVALIDATED_ACTION", + "expected_failure_label": "POLICY_ENFORCEMENT_GAP", + "expected_contract_violation": "validation_before_unsafe_action", + "deterministic": true, + "notes": "Insert execute action before validation step in trace ordering." + }, + { + "corruption_id": "mcp_trace_replay_moderate_v1::remove_dependency_edge", + "source_fixture": "fixtures/mcp_trace_replay_moderate_v1", + "operator": "REMOVE_DEPENDENCY_EDGE", + "expected_failure_label": "DEPENDENCY_CHAIN_BREAK", + "expected_contract_violation": "dependency_chain_preserved", + "deterministic": true, + "notes": "Remove a required prerequisite edge in dependency graph." + }, + { + "corruption_id": "mcp_trace_replay_moderate_v1::swap_tool_order", + "source_fixture": "fixtures/mcp_trace_replay_moderate_v1", + "operator": "SWAP_TOOL_ORDER", + "expected_failure_label": "TOOL_ORDER_VIOLATION", + "expected_contract_violation": "tool_call_order_preserved", + "deterministic": true, + "notes": "Swap consecutive tool/validation steps in trace to violate deterministic order." 
+ }, + { + "corruption_id": "mcp_trace_replay_moderate_v1::truncate_recovery_path", + "source_fixture": "fixtures/mcp_trace_replay_moderate_v1", + "operator": "TRUNCATE_RECOVERY_PATH", + "expected_failure_label": "RECOVERY_PATH_INVALID", + "expected_contract_violation": "recovery_path_available", + "deterministic": true, + "notes": "Drop the terminal recovery event from trace path." + } + ], + "summary": { + "fixture_count": 3, + "corruption_count": 18, + "skipped_operator_count": 0, + "skipped_operators": [], + "deterministic_evaluation": true, + "llm_judges": "none", + "external_apis": "none" + } +} diff --git a/scripts/generate_mcp_trace_corruptions.py b/scripts/generate_mcp_trace_corruptions.py new file mode 100644 index 0000000..3fc13f9 --- /dev/null +++ b/scripts/generate_mcp_trace_corruptions.py @@ -0,0 +1,184 @@ +"""Generate deterministic MCP trace corruption manifest from checked-in fixtures.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +REPO_ROOT = Path(__file__).resolve().parents[1] +FIXTURES_ROOT = REPO_ROOT / "fixtures" +OUTPUT_PATH = REPO_ROOT / "artifacts" / "mcp_trace_corruption_manifest.json" +REQUIRED_FIXTURE_FILES: tuple[str, ...] = ("trace.json", "dependency_graph.json", "state.json") + +OPERATORS: tuple[str, ...] 
= ( + "DROP_APPROVAL_GATE", + "SWAP_TOOL_ORDER", + "TRUNCATE_RECOVERY_PATH", + "REMOVE_DEPENDENCY_EDGE", + "INSERT_UNVALIDATED_ACTION", + "COLLAPSE_CAPABILITY_BOUNDARY", +) + +OPERATOR_EXPECTATIONS: dict[str, dict[str, str]] = { + "DROP_APPROVAL_GATE": { + "expected_failure_label": "APPROVAL_GATE_LOSS", + "expected_contract_violation": "validation_before_unsafe_action", + "notes": "Remove explicit human approval gate from state capability boundaries.", + }, + "SWAP_TOOL_ORDER": { + "expected_failure_label": "TOOL_ORDER_VIOLATION", + "expected_contract_violation": "tool_call_order_preserved", + "notes": "Swap consecutive tool/validation steps in trace to violate deterministic order.", + }, + "TRUNCATE_RECOVERY_PATH": { + "expected_failure_label": "RECOVERY_PATH_INVALID", + "expected_contract_violation": "recovery_path_available", + "notes": "Drop the terminal recovery event from trace path.", + }, + "REMOVE_DEPENDENCY_EDGE": { + "expected_failure_label": "DEPENDENCY_CHAIN_BREAK", + "expected_contract_violation": "dependency_chain_preserved", + "notes": "Remove a required prerequisite edge in dependency graph.", + }, + "INSERT_UNVALIDATED_ACTION": { + "expected_failure_label": "POLICY_ENFORCEMENT_GAP", + "expected_contract_violation": "validation_before_unsafe_action", + "notes": "Insert execute action before validation step in trace ordering.", + }, + "COLLAPSE_CAPABILITY_BOUNDARY": { + "expected_failure_label": "CAPABILITY_BOUNDARY_LOSS", + "expected_contract_violation": "capability_boundary_respected", + "notes": "Collapse state capability boundary by removing enforcement link.", + }, +} + + +def _repo_relative(path: Path) -> str: + return path.relative_to(REPO_ROOT).as_posix() + + +def _load_json(path: Path) -> dict[str, Any]: + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except FileNotFoundError as exc: + raise RuntimeError(f"Required JSON file is missing: {_repo_relative(path)}") from exc + except json.JSONDecodeError as exc: + raise 
RuntimeError(f"Invalid JSON in {_repo_relative(path)}: {exc}") from exc + + if not isinstance(payload, dict): + raise RuntimeError(f"Expected JSON object in {_repo_relative(path)}") + return payload + + +def _as_list(value: Any, *, field: str) -> list[Any]: + if value is None: + return [] + if not isinstance(value, list): + raise RuntimeError(f"Expected list field: {field}") + return value + + +def _mcp_fixtures() -> list[Path]: + candidates = sorted(FIXTURES_ROOT.glob("mcp_trace_replay_*_v1/original")) + + for path in candidates: + missing = [name for name in REQUIRED_FIXTURE_FILES if not (path / name).exists()] + if missing: + raise RuntimeError( + f"Incomplete MCP fixture {_repo_relative(path)}; missing: {', '.join(missing)}" + ) + + return candidates + + +def _trace_actions(trace: dict[str, Any]) -> list[str]: + events = _as_list(trace.get("events"), field="trace.events") + return [str(event.get("action", "")) for event in events if isinstance(event, dict)] + + +def _supports_operator(operator: str, trace: dict[str, Any], graph: dict[str, Any], state: dict[str, Any]) -> bool: + actions = _trace_actions(trace) + edges = _as_list(graph.get("edges"), field="dependency_graph.edges") + boundaries = _as_list(state.get("capability_boundaries"), field="state.capability_boundaries") + + if operator == "DROP_APPROVAL_GATE": + return ["human_approval", "execute_external_action"] in boundaries + if operator == "SWAP_TOOL_ORDER": + return "tool_schema_validated" in actions and "read_context" in actions + if operator == "TRUNCATE_RECOVERY_PATH": + return "recovery_path_registered" in actions + if operator == "REMOVE_DEPENDENCY_EDGE": + return any( + isinstance(edge, dict) + and edge.get("source") == "read_context" + and edge.get("target") == "validate_external_action" + for edge in edges + ) + if operator == "INSERT_UNVALIDATED_ACTION": + return "validate_external_action" in actions and "execute_external_action" in actions + if operator == "COLLAPSE_CAPABILITY_BOUNDARY": + 
return ["capability_scope_checked", "validate_external_action"] in boundaries + return False + + +def generate_mcp_trace_corruption_manifest(output_path: Path = OUTPUT_PATH) -> Path: + entries: list[dict[str, Any]] = [] + skipped: list[dict[str, str]] = [] + fixtures = _mcp_fixtures() + + for original_dir in fixtures: + fixture_root = original_dir.parent + source_fixture = _repo_relative(fixture_root) + trace = _load_json(original_dir / "trace.json") + graph = _load_json(original_dir / "dependency_graph.json") + state = _load_json(original_dir / "state.json") + + for operator in OPERATORS: + if not _supports_operator(operator, trace, graph, state): + skipped.append({"source_fixture": source_fixture, "operator": operator}) + continue + + expected = OPERATOR_EXPECTATIONS[operator] + entries.append( + { + "corruption_id": f"{fixture_root.name}::{operator.lower()}", + "source_fixture": source_fixture, + "operator": operator, + "expected_failure_label": expected["expected_failure_label"], + "expected_contract_violation": expected["expected_contract_violation"], + "deterministic": True, + "notes": expected["notes"], + } + ) + + entries.sort(key=lambda item: (item["source_fixture"], item["operator"], item["corruption_id"])) + skipped.sort(key=lambda item: (item["source_fixture"], item["operator"])) + + payload = { + "manifest_id": "mcp_trace_corruption_manifest_v1", + "version": "1.0", + "allowed_operators": list(OPERATORS), + "corruptions": entries, + "summary": { + "fixture_count": len(fixtures), + "corruption_count": len(entries), + "skipped_operator_count": len(skipped), + "skipped_operators": skipped, + "deterministic_evaluation": True, + "llm_judges": "none", + "external_apis": "none", + }, + } + + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text( + json.dumps(payload, indent=2, sort_keys=False, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + return output_path + + +if __name__ == "__main__": + path = 
"""Checks that the committed MCP trace corruption manifest is stable and reproducible."""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

from scripts.generate_mcp_trace_corruptions import (
    OPERATORS,
    generate_mcp_trace_corruption_manifest,
)
from src.validation.failure_taxonomy import FAILURE_TAXONOMY

REPO_ROOT = Path(__file__).resolve().parents[1]
MANIFEST_PATH = REPO_ROOT / "artifacts" / "mcp_trace_corruption_manifest.json"
# JSON keys that would make the manifest depend on when or where it was generated.
FORBIDDEN_TOKENS = (
    "generated_at",
    "timestamp",
    "host",
    "user",
    "env",
)


def _load_manifest() -> dict[str, Any]:
    """Parse and return the committed manifest."""
    return json.loads(MANIFEST_PATH.read_text(encoding="utf-8"))


def test_manifest_exists() -> None:
    assert MANIFEST_PATH.exists()


def test_manifest_top_level_schema_is_stable() -> None:
    manifest = _load_manifest()
    # Key order is part of the contract: the generator writes with sort_keys=False.
    assert list(manifest) == ["manifest_id", "version", "allowed_operators", "corruptions", "summary"]
    assert manifest["manifest_id"] == "mcp_trace_corruption_manifest_v1"
    assert manifest["version"] == "1.0"
    assert manifest["allowed_operators"] == list(OPERATORS)


def test_entries_are_deterministically_sorted_and_ids_unique() -> None:
    manifest = _load_manifest()
    corruptions = manifest["corruptions"]
    assert isinstance(corruptions, list)
    sort_keys = [
        (entry["source_fixture"], entry["operator"], entry["corruption_id"])
        for entry in corruptions
    ]
    assert sort_keys == sorted(sort_keys)

    ids = [entry["corruption_id"] for entry in corruptions]
    assert len(ids) == len(set(ids))


def test_entries_use_allowed_operators_and_registered_labels() -> None:
    manifest = _load_manifest()
    allowed_ops = set(OPERATORS)
    registered_labels = set(FAILURE_TAXONOMY)

    for entry in manifest["corruptions"]:
        assert entry["operator"] in allowed_ops
        assert entry["expected_failure_label"] in registered_labels
        assert entry["deterministic"] is True


def test_source_fixtures_exist_and_paths_are_relative() -> None:
    manifest = _load_manifest()

    for entry in manifest["corruptions"]:
        source_fixture = entry["source_fixture"]
        # Check relativity first: joining an absolute path discards REPO_ROOT,
        # so the existence check alone would not prove the path is in the repo.
        assert not Path(source_fixture).is_absolute()
        assert (REPO_ROOT / source_fixture).exists()


def test_manifest_summary_matches_actual_entries() -> None:
    manifest = _load_manifest()
    corruptions = manifest["corruptions"]
    summary = manifest["summary"]
    assert isinstance(corruptions, list)
    assert isinstance(summary, dict)

    skipped = summary["skipped_operators"]
    assert isinstance(skipped, list)

    assert summary["corruption_count"] == len(corruptions)
    assert summary["skipped_operator_count"] == len(skipped)

    # The generator counts every discovered fixture, including one whose
    # operators were all skipped, so both lists contribute to the expected count.
    seen_fixtures = {entry["source_fixture"] for entry in corruptions}
    seen_fixtures.update(item["source_fixture"] for item in skipped)
    assert summary["fixture_count"] == len(seen_fixtures)


def test_manifest_has_no_time_or_environment_fields() -> None:
    manifest_text = MANIFEST_PATH.read_text(encoding="utf-8")
    lower_text = manifest_text.lower()

    for token in FORBIDDEN_TOKENS:
        # Match the token as a JSON key only, not as a substring of a value.
        assert f'"{token}":' not in lower_text


def test_generator_reproduces_committed_manifest(tmp_path: Path) -> None:
    generated_path = tmp_path / "manifest.json"
    generate_mcp_trace_corruption_manifest(generated_path)
    assert generated_path.read_text(encoding="utf-8") == MANIFEST_PATH.read_text(encoding="utf-8")