Skip to content

Commit ae63af9

Browse files
authored
feat: add safe PR gate
Adds a minimal deterministic local safe PR gate for AI-assisted work, including branch and dirty-tree checks, optional allowed-prefix scope checks, deterministic JSON output, focused tests, and Gemini review hardening for git parsing and subprocess errors.
1 parent 2fb11d4 commit ae63af9

3 files changed

Lines changed: 354 additions & 0 deletions

File tree

docs/codex_skills/git_pr_workflow.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Use for branch setup, syncing from `main`, local commits, staged-file review, pu
3232
- Before commit: `git status --short`, `git diff --stat`, and `git diff --cached --name-only`.
3333
- Before push: clean `git status --short` and current branch confirmation.
3434
- For PRs: include validation commands actually run; do not claim CI status unless checked.
35+
- For agent-assisted pre-push or pre-PR checks, run `python scripts/safe_pr_gate.py` on the current branch before pushing.
3536

3637
## Stop conditions
3738

scripts/safe_pr_gate.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
"""Deterministic local safety gate for agent-assisted PR workflows."""
2+
3+
from __future__ import annotations
4+
5+
import argparse
6+
import json
7+
import subprocess
8+
import sys
9+
from dataclasses import dataclass
10+
from pathlib import Path
11+
from typing import Any
12+
13+
REPO_ROOT = Path(__file__).resolve().parents[1]
14+
if str(REPO_ROOT) not in sys.path:
15+
sys.path.insert(0, str(REPO_ROOT))
16+
17+
18+
@dataclass(frozen=True, slots=True)
19+
class GateState:
20+
branch: str
21+
status_short: tuple[str, ...]
22+
changed_paths: tuple[str, ...]
23+
24+
25+
@dataclass(frozen=True, slots=True)
26+
class GateResult:
27+
ok: bool
28+
branch: str
29+
allow_dirty: bool
30+
allowed_prefixes: tuple[str, ...]
31+
status_short: tuple[str, ...]
32+
changed_paths: tuple[str, ...]
33+
problems: tuple[str, ...]
34+
35+
def to_dict(self) -> dict[str, Any]:
36+
return {
37+
"allowed_prefixes": list(self.allowed_prefixes),
38+
"allow_dirty": self.allow_dirty,
39+
"branch": self.branch,
40+
"changed_paths": list(self.changed_paths),
41+
"ok": self.ok,
42+
"problems": list(self.problems),
43+
"result": "PASS" if self.ok else "FAIL",
44+
"status_short": list(self.status_short),
45+
}
46+
47+
48+
def _run_git(args: list[str]) -> str:
49+
try:
50+
completed = subprocess.run(
51+
["git", *args],
52+
cwd=REPO_ROOT,
53+
check=True,
54+
capture_output=True,
55+
text=True,
56+
)
57+
except FileNotFoundError as exc:
58+
raise RuntimeError(f"git executable not found while running: git {' '.join(args)}") from exc
59+
except subprocess.CalledProcessError as exc:
60+
raise RuntimeError(
61+
f"git command failed with exit code {exc.returncode}: git {' '.join(args)}"
62+
) from exc
63+
return completed.stdout
64+
65+
66+
def _parse_porcelain_paths(output: str) -> tuple[str, ...]:
67+
if not output:
68+
return ()
69+
70+
entries = output.split("\0")
71+
paths: list[str] = []
72+
index = 0
73+
while index < len(entries):
74+
entry = entries[index]
75+
if not entry:
76+
index += 1
77+
continue
78+
path = entry[3:]
79+
status = entry[:2]
80+
if status and any(char in {"R", "C"} for char in status):
81+
if index + 1 < len(entries) and entries[index + 1]:
82+
paths.append(entries[index + 1])
83+
index += 2
84+
continue
85+
paths.append(path)
86+
index += 1
87+
return tuple(paths)
88+
89+
90+
def collect_gate_state() -> GateState:
91+
branch = _run_git(["branch", "--show-current"]).strip()
92+
status_short_output = _run_git(["status", "--short", "--untracked-files=all"])
93+
porcelain_output = _run_git(["status", "--porcelain=v1", "-z"])
94+
status_short = tuple(line for line in status_short_output.splitlines() if line)
95+
changed_paths = _parse_porcelain_paths(porcelain_output)
96+
return GateState(branch=branch, status_short=status_short, changed_paths=changed_paths)
97+
98+
99+
def _path_in_prefix(path: str, prefix: str) -> bool:
100+
normalized = prefix.rstrip("/")
101+
return path == normalized or path.startswith(normalized + "/")
102+
103+
104+
def evaluate_gate(
105+
state: GateState,
106+
*,
107+
allow_dirty: bool = False,
108+
allowed_prefixes: tuple[str, ...] = (),
109+
) -> GateResult:
110+
problems: list[str] = []
111+
112+
if state.branch == "main":
113+
problems.append("on_main_branch")
114+
115+
if state.status_short and not allow_dirty:
116+
problems.append("dirty_working_tree")
117+
118+
if allowed_prefixes:
119+
disallowed_paths = sorted(
120+
path
121+
for path in state.changed_paths
122+
if not any(_path_in_prefix(path, prefix) for prefix in allowed_prefixes)
123+
)
124+
if disallowed_paths:
125+
problems.append("changed_files_outside_allowed_prefixes")
126+
problems.extend(f"outside_prefix:{path}" for path in disallowed_paths)
127+
128+
return GateResult(
129+
ok=not problems,
130+
branch=state.branch,
131+
allow_dirty=allow_dirty,
132+
allowed_prefixes=tuple(sorted(allowed_prefixes)),
133+
status_short=state.status_short,
134+
changed_paths=state.changed_paths,
135+
problems=tuple(problems),
136+
)
137+
138+
139+
def _parse_args(argv: list[str]) -> argparse.Namespace:
140+
parser = argparse.ArgumentParser(description="Run a deterministic local safety gate for PR-ready agent work.")
141+
parser.add_argument("--allow-dirty", action="store_true", help="Permit a dirty working tree while still checking allowed prefixes.")
142+
parser.add_argument(
143+
"--allowed-prefix",
144+
action="append",
145+
default=[],
146+
help="Require changed paths to stay within this repo-relative prefix. May be repeated.",
147+
)
148+
return parser.parse_args(argv)
149+
150+
151+
def _error_response(exc: RuntimeError) -> dict[str, Any]:
152+
return {
153+
"error": {
154+
"message": str(exc),
155+
"type": exc.__class__.__name__,
156+
},
157+
"ok": False,
158+
"result": "ERROR",
159+
}
160+
161+
162+
def main(argv: list[str] | None = None) -> int:
163+
args = _parse_args(sys.argv[1:] if argv is None else argv)
164+
try:
165+
state = collect_gate_state()
166+
result = evaluate_gate(
167+
state,
168+
allow_dirty=args.allow_dirty,
169+
allowed_prefixes=tuple(args.allowed_prefix),
170+
)
171+
sys.stdout.write(json.dumps(result.to_dict(), indent=2, sort_keys=True) + "\n")
172+
return 0 if result.ok else 1
173+
except RuntimeError as exc:
174+
sys.stdout.write(json.dumps(_error_response(exc), indent=2, sort_keys=True) + "\n")
175+
return 1
176+
177+
178+
if __name__ == "__main__":
179+
raise SystemExit(main())

