Skip to content

Commit 413b6f4

Browse files
authored
feat: add agent artifact bundle validator
Adds a minimal deterministic validator CLI for agent artifact bundles with required field checks, ok/result consistency, safe_pr_gate status validation, conservative timestamp/random-id rejection, and focused tests.
1 parent 7e2cdbe commit 413b6f4

2 files changed

Lines changed: 355 additions & 0 deletions

File tree

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
#!/usr/bin/env python3
2+
"""Validate deterministic agent artifact bundle payloads."""
3+
4+
from __future__ import annotations
5+
6+
import argparse
7+
import json
8+
import re
9+
import sys
10+
from pathlib import Path
11+
from typing import Any
12+
13+
# Parent of the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Bundle validated when --bundle is not given on the command line.
DEFAULT_BUNDLE_PATH = REPO_ROOT / "artifacts" / "agent_artifact_bundle_example.json"

# Keys that must be present on the bundle object itself.
REQUIRED_BUNDLE_FIELDS = (
    "ok",
    "result",
    "branch",
    "changed_files",
    "safe_pr_gate",
    "validation_evidence",
)
# Keys that must be present on the nested ``safe_pr_gate`` object.
REQUIRED_SAFE_GATE_FIELDS = (
    "allow_dirty",
    "allowed_prefixes",
    "branch",
    "changed_paths",
    "ok",
    "problems",
    "result",
    "status_short",
)
# Lower-cased key names rejected anywhere in the payload as timestamp-like.
DISALLOWED_TIME_KEYS = {
    "timestamp",
    "generated_at",
    "created_at",
    "updated_at",
    "completed_at",
    "requested_at",
}
# Lower-cased key names rejected anywhere in the payload as generated ids.
DISALLOWED_RANDOM_ID_KEYS = {
    "generated_id",
    "random_id",
    "request_id",
    "run_id",
    "uuid",
}
# Canonical hyphenated UUID text. The version nibble accepts 1-8 so that the
# RFC 9562 versions (6, 7 and 8) are rejected as well as versions 1-5.
UUID_PATTERN = re.compile(
    r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-8][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}$"
)
52+
53+
54+
def _relative(path: Path) -> str:
    """Return ``path`` as a POSIX-style string for use in reports.

    The resolved path is expressed relative to ``REPO_ROOT`` when it lies
    underneath it; otherwise the path is returned as given (unresolved).
    """
    try:
        inside_repo = path.resolve().relative_to(REPO_ROOT)
    except ValueError:
        # Not located under the repository root: report it unchanged.
        return path.as_posix()
    return inside_repo.as_posix()
