-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfetch_results.py
More file actions
78 lines (63 loc) · 2.83 KB
/
Copy pathfetch_results.py
File metadata and controls
78 lines (63 loc) · 2.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""Mac-bridge poller — wait for and fetch a request branch's results.
Read-only on the GitHub side (uses ``gh run list`` / ``gh run view``,
both view operations) plus plain ``git fetch`` for the result commit
the runner pushes back. Suitable for Cursor cloud agents, whose ``gh``
is restricted to read-only operations.
Usage:
python3 scripts/mac_bridge/fetch_results.py --branch mac-bridge/<name>
python3 scripts/mac_bridge/fetch_results.py --branch ... --wait 1800
CLI plumbing; exempt from unit-test coverage by the scripts/serve.py
convention.
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
import time
# Workflow selector handed to ``gh run list --workflow`` when looking up
# runs; also echoed in the "no run yet" status message.
WORKFLOW = "mac-bridge.yaml"
def _run(argv, capture=True):
    """Run *argv* as a child process and return its ``CompletedProcess``.

    When ``capture`` is truthy, stdout is piped and stderr is folded into
    it, so ``.stdout`` carries the combined text and ``.stderr`` is
    ``None``. Otherwise both streams are inherited from the parent. A
    non-zero exit status never raises (``check=False``); callers inspect
    ``returncode`` themselves.
    """
    if capture:
        streams = {"stdout": subprocess.PIPE, "stderr": subprocess.STDOUT}
    else:
        streams = {"stdout": None, "stderr": None}
    return subprocess.run(argv, check=False, text=True, **streams)
def _latest_run(branch: str):
    """Return the newest ``WORKFLOW`` run on *branch*, or ``None``.

    Asks ``gh run list`` for at most one run and returns the decoded JSON
    object, which carries the requested fields ``databaseId``, ``status``,
    ``conclusion`` and ``url``.

    ``None`` is returned when ``gh`` exits non-zero, prints nothing, prints
    something that is not a JSON list, or reports an empty list. Failures
    other than "empty list" are reported on stderr so that a broken ``gh``
    is distinguishable from a run that simply has not started yet.
    """
    proc = _run(["gh", "run", "list", "--workflow", WORKFLOW,
                 "--branch", branch, "--limit", "1",
                 "--json", "databaseId,status,conclusion,url"])
    output = proc.stdout.strip()
    if proc.returncode != 0:
        print(f"[mac-bridge] gh run list exited {proc.returncode}: {output}",
              file=sys.stderr)
        return None
    if not output:
        return None
    try:
        runs = json.loads(output)
    except json.JSONDecodeError:
        # _run folds stderr into stdout, so any diagnostic gh prints next
        # to the JSON makes the payload undecodable. Treat that as "no
        # answer this round" instead of crashing the poll loop.
        print(f"[mac-bridge] could not parse gh output as JSON: {output}",
              file=sys.stderr)
        return None
    if not isinstance(runs, list):
        print(f"[mac-bridge] unexpected gh output (not a list): {output}",
              file=sys.stderr)
        return None
    return runs[0] if runs else None
def main() -> int:
    """Poll for the branch's workflow run and fetch its result commit.

    Parses ``--branch`` (required), ``--remote``, ``--wait`` and
    ``--poll-interval`` from the command line, then checks the latest run
    once, or repeatedly until ``--wait`` seconds have elapsed. As soon as
    the run is reported ``completed``, the branch is fetched from the
    remote and the function returns.

    Returns:
        The process exit status:
        0 -- the run completed with conclusion ``success``;
        1 -- the run completed with any other conclusion;
        2 -- the wait window elapsed while a run existed but was not
             completed;
        3 -- the wait window elapsed and no run was found.
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--branch", required=True)
    ap.add_argument("--remote", default="origin")
    ap.add_argument("--wait", type=int, default=0,
                    help="Seconds to keep polling (0 = single check).")
    ap.add_argument("--poll-interval", type=float, default=30.0)
    args = ap.parse_args()

    # Monotonic clock: wall-clock adjustments must neither stretch nor cut
    # short the wait window.
    deadline = time.monotonic() + args.wait
    while True:
        run = _latest_run(args.branch)
        if run is None:
            print(f"[mac-bridge] no {WORKFLOW} run for {args.branch} yet",
                  file=sys.stderr)
        else:
            print(f"[mac-bridge] run {run['databaseId']}: "
                  f"status={run['status']} conclusion={run['conclusion'] or '-'} "
                  f"{run['url']}", file=sys.stderr)
            if run["status"] == "completed":
                # Pull the result commit the runner pushed back.
                fetch = subprocess.run(
                    ["git", "fetch", args.remote, args.branch], check=False)
                if fetch.returncode != 0:
                    # Best effort: keep the exit status tied to the run's
                    # conclusion, but do not let a stale ref go unnoticed.
                    print(f"[mac-bridge] warning: git fetch {args.remote} "
                          f"{args.branch} exited {fetch.returncode}; "
                          f"{args.remote}/{args.branch} may be stale",
                          file=sys.stderr)
                print(f"[mac-bridge] results (if any) are on "
                      f"{args.remote}/{args.branch} under .mac-bridge/logs/ "
                      f"and results/research/; inspect with:\n"
                      f"  git show {args.remote}/{args.branch} --stat",
                      file=sys.stderr)
                return 0 if run["conclusion"] == "success" else 1

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return 3 if run is None else 2
        # Never sleep past the deadline, so the last poll lands on it rather
        # than up to one full interval late; the floor at 0 keeps a negative
        # --poll-interval from crashing time.sleep().
        time.sleep(max(0.0, min(args.poll_interval, remaining)))
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the exit status.
    raise SystemExit(main())