diff --git a/tests/test_mcp_trace_corruption_validation.py b/tests/test_mcp_trace_corruption_validation.py new file mode 100644 index 0000000..834fc5d --- /dev/null +++ b/tests/test_mcp_trace_corruption_validation.py @@ -0,0 +1,196 @@ +from __future__ import annotations + +import filecmp +import json +from pathlib import Path + +from src.validation.failure_taxonomy import FAILURE_TAXONOMY + +REPO_ROOT = Path(__file__).resolve().parents[1] +MANIFEST_PATH = REPO_ROOT / "artifacts" / "mcp_trace_corruption_manifest.json" +MATERIALIZED_ROOT = REPO_ROOT / "fixtures" / "mcp_trace_replay_corruptions" +REQUIRED_FILES = ("trace.json", "dependency_graph.json", "state.json") +TARGET_OPERATORS = { + "DROP_APPROVAL_GATE": "drop_approval_gate", + "REMOVE_DEPENDENCY_EDGE": "remove_dependency_edge", + "TRUNCATE_RECOVERY_PATH": "truncate_recovery_path", +} + + +def _repo_relative(path: Path) -> str: + return path.relative_to(REPO_ROOT).as_posix() + + +def _load_json(path: Path) -> dict[str, object]: + repo_rel = _repo_relative(path) + try: + data = json.loads(path.read_text(encoding="utf-8")) + except FileNotFoundError as exc: + raise RuntimeError(f"Required JSON file is missing: {repo_rel}") from exc + except json.JSONDecodeError as exc: + raise RuntimeError(f"Invalid JSON in {repo_rel}: {exc}") from exc + + if not isinstance(data, dict): + raise RuntimeError(f"expected JSON object at root: {repo_rel}") + return data + + +def _as_list(value: object, *, field: str, path: Path) -> list[object]: + if value is None: + return [] + if isinstance(value, list): + return value + raise RuntimeError(f"expected list for '{field}' in {_repo_relative(path)}, got {type(value).__name__}") + + +def _split_corruption_id(corruption_id: str) -> tuple[str, str]: + parts = corruption_id.split("::", maxsplit=1) + assert len(parts) == 2 and parts[0] and parts[1], f"invalid corruption_id: {corruption_id}" + return parts[0], parts[1] + + +def _materialized_entries() -> list[dict[str, object]]: + manifest = _load_json(MANIFEST_PATH) + entries = _as_list(manifest.get("corruptions"), field="corruptions", path=MANIFEST_PATH) + return [entry for entry in entries if isinstance(entry, dict) and entry.get("operator") in TARGET_OPERATORS] + + +def _has_boundary(state_doc: dict[str, object], source: str, target: str, *, path: Path) -> bool: + boundaries = _as_list(state_doc.get("capability_boundaries"), field="capability_boundaries", path=path) + return [source, target] in boundaries + + +def _has_edge(graph_doc: dict[str, object], source: str, target: str, *, path: Path) -> bool: + edges = _as_list(graph_doc.get("edges"), field="edges", path=path) + for edge in edges: + assert isinstance(edge, dict), f"expected object edge in {_repo_relative(path)}" + if edge.get("source") == source and edge.get("target") == target: + return True + return False + + +def _terminal_action(trace_doc: dict[str, object], *, path: Path) -> str | None: + events = _as_list(trace_doc.get("events"), field="events", path=path) + if not events: + return None + last = events[-1] + assert isinstance(last, dict), f"expected object event in {_repo_relative(path)}" + action = last.get("action") + assert isinstance(action, str), f"expected terminal event action string in {_repo_relative(path)}" + return action + + +def test_materialized_manifest_identity_and_required_files() -> None: + entries = _materialized_entries() + assert entries, "no target operator entries found in manifest" + assert len(entries) == 9 + + for entry in entries: + corruption_id = entry["corruption_id"] + source_fixture = entry["source_fixture"] + operator = entry["operator"] + expected_label = entry["expected_failure_label"] + + source_slug, operator_slug = _split_corruption_id(corruption_id) + assert source_slug == Path(source_fixture).name + assert operator_slug == TARGET_OPERATORS[operator] + + source_original = REPO_ROOT / source_fixture / "original" + materialized_dir = MATERIALIZED_ROOT / source_slug / operator_slug + assert source_original.exists(), f"missing source fixture: {_repo_relative(source_original)}" + assert materialized_dir.exists(), f"missing materialized fixture: {_repo_relative(materialized_dir)}" + + for name in REQUIRED_FILES: + assert (source_original / name).exists(), f"missing source file: {_repo_relative(source_original / name)}" + assert (materialized_dir / name).exists(), f"missing materialized file: {_repo_relative(materialized_dir / name)}" + + assert expected_label in FAILURE_TAXONOMY + + +def test_drop_approval_gate_structural_violation_and_unchanged_non_targets() -> None: + for entry in _materialized_entries(): + if entry["operator"] != "DROP_APPROVAL_GATE": + continue + + source_slug, operator_slug = _split_corruption_id(entry["corruption_id"]) + source_original = REPO_ROOT / entry["source_fixture"] / "original" + materialized_dir = MATERIALIZED_ROOT / source_slug / operator_slug + + source_state_path = source_original / "state.json" + materialized_state_path = materialized_dir / "state.json" + source_state = _load_json(source_state_path) + materialized_state = _load_json(materialized_state_path) + + assert _has_boundary(source_state, "human_approval", "execute_external_action", path=source_state_path) + assert not _has_boundary( + materialized_state, + "human_approval", + "execute_external_action", + path=materialized_state_path, + ) + + assert filecmp.cmp(source_original / "trace.json", materialized_dir / "trace.json", shallow=False) + assert filecmp.cmp( + source_original / "dependency_graph.json", + materialized_dir / "dependency_graph.json", + shallow=False, + ) + + +def test_remove_dependency_edge_structural_violation_and_unchanged_non_targets() -> None: + for entry in _materialized_entries(): + if entry["operator"] != "REMOVE_DEPENDENCY_EDGE": + continue + + source_slug, operator_slug = _split_corruption_id(entry["corruption_id"]) + source_original = REPO_ROOT / entry["source_fixture"] / "original" + materialized_dir = MATERIALIZED_ROOT / source_slug / operator_slug + + source_graph_path = source_original / "dependency_graph.json" + materialized_graph_path = materialized_dir / "dependency_graph.json" + source_graph = _load_json(source_graph_path) + materialized_graph = _load_json(materialized_graph_path) + + assert _has_edge(source_graph, "read_context", "validate_external_action", path=source_graph_path) + assert not _has_edge( + materialized_graph, + "read_context", + "validate_external_action", + path=materialized_graph_path, + ) + + assert filecmp.cmp(source_original / "trace.json", materialized_dir / "trace.json", shallow=False) + assert filecmp.cmp(source_original / "state.json", materialized_dir / "state.json", shallow=False) + + +def test_truncate_recovery_path_structural_violation_and_unchanged_non_targets() -> None: + for entry in _materialized_entries(): + if entry["operator"] != "TRUNCATE_RECOVERY_PATH": + continue + + source_slug, operator_slug = _split_corruption_id(entry["corruption_id"]) + source_original = REPO_ROOT / entry["source_fixture"] / "original" + materialized_dir = MATERIALIZED_ROOT / source_slug / operator_slug + + source_trace_path = source_original / "trace.json" + materialized_trace_path = materialized_dir / "trace.json" + source_trace = _load_json(source_trace_path) + materialized_trace = _load_json(materialized_trace_path) + + assert _terminal_action(source_trace, path=source_trace_path) == "recovery_path_registered" + assert _terminal_action(materialized_trace, path=materialized_trace_path) != "recovery_path_registered" + + source_events = _as_list(source_trace.get("events"), field="events", path=source_trace_path) + materialized_events = _as_list( + materialized_trace.get("events"), + field="events", + path=materialized_trace_path, + ) + assert len(materialized_events) == len(source_events) - 1 + + assert filecmp.cmp( + source_original / "dependency_graph.json", + materialized_dir / "dependency_graph.json", + shallow=False, + ) + assert filecmp.cmp(source_original / "state.json", materialized_dir / "state.json", shallow=False)