tests/test_safe_pr_gate.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
from __future__ import annotations
2+
3+
import json
4+
import shutil
5+
import subprocess
6+
import sys
7+
import tempfile
8+
from contextlib import contextmanager
9+
from pathlib import Path
10+
11+
import pytest
12+
13+
import scripts.safe_pr_gate as safe_pr_gate
14+
from scripts.safe_pr_gate import GateState, _parse_porcelain_paths, evaluate_gate
15+
16+
17+
REPO_ROOT = Path(__file__).resolve().parents[1]
18+
19+
20+
def _run_gate(*args: str, cwd: Path | None = None) -> subprocess.CompletedProcess[str]:
21+
return subprocess.run(
22+
[sys.executable, "scripts/safe_pr_gate.py", *args],
23+
check=False,
24+
capture_output=True,
25+
text=True,
26+
cwd=cwd,
27+
)
28+
29+
30+
@contextmanager
31+
def _temporary_git_repo(branch: str):
32+
with tempfile.TemporaryDirectory() as temp_dir:
33+
repo_root = Path(temp_dir)
34+
scripts_dir = repo_root / "scripts"
35+
scripts_dir.mkdir()
36+
shutil.copy2(REPO_ROOT / "scripts" / "safe_pr_gate.py", scripts_dir / "safe_pr_gate.py")
37+
subprocess.run(["git", "init"], cwd=repo_root, check=True, capture_output=True, text=True)
38+
subprocess.run(["git", "checkout", "-b", branch], cwd=repo_root, check=True, capture_output=True, text=True)
39+
yield repo_root
40+
41+
42+
def test_evaluate_gate_passes_on_clean_feature_branch_state() -> None:
43+
result = evaluate_gate(
44+
GateState(branch="feat/safe-pr-gate", status_short=(), changed_paths=()),
45+
allowed_prefixes=("scripts/",),
46+
)
47+
48+
assert result.ok is True
49+
assert result.problems == ()
50+
assert result.to_dict() == {
51+
"allowed_prefixes": ["scripts/"],
52+
"allow_dirty": False,
53+
"branch": "feat/safe-pr-gate",
54+
"changed_paths": [],
55+
"ok": True,
56+
"problems": [],
57+
"result": "PASS",
58+
"status_short": [],
59+
}
60+
61+
62+
def test_evaluate_gate_fails_on_main_branch() -> None:
63+
result = evaluate_gate(GateState(branch="main", status_short=(), changed_paths=()))
64+
65+
assert result.ok is False
66+
assert result.problems == ("on_main_branch",)
67+
68+
69+
def test_evaluate_gate_allows_detached_head_state() -> None:
70+
result = evaluate_gate(GateState(branch="", status_short=(), changed_paths=()))
71+
72+
assert result.ok is True
73+
assert result.problems == ()
74+
75+
76+
def test_evaluate_gate_fails_on_dirty_tree_without_allow_dirty() -> None:
77+
result = evaluate_gate(
78+
GateState(branch="feat/safe-pr-gate", status_short=(" M scripts/example.py",), changed_paths=("scripts/example.py",))
79+
)
80+
81+
assert result.ok is False
82+
assert result.problems == ("dirty_working_tree",)
83+
84+
85+
def test_evaluate_gate_flags_paths_outside_allowed_prefixes() -> None:
86+
result = evaluate_gate(
87+
GateState(branch="feat/safe-pr-gate", status_short=(" M docs/example.md",), changed_paths=("docs/example.md",)),
88+
allow_dirty=True,
89+
allowed_prefixes=("scripts/",),
90+
)
91+
92+
assert result.ok is False
93+
assert result.problems == ("changed_files_outside_allowed_prefixes", "outside_prefix:docs/example.md")
94+
95+
96+
def test_parse_porcelain_paths_handles_rename_status_in_second_position() -> None:
97+
assert _parse_porcelain_paths(" R old-name.txt\0new-name.txt\0") == ("new-name.txt",)
98+
99+
100+
@pytest.mark.parametrize(
101+
("raised", "expected_message"),
102+
[
103+
(subprocess.CalledProcessError(1, ["git", "status", "--short"]), "git command failed with exit code 1: git status --short"),
104+
(FileNotFoundError(), "git executable not found while running: git status --short"),
105+
],
106+
)
107+
def test_run_git_wraps_git_subprocess_failures(monkeypatch: pytest.MonkeyPatch, raised: BaseException, expected_message: str) -> None:
108+
def fake_run(*args: object, **kwargs: object) -> subprocess.CompletedProcess[str]:
109+
raise raised
110+
111+
monkeypatch.setattr(safe_pr_gate.subprocess, "run", fake_run)
112+
113+
with pytest.raises(RuntimeError, match=expected_message):
114+
safe_pr_gate._run_git(["status", "--short"])
115+
116+
117+
def test_main_reports_deterministic_error_json_on_git_failure(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
118+
monkeypatch.setattr(safe_pr_gate, "collect_gate_state", lambda: (_ for _ in ()).throw(RuntimeError("git command failed with exit code 1: git status --short")))
119+
120+
exit_code = safe_pr_gate.main([])
121+
output = json.loads(capsys.readouterr().out)
122+
123+
assert exit_code == 1
124+
assert output == {
125+
"error": {
126+
"message": "git command failed with exit code 1: git status --short",
127+
"type": "RuntimeError",
128+
},
129+
"ok": False,
130+
"result": "ERROR",
131+
}
132+
133+
134+
def test_cli_pass_and_fail_outputs_are_deterministic() -> None:
135+
with _temporary_git_repo("feat/safe-pr-gate") as repo_root:
136+
passing = _run_gate(
137+
"--allow-dirty",
138+
"--allowed-prefix",
139+
"docs/",
140+
"--allowed-prefix",
141+
"scripts/",
142+
"--allowed-prefix",
143+
"tests/",
144+
cwd=repo_root,
145+
)
146+
dirty_path = repo_root / "_safe_pr_gate_dirty_test.tmp"
147+
dirty_path.write_text("dirty\n", encoding="utf-8")
148+
try:
149+
failing = _run_gate(
150+
"--allow-dirty",
151+
"--allowed-prefix",
152+
"docs/",
153+
"--allowed-prefix",
154+
"scripts/",
155+
"--allowed-prefix",
156+
"tests/",
157+
cwd=repo_root,
158+
)
159+
finally:
160+
dirty_path.unlink(missing_ok=True)
161+
162+
assert passing.returncode == 0
163+
assert failing.returncode == 1
164+
165+
passing_payload = json.loads(passing.stdout)
166+
failing_payload = json.loads(failing.stdout)
167+
168+
assert passing_payload["result"] == "PASS"
169+
assert passing_payload["allow_dirty"] is True
170+
assert failing_payload["result"] == "FAIL"
171+
assert failing_payload["problems"] == [
172+
"changed_files_outside_allowed_prefixes",
173+
"outside_prefix:_safe_pr_gate_dirty_test.tmp",
174+
]

0 commit comments

Comments
 (0)