59+
60+
61+
def _load_json_object(path: Path) -> dict[str, Any]:
    """Read ``path`` as UTF-8 text and return the JSON object it contains.

    Every expected failure is reported as ``RuntimeError`` so that the CLI
    entry point, which only catches ``RuntimeError``, can emit its JSON error
    report instead of a traceback.

    Raises:
        RuntimeError: if the file is missing, cannot be read, is not valid
            UTF-8, is not valid JSON, or its top-level value is not an object.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise RuntimeError(f"missing bundle file: {_relative(path)}") from exc
    except OSError as exc:
        # e.g. the path is a directory or the file is not readable.
        raise RuntimeError(f"unreadable bundle file: {_relative(path)}") from exc
    except UnicodeDecodeError as exc:
        raise RuntimeError(f"bundle file is not valid UTF-8: {_relative(path)}") from exc

    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"invalid JSON in bundle file: {_relative(path)}") from exc

    if not isinstance(payload, dict):
        raise RuntimeError(f"bundle file must contain a JSON object: {_relative(path)}")
    return payload
71+
72+
73+
def _is_string_list(value: object) -> bool:
    """Return True when ``value`` is a ``list`` whose items are all ``str``.

    An empty list qualifies; tuples and other sequences do not.
    """
    if not isinstance(value, list):
        return False
    for item in value:
        if not isinstance(item, str):
            return False
    return True
75+
76+
77+
def _expected_result(ok: bool) -> str:
    """Return the result label that goes with ``ok``: "PASS" or "FAIL"."""
    if ok:
        return "PASS"
    return "FAIL"
79+
80+
81+
def _bundle_from_payload(payload: dict[str, Any]) -> tuple[dict[str, Any] | None, list[str]]:
    """Pick the bundle object out of ``payload``.

    The bundle is the value stored under the ``"bundle"`` key when that key is
    present, otherwise the payload itself.

    Returns:
        ``(bundle, [])`` when the candidate is a dict, or ``(None, [issue])``
        when it is not.
    """
    candidate = payload["bundle"] if "bundle" in payload else payload
    if not isinstance(candidate, dict):
        return None, ["bundle must be a JSON object"]
    return candidate, []
86+
87+
88+
def _scan_for_nondeterministic_fields(value: object, path: str = "$") -> list[str]:
    """Recursively collect timestamp-like keys, generated-id keys and UUID strings.

    Dicts and lists are walked depth-first. ``path`` is the location of
    ``value`` written as ``$.key[index]`` and prefixes every issue message.
    Keys are compared lower-cased against the disallowed key sets.
    """
    if isinstance(value, dict):
        found: list[str] = []
        for name, nested in value.items():
            location = f"{path}.{name}"
            lowered = name.lower()
            if lowered in DISALLOWED_TIME_KEYS:
                found.append(f"{location}: timestamp-like field is not allowed")
            if lowered in DISALLOWED_RANDOM_ID_KEYS:
                found.append(f"{location}: random-looking generated id field is not allowed")
            found += _scan_for_nondeterministic_fields(nested, location)
        return found

    if isinstance(value, list):
        found = []
        for position, element in enumerate(value):
            found += _scan_for_nondeterministic_fields(element, f"{path}[{position}]")
        return found

    if isinstance(value, str) and UUID_PATTERN.fullmatch(value):
        return [f"{path}: UUID-like value is not allowed"]
    return []
105+
106+
107+
def validate_bundle_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate an already-parsed bundle payload.

    The whole payload is scanned for non-deterministic fields, then the bundle
    object (``payload["bundle"]`` when present, else ``payload``) is checked
    for required fields, field types and ok/result consistency.

    Returns:
        A dict with ``issues`` (sorted list of messages), ``ok`` (True when
        there are no issues) and ``result`` ("PASS" or "FAIL").
    """
    problems: list[str] = list(_scan_for_nondeterministic_fields(payload))

    bundle, extraction_problems = _bundle_from_payload(payload)
    problems += extraction_problems
    if bundle is None:
        return {"issues": sorted(problems), "ok": False, "result": "FAIL"}

    problems += [
        f"bundle missing required field: {name}"
        for name in REQUIRED_BUNDLE_FIELDS
        if name not in bundle
    ]

    ok_flag = bundle.get("ok")
    result_label = bundle.get("result")
    gate = bundle.get("safe_pr_gate")
    evidence = bundle.get("validation_evidence")
    ok_is_bool = isinstance(ok_flag, bool)
    result_is_str = isinstance(result_label, str)

    if not ok_is_bool:
        problems.append("bundle.ok must be a boolean")
    if not result_is_str:
        problems.append("bundle.result must be a string")
    if ok_is_bool and result_is_str and result_label != _expected_result(ok_flag):
        problems.append("bundle.result must match bundle.ok")
    if not isinstance(bundle.get("branch"), str):
        problems.append("bundle.branch must be a string")
    if not _is_string_list(bundle.get("changed_files")):
        problems.append("bundle.changed_files must be a list of strings")

    # Nested structures are only inspected once their container type is right.
    if isinstance(gate, dict):
        problems += _validate_safe_pr_gate(gate, ok_flag)
    else:
        problems.append("bundle.safe_pr_gate must be a JSON object")

    if isinstance(evidence, list):
        problems += _validate_validation_evidence(evidence)
    else:
        problems.append("bundle.validation_evidence must be a list")

    passed = not problems
    return {
        "issues": sorted(problems),
        "ok": passed,
        "result": "PASS" if passed else "FAIL",
    }
149+
150+
151+
def _validate_safe_pr_gate(safe_pr_gate: dict[str, Any], bundle_ok: object) -> list[str]:
    """Collect the issues found in a bundle's ``safe_pr_gate`` object.

    Checks required keys, field types, that the gate's ``result`` matches its
    ``ok`` flag, and that the gate's ``ok`` equals ``bundle_ok`` when both are
    booleans.
    """
    found: list[str] = [
        f"bundle.safe_pr_gate missing required field: {name}"
        for name in REQUIRED_SAFE_GATE_FIELDS
        if name not in safe_pr_gate
    ]

    gate_ok = safe_pr_gate.get("ok")
    gate_result = safe_pr_gate.get("result")
    ok_is_bool = isinstance(gate_ok, bool)
    result_is_str = isinstance(gate_result, str)

    if not ok_is_bool:
        found.append("bundle.safe_pr_gate.ok must be a boolean")
    if not result_is_str:
        found.append("bundle.safe_pr_gate.result must be a string")
    if ok_is_bool and result_is_str and gate_result != _expected_result(gate_ok):
        found.append("bundle.safe_pr_gate.result must match bundle.safe_pr_gate.ok")
    if ok_is_bool and isinstance(bundle_ok, bool) and bundle_ok != gate_ok:
        found.append("bundle.ok must match bundle.safe_pr_gate.ok")

    if not isinstance(safe_pr_gate.get("allow_dirty"), bool):
        found.append("bundle.safe_pr_gate.allow_dirty must be a boolean")
    if not isinstance(safe_pr_gate.get("branch"), str):
        found.append("bundle.safe_pr_gate.branch must be a string")

    list_fields = ("allowed_prefixes", "changed_paths", "problems", "status_short")
    found += [
        f"bundle.safe_pr_gate.{name} must be a list of strings"
        for name in list_fields
        if not _is_string_list(safe_pr_gate.get(name))
    ]
    return found
175+
176+
177+
def _validate_validation_evidence(validation_evidence: list[object]) -> list[str]:
    """Collect the issues found in a bundle's ``validation_evidence`` list.

    Every entry must be a dict whose ``command`` and ``result`` values are
    strings. An empty list produces no issues.
    """
    found: list[str] = []
    for index, entry in enumerate(validation_evidence):
        if isinstance(entry, dict):
            if not isinstance(entry.get("command"), str):
                found.append(f"bundle.validation_evidence[{index}].command must be a string")
            if not isinstance(entry.get("result"), str):
                found.append(f"bundle.validation_evidence[{index}].result must be a string")
        else:
            found.append(f"bundle.validation_evidence[{index}] must be a JSON object")
    return found
188+
189+
190+
def validate_bundle_file(path: Path) -> dict[str, Any]:
    """Load the bundle stored at ``path`` and validate it.

    Returns:
        The ``issues``, ``ok`` and ``result`` entries produced by
        ``validate_bundle_payload`` plus ``bundle``, the reported file path.

    Raises:
        RuntimeError: propagated from ``_load_json_object`` when the file
            cannot be loaded as a JSON object.
    """
    report = validate_bundle_payload(_load_json_object(path))
    summary: dict[str, Any] = {"bundle": _relative(path)}
    for key in ("issues", "ok", "result"):
        summary[key] = report[key]
    return summary
199+
200+
201+
def _parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command-line arguments in ``argv``.

    The only option is ``--bundle``, converted to a ``Path`` and defaulting to
    ``DEFAULT_BUNDLE_PATH``.
    """
    cli = argparse.ArgumentParser(description="Validate a deterministic agent artifact bundle.")
    cli.add_argument(
        "--bundle",
        default=DEFAULT_BUNDLE_PATH,
        help="Bundle JSON path.",
        type=Path,
    )
    return cli.parse_args(argv)
205+
206+
207+
def main(argv: list[str] | None = None) -> int:
    """Run the validator CLI and print its report as JSON on stdout.

    Args:
        argv: Arguments to parse; ``sys.argv[1:]`` is used when None.

    Returns:
        0 when the report's ``ok`` is true, otherwise 1. A ``RuntimeError``
        raised while validating is reported with result "ERROR" and also
        yields 1.
    """
    cli_args = sys.argv[1:] if argv is None else argv
    options = _parse_args(cli_args)
    try:
        report = validate_bundle_file(options.bundle)
    except RuntimeError as exc:
        failure = {"message": str(exc), "type": exc.__class__.__name__}
        report = {"error": failure, "ok": False, "result": "ERROR"}

    # sort_keys keeps the printed report byte-stable for identical input.
    rendered = json.dumps(report, indent=2, sort_keys=True)
    sys.stdout.write(rendered + "\n")
    if report["ok"]:
        return 0
    return 1
222+
223+
224+
# Script entry point: the integer returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
from __future__ import annotations
2+
3+
import json
4+
from pathlib import Path
5+
6+
import scripts.validate_agent_artifact_bundle as validator
7+
8+
# Anchored on the validator's repository root so the test does not depend on
# the directory pytest is launched from.
ARTIFACT_PATH = validator.REPO_ROOT / "artifacts" / "agent_artifact_bundle_example.json"
9+
10+
11+
def _valid_bundle() -> dict[str, object]:
    """Build a fresh bundle payload that the validator accepts.

    A new structure is created on every call so tests can mutate it freely.
    """
    branch = "feat/example"
    touched_file = "scripts/example.py"

    gate: dict[str, object] = {
        "allow_dirty": False,
        "allowed_prefixes": [],
        "branch": branch,
        "changed_paths": [touched_file],
        "ok": True,
        "problems": [],
        "result": "PASS",
        "status_short": [],
    }
    evidence_entry = {
        "command": "python -m compileall -q scripts/example.py",
        "result": "pass",
    }
    return {
        "branch": branch,
        "changed_files": [touched_file],
        "ok": True,
        "result": "PASS",
        "safe_pr_gate": gate,
        "validation_evidence": [evidence_entry],
    }
34+
35+
36+
def test_committed_agent_artifact_bundle_example_is_valid() -> None:
    """The example bundle at ARTIFACT_PATH validates with no issues."""
    expected = {
        "bundle": "artifacts/agent_artifact_bundle_example.json",
        "issues": [],
        "ok": True,
        "result": "PASS",
    }

    assert validator.validate_bundle_file(ARTIFACT_PATH) == expected
45+
46+
47+
def test_validator_accepts_raw_bundle_payload() -> None:
    """A bundle passed directly, without a "bundle" wrapper key, is accepted."""
    expected = {"issues": [], "ok": True, "result": "PASS"}

    assert validator.validate_bundle_payload(_valid_bundle()) == expected
51+
52+
53+
def test_validator_rejects_result_that_does_not_match_ok() -> None:
    """ok=True paired with result="FAIL" yields exactly one consistency issue."""
    payload = {**_valid_bundle(), "result": "FAIL"}

    outcome = validator.validate_bundle_payload(payload)

    assert outcome["ok"] is False
    assert outcome["result"] == "FAIL"
    assert outcome["issues"] == ["bundle.result must match bundle.ok"]
62+
63+
64+
def test_validator_rejects_safe_gate_status_mismatch() -> None:
    """A failing safe_pr_gate inside a passing bundle is reported as a mismatch."""
    payload = _valid_bundle()
    gate = payload["safe_pr_gate"]
    assert isinstance(gate, dict)
    # The gate stays self-consistent, so only the bundle/gate mismatch fires.
    gate.update({"ok": False, "result": "FAIL"})

    outcome = validator.validate_bundle_payload(payload)

    assert outcome["ok"] is False
    assert outcome["issues"] == ["bundle.ok must match bundle.safe_pr_gate.ok"]
75+
76+
77+
def test_validator_rejects_timestamp_and_random_id_fields() -> None:
    """Timestamp keys, generated-id keys and UUID values are each reported."""
    payload = _valid_bundle()
    payload.update(
        {
            "generated_at": "2026-01-01T00:00:00Z",
            "run_id": "gha:123",
            "uuid": "123e4567-e89b-12d3-a456-426614174000",
        }
    )

    outcome = validator.validate_bundle_payload(payload)

    # The "uuid" entry is flagged twice: once for its key, once for its value.
    expected_issues = [
        "$.generated_at: timestamp-like field is not allowed",
        "$.run_id: random-looking generated id field is not allowed",
        "$.uuid: UUID-like value is not allowed",
        "$.uuid: random-looking generated id field is not allowed",
    ]
    assert outcome["ok"] is False
    assert outcome["issues"] == expected_issues
92+
93+
94+
def test_validator_rejects_missing_safe_gate_deterministic_field() -> None:
    """Dropping safe_pr_gate.status_short yields a missing-field and a type issue."""
    payload = _valid_bundle()
    gate = payload["safe_pr_gate"]
    assert isinstance(gate, dict)
    gate.pop("status_short")

    outcome = validator.validate_bundle_payload(payload)

    expected_issues = [
        "bundle.safe_pr_gate missing required field: status_short",
        "bundle.safe_pr_gate.status_short must be a list of strings",
    ]
    assert outcome["ok"] is False
    assert outcome["issues"] == expected_issues
107+
108+
109+
def test_cli_outputs_deterministic_json_for_invalid_bundle(tmp_path: Path, capsys) -> None:
    """The CLI exits with 1 and prints a sorted JSON report for an invalid bundle."""
    bundle_file = tmp_path / "invalid_bundle.json"
    document = {"bundle": {"ok": True, "result": "FAIL"}}
    bundle_file.write_text(json.dumps(document, sort_keys=True), encoding="utf-8")

    status = validator.main(["--bundle", str(bundle_file)])
    report = json.loads(capsys.readouterr().out)

    expected_issues = [
        "bundle missing required field: branch",
        "bundle missing required field: changed_files",
        "bundle missing required field: safe_pr_gate",
        "bundle missing required field: validation_evidence",
        "bundle.branch must be a string",
        "bundle.changed_files must be a list of strings",
        "bundle.result must match bundle.ok",
        "bundle.safe_pr_gate must be a JSON object",
        "bundle.validation_evidence must be a list",
    ]
    assert status == 1
    assert report["bundle"] == bundle_file.as_posix()
    assert report["ok"] is False
    assert report["result"] == "FAIL"
    assert report["issues"] == expected_issues

0 commit comments

Comments
 (0)