"""The orchestration loop — the deterministic state machine around the spine.

A background surface (ADR 0018 ``register_surface``) that pulls ``ready`` features
and drives each: worktree → coder → PR → review. It is the ONLY thing that moves a
feature forward through the build states, but it never sets ``done``: that edge
belongs to the merge webhook (``api.record_merge``), the single external edge
(invariant #2).

    ready ──claim──▶ in_progress
                       │  worktree add → delegate_to(coder) in worktree → push + gh pr create
                       │  [finally: reap coder subprocess]
                       └──▶ in_review ──delegate_to(reviewer)──▶ (CI + review on the PR)
                                │
           merge webhook ▼      CI fail ▼               any failure ▼
           /merge poll          in_progress (bounce)    blocked (flag + reason)
           done

CI status arrives out-of-band via the board API (``api.py``). ``done`` is set by
the merge webhook (``api.record_merge``) — or, when no public webhook URL is
reachable, by the loop's **PR reconcile** (``merge_poll``), which asks ``gh`` for
each ``in_review`` PR's state and drives the terminal edges: merged → done (the same
idempotent edge), closed-unmerged → blocked. Up to ``max_concurrent`` features build
concurrently, each in its own worktree.

**coder.solve() board seam (ADR 0064 P2, opt-in, see ``coder_seam.py``).** On a
fresh build (not a keep-worktree/CI-bounce re-dispatch), when the `coder` plugin is
importable AND the feature has acceptance criteria AND a runnable acceptance-test
command is configured, ``delegate_to(coder)`` is replaced by
``coder_seam.dispatch()`` — an execution-grounded ladder (greedy → best-of-k →
tree-search) that runs the feature's acceptance tests in real candidate worktrees
and gates on them actually PASSING, never an LLM judge. It composes WITH the
`coders`-map tier ladder below (search happens WITHIN a tier; a search that never
passes is a capability failure that escalates/blocks exactly like a no-diff
dispatch). Missing any of the three gates ⇒ honest degrade to the single shot above.
"""

from __future__ import annotations

import asyncio
import logging
import re
import time

from . import coder_seam, worktree
from .failures import classify
from .store import BoardError, escalation_enabled, get_store

log = logging.getLogger("protoagent.plugins.project_board")
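

# Illustrative sketch (hypothetical, not used by BoardLoop): the state diagram in
# the module docstring written as an explicit transition table. The event names
# ("claim", "pr_opened", "ci_fail", ...) are assumptions chosen for readability;
# the real edges are driven through the store (claim / open_review / requeue /
# flag_blocked / record_merge). Note that "done" is reached only via "merged",
# i.e. the webhook or the merge poll, never from the build path itself.
_SKETCH_EDGES: dict[tuple[str, str], str] = {
    ("ready", "claim"): "in_progress",
    ("in_progress", "pr_opened"): "in_review",
    ("in_progress", "failure"): "blocked",  # any drive failure → flag + reason
    ("in_review", "ci_fail"): "in_progress",  # bounce back to the coder
    ("in_review", "pr_closed"): "blocked",  # closed without merging → triage
    ("in_review", "merged"): "done",  # the single external edge
}


def _sketch_next_state(state: str, event: str) -> str:
    """Return the state an event moves a feature to; reject edges the diagram lacks."""
    try:
        return _SKETCH_EDGES[(state, event)]
    except KeyError:
        raise ValueError(f"no edge {state!r} --{event}--> in the sketch") from None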


# Deterministic test-coverage gate (path-based — no LLM, no diff). A code change must
# ship a test; checking the changed-file LIST is instant and immune to the truncation
# that made the old LLM-eyeballs-the-diff verifier false-reject tests it couldn't see.
_TEST_PATH_RE = re.compile(r"(^|/)tests?/|(^|/)(test_[^/]+|conftest)\.py$|(^|/)[^/]+_test\.py$|\.(test|spec)\.[jt]sx?$")
_CODE_EXTS = (".py", ".ts", ".tsx", ".js", ".jsx", ".mjs", ".go", ".rs")


def _is_test_path(p: str) -> bool:
    return bool(_TEST_PATH_RE.search(p))


def _is_code_path(p: str) -> bool:
    return p.endswith(_CODE_EXTS)
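

# Illustrative sketch (hypothetical helper; the loop's real check is _verify_goal,
# which is not shown in this excerpt): the path-based gate above boils down to
# "if any non-test code file changed, at least one test file must change too".
# The two path predicates are passed in so the sketch stays standalone; inside
# this module they would be _is_test_path and _is_code_path.
def _sketch_missing_test(changed: list[str], is_test, is_code) -> bool:
    """True when a change touches code but ships no test (the gate would reject it)."""
    touches_code = any(is_code(p) and not is_test(p) for p in changed)
    ships_test = any(is_test(p) for p in changed)
    return touches_code and not ships_test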


# Error/summary lines worth keeping from a failing CI log — the ones that name the
# ACTUAL failure (pytest's "FAILED … - AssertionError: golden field map …", ruff's
# "F841"/"would reformat", a conflict, version drift) so the attempt comment the retro
# mines is CLASSIFIABLE, not a generic "checks red".
_CI_SIGNAL_RE = re.compile(
    r"FAILED|Error|assert|\bF\d{3}\b|reformat|no column|out of sync|conflict|drift|lint-imports", re.I
)


def _ci_failure_reason(summary: str, max_chars: int = 500) -> str:
    """Distill a CI summary into a compact but classifiable failure reason for the
    attempt comment (the loop-retro mines these to bucket recurring failures).

    The useful signal is NOT the ``Failing checks:`` header — it's the failing check
    NAMES plus the tail of the failing log, where pytest/ruff print the real error.
    Falls back to the header / ``checks red`` when there's nothing better."""
    if not summary:
        return "checks red"
    checks = [ln[2:].strip() for ln in summary.splitlines() if ln.startswith("- ")]
    head = "; ".join(checks) if checks else summary.splitlines()[0].strip()
    detail = ""
    if "Failing log" in summary:
        # Named `failing_log` rather than `log` so it doesn't shadow the module logger.
        failing_log = summary.split("Failing log", 1)[1]
        errs = [ln.strip() for ln in failing_log.splitlines() if ln.strip() and _CI_SIGNAL_RE.search(ln)]
        if errs:
            detail = " · ".join(errs[-4:])
        else:
            tail = [ln.strip() for ln in failing_log.splitlines() if ln.strip()]
            detail = tail[-1] if tail else ""
    reason = f"{head} — {detail}" if detail else head
    return reason[:max_chars]
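

# Illustrative sketch (hypothetical, not called by BoardLoop): the bounded
# "same-tier fix, THEN escalate" policy behind the goal_fix_max / ci_fix_max /
# rebase_fix_max budgets that BoardLoop configures. `attempts` counts the
# re-dispatches already spent on this kind of failure; `can_escalate` stands in
# for "a higher tier exists in the `coders` map". Whether an exhausted budget
# escalates or goes straight to blocked (as the rebase and CI paths do) is
# decided per edge inside BoardLoop, so this only shows the shared shape.
def _sketch_next_action(attempts: int, fix_max: int, can_escalate: bool) -> str:
    """Return "retry", "escalate" or "block" after one more failed attempt."""
    if attempts < fix_max:
        return "retry"  # same coder, same tier, with the failure fed back as context
    return "escalate" if can_escalate else "block"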


_MAX_MODE_JUDGE_SYS = (
    "You are a strict code reviewer choosing the best of several diffs for the same "
    "task. Pick the one that most completely and correctly satisfies the acceptance "
    "criteria. Answer with ONLY the candidate number."
)
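

# Illustrative sketch (hypothetical, not called by BoardLoop): the back-pressure
# rules that BoardLoop._spawn_ready applies, reduced to a pure function over plain
# data. `candidates` are (feature id, files_to_modify) pairs in priority order;
# `busy` is the union of files already owned by in-flight drives. The real method
# also re-checks board_state / the blocked flag and can lose a claim race in the
# store; those steps are omitted here.
def _sketch_pick_claims(
    candidates: list[tuple[str, set[str]]],
    busy: set[str],
    running: int,
    max_concurrent: int,
    pending_reviews: int,
    max_pending_reviews: int,
) -> list[str]:
    """Return the feature ids that would be claimed this tick, in order."""
    if running >= max_concurrent:
        return []
    # Review-queue WIP limit (0 = unlimited): don't claim while reviews pile up.
    if max_pending_reviews and pending_reviews >= max_pending_reviews:
        return []
    busy = set(busy)  # copy so the caller's set is left untouched
    picked: list[str] = []
    for fid, files in candidates:
        if running + len(picked) >= max_concurrent:
            break
        if files & busy:
            continue  # hot-file guard: an in-flight build already owns one of these files
        picked.append(fid)
        busy |= files
    return picked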


class BoardLoop:
    def __init__(self, cfg: dict):
        self.cfg = cfg or {}
        self.coder_name = self.cfg.get("coder", "proto")
        self.reviewer_name = self.cfg.get("reviewer", "quinn")
        # Review dispatch is OPT-IN (default off). The fleet's PR-review pipeline
        # already reviews PRs the moment they're opened, so the loop doesn't need to
        # `delegate_to(reviewer)` — it just opens the PR and lets the pipeline + CI +
        # the merge webhook gate it. Turn this on only for repos NOT covered by a
        # PR-review pipeline (then a reachable `reviewer` a2a delegate is required).
        self.review_dispatch = bool(self.cfg.get("review_dispatch", False))
        # Goal-verification gate (OPT-IN, default off). When on, a DETERMINISTIC pre-PR
        # check (no LLM, no diff dump): a code change must ship a test — CI runs tests but
        # can't require their presence, so the gate does. A miss → re-dispatch/escalate
        # instead of opening a testless PR; correctness itself is CI's job. (Was an
        # LLM-eyeballs-the-diff check — it false-rejected tests it couldn't see past the
        # diff truncation, burning whole tier ladders on phantom gaps; see _verify_goal.)
        self.goal_verify = bool(self.cfg.get("goal_verify", False))
        # Max-Mode (MiMo Tier-2, OPT-IN, default 1 = off). When >1, a hard feature is
        # attempted with N parallel candidates and `_judge_candidates` picks the best
        # diff. Costs N× tokens, so gate it to hard work. The parallel-dispatch wiring
        # is tracked in #21; this ships the reusable best-of-N judge it composes.
        self.max_mode_n = max(1, int(self.cfg.get("max_mode_n", 1) or 1))
        self.interval = float(self.cfg.get("loop_interval_s", 30))
        self.root = self.cfg.get("worktrees_root", ".worktrees")
        self.enabled = bool(self.cfg.get("loop_enabled", False))
        # Escalation is OPT-IN: a `coders` map (tier → delegate name) with ≥2
        # distinct delegates. With a single ACP coder there's no ladder — one
        # dispatch, then Blocked on failure — so difficulty/tier stay irrelevant
        # and we never write redundant tier/attempt labels.
        self.coders = {str(k): str(v) for k, v in (self.cfg.get("coders") or {}).items()}
        self.escalation_on = escalation_enabled(self.cfg)
        # Concurrency: drive up to `max_concurrent` features at once, each in its own
        # worktree. 1 (the default) = serial — the safe default for token + merge-
        # integration cost; raise it on a repo that parallelizes cleanly.
        self.max_concurrent = max(1, int(self.cfg.get("max_concurrent", 1)))
        # Review-queue WIP limit: pause new claims when this many PRs already await
        # review, so the loop can't pile up PRs faster than they merge (flooding CI /
        # reviewers). 0 = unlimited.
        self.max_pending_reviews = int(self.cfg.get("max_pending_reviews", 5))
        # Dependency gate: "merge" (default) — a dependent waits for every blocker to
        # merge (done); "review" — a NON-foundation blocker releases its dependents at
        # in_review (more parallelism, at the risk of building on un-merged code).
        # Foundation blockers always gate on merge.
        self.relaxed_gate = str(self.cfg.get("dep_gate", "merge")).lower() == "review"
        # Stuck-drive watchdog: hard cap on a single coder dispatch (the only
        # otherwise-unbounded await in a drive — git/gh calls already self-time-out).
        # 0 disables it. A timeout reaps the coder subprocess and is a capability
        # failure (escalate-or-block), not a transient retry.
        self.coder_timeout = float(self.cfg.get("coder_timeout_s", 1800))
        # Merge poll: a fallback to the /webhook/pr Done edge for deployments with no
        # public webhook URL. On by default (cheap; only probes `in_review` PRs).
        self.merge_poll = bool(self.cfg.get("merge_poll", True))
        self.merge_poll_interval = float(self.cfg.get("merge_poll_interval_s", 60))
        # Health sweep: periodic self-heal (reclaim slots from dead drives, reap
        # orphaned worktrees). 0 disables it.
        self.sweep_interval = float(self.cfg.get("health_sweep_interval_s", 300))
        # CI-feedback edge (closed-loop verify): poll in_review PRs' check-runs and,
        # on a FAILING rollup, bounce the feature back to the coder with the failure
        # injected as feedback (vs the old open-loop: a red PR sat in_review forever).
        # Rides the merge-poll cadence. `ci_fix_max` caps re-dispatches before the
        # feature is blocked for human triage (a real bug, not a self-fixable nit).
        self.ci_poll = bool(self.cfg.get("ci_poll", self.merge_poll))
        self.ci_fix_max = max(0, int(self.cfg.get("ci_fix_max", 2)))
        # Auto-rebase a stale/conflicting in_review PR onto base. Parallel PRs branch
        # off the SAME base, and the hot-file guard serializes DISPATCH not the branch
        # BASE — so each merge re-stales the others (a sibling's change lands in the
        # same files). On BEHIND (stale, no conflict) a clean rebase + force-push fixes
        # it with NO coder; on DIRTY (a real conflict) the rebase aborts and the coder
        # is re-dispatched to re-resolve, bounded by rebase_fix_max. Rides the
        # merge-poll cadence; defaults to merge_poll's value.
        self.auto_rebase = bool(self.cfg.get("auto_rebase", self.merge_poll))
        self.rebase_fix_max = max(0, int(self.cfg.get("rebase_fix_max", 1)))
        # Pre-PR goal-verify gap: a rejected diff (e.g. missing tests) is fixable by
        # the SAME coder told what's missing — NOT a model-capability failure. So
        # carry the gap as feedback + re-dispatch the same tier, bounded by
        # `goal_fix_max`, BEFORE escalating/blocking (else a top-tier diff:large
        # feature blocks on attempt 1 with no chance to add the tests).
        self.goal_fix_max = max(0, int(self.cfg.get("goal_fix_max", 2)))
        # Auto-fix command run in the worktree BEFORE opening the PR (e.g.
        # "ruff check --fix . && ruff format ."). The coder is edit-only — it can't run
        # the repo's linter/formatter, so trivial lint/format nits would otherwise fail
        # CI and burn a whole bounce/escalation (bd-2fd: a full opus fix blocked on one
        # unused import). Best-effort; CI is still the real gate. Empty = off.
        self.format_cmd = str(self.cfg.get("format_cmd", "")).strip()
        # Pre-PR LOCAL GATE: the repo's real check command(s) run in the worktree
        # AFTER fixups and BEFORE open_pr (e.g. "ruff check . && uv run --no-sync pytest
        # tests/ -q"). The coder is edit-only — it can't run the suite — so a failure on
        # a knowable fact (a lint nit, a golden-map test, a wrong schema/column, version
        # drift) only surfaces in CI, then thrashes the bounce/escalation ladder. Running
        # it here hands the SAME coder the actual output to fix in-worktree, so the PR
        # opens already-green. Best-effort early filter: if it can't pass within
        # local_gate_max same-tier tries, the PR opens anyway (CI + the ci-fix budget
        # stay the backstop) — a flaky/misconfigured gate never blocks good work. Empty = off.
        self.local_gate_cmd = str(self.cfg.get("local_gate_cmd", "")).strip()
        self.local_gate_max = max(0, int(self.cfg.get("local_gate_max", 2)))
        self.local_gate_timeout = float(self.cfg.get("local_gate_timeout_s", 600))
        self.local_gate_output_chars = max(500, int(self.cfg.get("local_gate_output_chars", 4000)))

        # ── coder.solve() board seam (ADR 0064 P2, opt-in) ─────────────────────────
        # Route a FRESH build (not a keep-worktree/CI-bounce re-dispatch) through the
        # `coder` plugin's execution-grounded solve() ladder (greedy → best-of-k →
        # tree-search) instead of a single delegate_to(acp) shot — gated on the
        # feature's acceptance tests actually PASSING in a real worktree, never an
        # LLM judge. HONEST DEGRADE (coder_seam.should_use_solve): only fires when
        # the `coder` plugin is importable (host has it enabled) AND this feature
        # carries acceptance_criteria AND a runnable test command is configured
        # below — missing any of the three falls back to today's single shot, so an
        # existing deployment can't regress just by upgrading. Composes WITH (does
        # NOT replace) the coders-map tier ladder: solve() searches within the
        # CURRENT tier; a search that never passes raises SolveExhausted, which
        # `_drive` treats as the same capability failure as a no-diff dispatch
        # (escalate a tier, or block) — the tier ladder still climbs when search
        # itself stalls.
        #
        # Precedence vs. Max-Mode (`max_mode_n>1`, below): coder_solve ONLY preempts
        # Max-Mode when Max-Mode itself is off (`max_mode_n<=1`) — see
        # `_use_coder_solve`. Without this, a board already running the README's own
        # execution-grounded Max-Mode recipe (`max_mode_n>1` + `local_gate_cmd`) would
        # silently stop using Max-Mode the moment the separate `coder` plugin became
        # importable for any unrelated reason, with zero change to THIS board's own
        # config — and unlike Max-Mode's LLM-judge fallback (which always ships a
        # best-effort PR), an exhausted solve() ladder blocks the feature outright.
        # That's a behavior change an operator must opt into, not inherit for free.
        self.coder_solve = bool(self.cfg.get("coder_solve", True))
        # The ladder's verifier: the command that runs THIS feature's (coder-
        # authored) acceptance tests in a candidate worktree, e.g. "pytest tests/ -q".
        # Blank ⇒ falls back to local_gate_cmd (many repos already configure that as
        # the real test command); still blank ⇒ no runnable oracle ⇒ honest degrade.
        self.coder_solve_test_cmd = str(self.cfg.get("coder_solve_test_cmd", "")).strip() or self.local_gate_cmd
        self.coder_solve_test_timeout = float(self.cfg.get("coder_solve_test_timeout_s", 300))
        self.coder_solve_budget = max(1, int(self.cfg.get("coder_solve_budget", 6)))
        self.coder_solve_k = max(1, int(self.cfg.get("coder_solve_k", 3)))
        self.coder_solve_tree_depth = max(0, int(self.cfg.get("coder_solve_tree_depth", 2)))
        # Rung 4 (ADR 0064 P3): a richer generator for the HARDEST features — reached
        # only after greedy AND best-of-k AND tree-search all fail their tests. Fusion
        # can't tool-call (it's a plain completion, not an ACP session), so it's an
        # `openai`-type delegate name, resolved per-dispatch in `_drive` (mirroring how
        # `coder`/`reviewer` are resolved) — never here, this is just config plumbing.
        # Blank ⇒ no fusion rung; the ladder stops at tree-search exactly as before.
        self.coder_solve_fusion_delegate = str(self.cfg.get("coder_solve_fusion_delegate", "")).strip()
        self.coder_solve_fusion_k = max(1, int(self.cfg.get("coder_solve_fusion_k", 2)))
        # Fusion can't tool-call and returns whole-file replacements with no diff —
        # a file over this cap risks a silent truncated "complete" rewrite (see
        # coder_seam.fusion_viable_for_files). Gated BEFORE dispatch, not after:
        # an oversized feature just skips the fusion rung (fusion_delegate=None
        # for that dispatch), it never gets to attempt-and-corrupt.
        self.coder_solve_fusion_max_file_chars = max(
            1, int(self.cfg.get("coder_solve_fusion_max_file_chars", coder_seam.FUSION_MAX_FILE_CHARS_DEFAULT))
        )
        self.coder_solve_fusion_max_total_chars = max(
            1, int(self.cfg.get("coder_solve_fusion_max_total_chars", coder_seam.FUSION_MAX_TOTAL_CHARS_DEFAULT))
        )
        # KG lessons (the flywheel READ half): before dispatching a coder, query the
        # knowledge graph (via graph.sdk) for distilled lessons relevant to THIS feature
        # and inject them into the prompt — so the coder heeds this area's known failure
        # modes on attempt 1. The loop-retro writes those lessons (domain `loop-lessons`).
        # Best-effort: any SDK/store error → no injection (never blocks a build). Off when
        # kg_lessons is false or no store is configured.
        self.kg_lessons = bool(self.cfg.get("kg_lessons", True))
        self.kg_lessons_k = max(1, int(self.cfg.get("kg_lessons_k", 3)))
        self.kg_lessons_domain = str(self.cfg.get("kg_lessons_domain", "loop-lessons")).strip()
        self._store_kw = dict(
            db=self.cfg.get("db_path") or None,
            repo=self.cfg.get("repo", "."),
            base_branch=self.cfg.get("base_branch", "main"),
        )
        self._task: asyncio.Task | None = None
        self._stop = asyncio.Event()
        # The running drive tasks, and the worktrees they hold (fid → (repo, wt,
        # branch)) so shutdown can reap any a cancel mid-drive would orphan; the coder
        # subprocess itself is reaped by dispatch_coder's finally.
        self._drives: set[asyncio.Task] = set()
        self._inflight: dict[str, tuple[str, str, str]] = {}
        # files_to_modify of each in-flight feature, for the hot-file overlap guard
        # (don't run two parallel coders that edit the same file → sure conflict).
        self._inflight_files: dict[str, set[str]] = {}
        self._last_poll = 0.0  # monotonic ts of the last merge poll
        self._last_sweep = 0.0  # monotonic ts of the last health sweep
        # CI-feedback state (in-memory, per run): fid → last failing-CI summary (fed
        # into the re-dispatch prompt) and fid → count of CI-fix re-dispatches so far.
        self._ci_feedback: dict[str, str] = {}
        self._ci_prior_diff: dict[str, str] = {}
        self._ci_fix_attempts: dict[str, int] = {}
        # Pre-PR goal-verify gap re-dispatches so far (fid → count), same-tier.
        self._goal_fix_attempts: dict[str, int] = {}
        # Pre-PR local-gate failure re-dispatches so far (fid → count), same-tier.
        self._gate_fix_attempts: dict[str, int] = {}
        # Rebase-conflict re-dispatches so far (fid → count) when a sibling merge
        # leaves a PR with a real (non-clean) conflict against base.
        self._rebase_attempts: dict[str, int] = {}

    def _store(self):
        return get_store(**self._store_kw)

    # ── lifecycle (register_surface start/stop) ───────────────────────────────
    def start(self):
        if not self.enabled:
            log.info("[project_board] loop disabled (project_board.loop_enabled=false) — board API still serves")
            return None
        self._task = asyncio.create_task(self._run(), name="project-board-loop")
        log.info(
            "[project_board] loop started (coder=%s reviewer=%s every %ss, max_concurrent=%d, "
            "merge_poll=%s, coder_timeout=%ss)",
            self.coder_name,
            self.reviewer_name,
            self.interval,
            self.max_concurrent,
            self.merge_poll,
            self.coder_timeout,
        )
        return self._task

    async def stop(self):
        self._stop.set()
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except (asyncio.CancelledError, Exception):  # noqa: BLE001
                pass
        # Cancel any in-flight drives and await them out. A drive cancelled mid-flight
        # can't run its own cleanup, so its worktree stays in self._inflight — reaped
        # below. (A completed/blocked drive already popped itself.)
        drives, self._drives = list(self._drives), set()
        for t in drives:
            t.cancel()
        if drives:
            await asyncio.gather(*drives, return_exceptions=True)
        inflight, self._inflight = dict(self._inflight), {}
        for fid, (repo, wt, branch) in inflight.items():
            try:
                await worktree.remove_worktree(repo, wt, branch or "")
                log.info("[project_board] reaped in-flight worktree on shutdown: %s", wt)
            except Exception:  # noqa: BLE001 — teardown must not raise out of shutdown
                log.warning("[project_board] worktree reap on shutdown failed: %s", wt, exc_info=True)

    # ── crash recovery (runs once, before the puller claims new work) ──────────
    async def _reconcile_orphan(self, fid: str):
        """A claimed feature with no live drive: if its PR actually got opened (a crash
        between ``open_pr`` and ``open_review``) adopt it → ``in_review``; otherwise
        reset it to ``ready`` for a clean rebuild (a stale worktree is cleaned when the
        puller re-claims it). Shared by boot recovery and the health sweep."""
        store = self._store()
        pr_url = await worktree.pr_url_for_branch(f"feat/{fid}", cwd=self._store_kw["repo"])
        if pr_url:
            store.open_review(fid, pr_url=pr_url)
            log.info("[project_board] %s already had a PR → in_review (%s)", fid, pr_url)
        else:
            store.requeue(fid)
            log.info("[project_board] %s reset to ready (no PR — rebuild fresh)", fid)

    async def _recover(self):
        """On boot, reconcile every ``in_progress`` feature the previous run left
        mid-drive (a drive doesn't survive a restart). ``in_review`` features are NOT
        touched — they have a PR and the webhook/poll resolves them."""
        for f in self._store().list_features(state="in_progress"):
            try:
                await self._reconcile_orphan(f["id"])
            except Exception:  # noqa: BLE001 — recovery is best-effort, per feature
                log.warning("[project_board] recovery for %s failed", f["id"], exc_info=True)

    # ── periodic health sweep (self-heal during the run) ───────────────────────
    async def _maybe_sweep(self):
        """Run the health sweep at most once per ``health_sweep_interval`` (0 = off)."""
        if not self.sweep_interval:
            return
        now = time.monotonic()
        if now - self._last_sweep < self.sweep_interval:
            return
        self._last_sweep = now
        await self._sweep()

    async def _sweep(self):
        """Self-heal: (a) reset ``in_progress`` features that no live drive owns (a
        drive died without finishing) — same reconcile as boot recovery; (b) reap
        ``feat-<id>`` worktrees whose feature is gone or already ``done`` (a missed
        reap). Best-effort; a per-item failure never stops the sweep or the loop."""
        store = self._store()
        repo = self._store_kw["repo"]
        for f in store.list_features(state="in_progress"):
            fid = f["id"]
            if fid in self._inflight_files:
                continue  # a live drive owns it
            try:
                log.info("[project_board] sweep: %s in_progress with no live drive", fid)
                await self._reconcile_orphan(fid)
            except Exception:  # noqa: BLE001
                log.warning("[project_board] sweep reconcile for %s failed", fid, exc_info=True)
        for fid in worktree.list_feature_worktrees(repo, self.root):
            if fid in self._inflight_files:
                continue  # a live drive owns this worktree
            try:
                f = store.get_feature(fid)
                if f is None or f["board_state"] == "done":
                    await worktree.reap_feature_worktree(repo, self.root, fid)
                    log.info("[project_board] sweep: reaped orphaned worktree feat-%s", fid)
            except Exception:  # noqa: BLE001
                log.warning("[project_board] sweep reap for %s failed", fid, exc_info=True)

    # ── the puller ────────────────────────────────────────────────────────────
    async def _run(self):
        try:
            await self._recover()
        except Exception:  # noqa: BLE001 — recovery must never stop the loop from starting
            log.exception("[project_board] crash recovery failed")
        log.info("[project_board] recovery done — entering tick loop")
        while not self._stop.is_set():
            spawned = False
            try:
                await self._maybe_reconcile()
                await self._maybe_sweep()
                spawned = self._spawn_ready()
            except Exception:  # noqa: BLE001 — a bad tick must never kill the loop
                log.exception("[project_board] loop tick failed")
            # Idle (nothing started, nothing running) → sleep the full interval. Busy
            # → re-check soon so a freed concurrency slot refills and merges land
            # promptly (the poll itself stays rate-limited by merge_poll_interval).
            idle = not spawned and not self._drives
            timeout = self.interval if idle else min(self.interval, 3.0)
            try:
                await asyncio.wait_for(self._stop.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                pass

    def _spawn_ready(self) -> bool:
        """Claim Ready features up to the concurrency cap and spawn a drive for each,
        with two back-pressure gates: pause when too many PRs already await review
        (``max_pending_reviews``), and skip a candidate whose ``files_to_modify``
        overlap an in-flight build (the hot-file guard — two parallel coders editing
        the same file are a guaranteed merge conflict). Returns True if it started at
        least one drive (so the runner stays hot)."""
        if len(self._drives) >= self.max_concurrent:
            return False
        store = self._store()
        # Review-queue WIP limit — don't claim new work while the review queue is full.
        if self.max_pending_reviews and len(store.list_features(state="in_review")) >= self.max_pending_reviews:
            return False
        spawned = False
        busy = set().union(*self._inflight_files.values()) if self._inflight_files else set()
        for candidate in store.ready_queue(relaxed=self.relaxed_gate):  # priority order, dep-unblocked
            if len(self._drives) >= self.max_concurrent:
                break
            if candidate.get("board_state") != "ready" or candidate.get("blocked"):
                continue  # a blocked-flagged feature can carry the `ready` label too
            files = set(candidate.get("files_to_modify") or [])
            if files & busy:
                continue  # would edit a file an in-flight build owns → defer a tick
            claimed = store.claim(candidate["id"], assignee=self.coder_name)
            if claimed is None:
                continue  # raced / no longer ready
            self._inflight_files[claimed["id"]] = files
            task = asyncio.create_task(self._drive(claimed), name=f"pb-drive-{claimed['id']}")
            self._drives.add(task)
            task.add_done_callback(self._make_drive_done_cb(claimed["id"]))
            busy |= files
            spawned = True
        return spawned

    def _make_drive_done_cb(self, fid: str):
        """A drive task's done-callback: drop it from the running set and release the
        files it held (so a deferred file-conflicting candidate can be claimed next)."""

        def _cb(task: asyncio.Task):
            self._drives.discard(task)
            self._inflight_files.pop(fid, None)

        return _cb

    # ── the PR reconcile (terminal-edge fallback to the webhook) ───────────────
    async def _maybe_reconcile(self):
        """Run the PR reconcile at most once per ``merge_poll_interval`` (and only when
        enabled) — cheap, but no reason to hammer ``gh`` every busy tick."""
        if not self.merge_poll:
            return
        now = time.monotonic()
        if now - self._last_poll < self.merge_poll_interval:
            return
        self._last_poll = now
        await self._reconcile_prs()

    async def _reconcile_prs(self):
        """Reconcile each ``in_review`` feature against its PR's real state — the
        fallback to the webhook and the active half of the terminal edges (for
        deployments GitHub can't post a webhook to, where a feature would otherwise
        sit in_review forever): ``MERGED`` → done (+reap); ``CLOSED`` unmerged →
        Blocked for triage (+reap; the work was rejected, don't silently re-dispatch);
        ``OPEN`` → leave it in review."""
        store = self._store()
        repo = self._store_kw["repo"]
        for f in store.list_features(state="in_review"):
            fid = f["id"]
            pr_url = f.get("pr_url")
            if not pr_url:
                continue
            try:
                state = await worktree.pr_state(pr_url, cwd=repo)
                if state == "MERGED":
                    if store.record_merge(pr_url=pr_url):
                        await worktree.reap_feature_worktree(repo, self.root, fid)
                        self._ci_feedback.pop(fid, None)
                        self._ci_fix_attempts.pop(fid, None)
                        self._rebase_attempts.pop(fid, None)
                        log.info("[project_board] reconcile → done: %s (%s)", fid, pr_url)
                elif state == "CLOSED":
                    store.flag_blocked(fid, f"PR closed without merging — needs triage: {pr_url}")
                    await worktree.reap_feature_worktree(repo, self.root, fid)
                    self._ci_feedback.pop(fid, None)
                    self._ci_fix_attempts.pop(fid, None)
                    self._rebase_attempts.pop(fid, None)
                    log.info("[project_board] reconcile → blocked (PR closed): %s (%s)", fid, pr_url)
                elif state == "OPEN":
                    # Keep a stale/conflicting PR mergeable BEFORE the CI reconcile: a
                    # sibling merge re-stales the others off the shared base, and a rebase
                    # force-pushes + re-runs CI — so checking CI on the stale head first
                    # would just be thrown away.
                    if self.auto_rebase and await self._maybe_rebase(store, f, pr_url, repo):
                        continue
                    if self.ci_poll:
                        await self._reconcile_ci(store, fid, pr_url, repo)
            except Exception:  # noqa: BLE001 — a reconcile error must never kill the loop
                log.warning("[project_board] reconcile for %s failed", fid, exc_info=True)
async def _maybe_rebase(self, store, feature: dict, pr_url: str, repo: str) -> bool:
"""If a sibling merge left this in_review PR BEHIND/DIRTY vs base, refresh it.
Returns True if it acted (rebased / re-dispatched / blocked) so the caller skips
the CI reconcile this pass; False when there's nothing to do (CLEAN, a checks-only
BLOCKED, an UNKNOWN still computing, or a transient gh/infra hiccup → next poll
retries). BEHIND (stale, no conflict) → a clean rebase + force-push, NO coder.
DIRTY (a real conflict) → the rebase aborts, so re-dispatch the coder to re-resolve
off the fresh base, bounded by rebase_fix_max, then Blocked for a manual rebase."""
fid = feature["id"]
mss = await worktree.pr_merge_state(pr_url, cwd=repo)
if mss not in ("BEHIND", "DIRTY"):
return False # CLEAN / BLOCKED(checks) / UNKNOWN(computing) / DRAFT → not ours
base = feature.get("base_branch") or self._store_kw.get("base_branch") or "main"
outcome, detail = await worktree.rebase_onto_base(repo, f"feat/{fid}", base, root=self.root)
if outcome == "clean":
log.info("[project_board] %s auto-rebased onto %s (was %s) — force-pushed", fid, base, mss)
return True
if outcome == "error":
log.warning(
"[project_board] %s auto-rebase hit infra trouble (%s) — next poll retries: %s", fid, mss, detail
)
return False # transient — don't burn the coder budget on an infra blip
# outcome == "conflict": a real merge conflict only the coder can resolve.
n = self._rebase_attempts.get(fid, 0)
if n >= self.rebase_fix_max:
store.flag_blocked(
fid, f"rebase conflict with {base} after {n} attempt(s) — needs a manual rebase: {pr_url}"
)
await worktree.reap_feature_worktree(repo, self.root, fid)
log.warning("[project_board] %s blocked (rebase conflict, %d attempt(s)): %s", fid, n, detail)
return True
self._rebase_attempts[fid] = n + 1
self._ci_prior_diff.pop(fid, None)
self._ci_feedback[fid] = (
f"Your branch now CONFLICTS with `{base}` — a sibling change merged into the same "
f"file(s): {detail}. Re-apply your change onto the latest `{base}` and resolve the "
"conflict, keeping BOTH sides' intent. Then stop."
)
store.requeue(fid)
log.info(
"[project_board] %s rebase conflict — re-dispatch %d/%d to resolve (%s): %s",
fid,
n + 1,
self.rebase_fix_max,
mss,
detail,
)
return True
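# Hypothetical sketch, not part of this module: the decision table that
# _maybe_rebase walks through, reduced to a pure function so it can be tested
# without gh or git. The outcome strings ("clean", "error", anything else =
# conflict) mirror worktree.rebase_onto_base above. The returned action
# labels are illustrative assumptions, not names the board uses.
def _sketch_rebase_action(merge_state: str, outcome: str | None, attempts: int, max_attempts: int) -> str:
    if merge_state not in ("BEHIND", "DIRTY"):
        return "skip"  # CLEAN / BLOCKED / UNKNOWN / DRAFT: nothing to refresh
    if outcome == "clean":
        return "pushed"  # rebased + force-pushed, no coder needed
    if outcome == "error":
        return "retry-later"  # transient infra trouble: don't spend the coder budget
    if attempts >= max_attempts:
        return "block"  # conflict budget exhausted: needs a manual rebase
    return "redispatch"  # real conflict: send the coder back to resolve it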
async def _reconcile_ci(self, store, fid: str, pr_url: str, repo: str):
"""Closed-loop verify edge: an OPEN ``in_review`` PR whose checks FAILED is
bounced back to the coder — and the re-dispatch *improves on the last try*
rather than blindly repeating it (the missing OODA correction; before this a
red PR sat in_review forever, then a same-model retry re-made the same mistake).
Two improvement levers, both ProtoMaker-style:
- **Carry the lesson forward** — inject the CI failure summary AND the prior
attempt's diff into the next prompt (fresh-both keeps a fresh session, but
the coder sees what it tried and why it failed).
- **Same-tier fix, THEN escalate** — a red check is usually a fixable nit (a
lint error, a golden-map update, a flaky assertion) the current tier can
self-correct once it SEES the error, not a model-capability ceiling. So
spend ``ci_fix_max`` same-tier retries first; only when those are exhausted
does a configured `coders` ladder climb a tier (smart→reasoning→opus, the
ladder is the bound → top tier fails → Blocked). Without a ladder the
exhausted budget blocks directly. (Escalating on the FIRST failure burned
the expensive tiers on one-line lint fixes — the goal-fix budget already
learned this lesson; the CI path now mirrors it.)
Passing/pending/no-checks → left in review (the merge edge resolves it)."""
status, summary = await worktree.pr_ci_status(pr_url, cwd=repo)
if status != "failing":
return
# Carry the lesson: the CI error + the diff that failed it (best-effort).
self._ci_feedback[fid] = summary
self._ci_prior_diff[fid] = await worktree.pr_diff(pr_url, cwd=repo)
def _block(reason: str):
store.flag_blocked(fid, reason)
self._ci_feedback.pop(fid, None)
self._ci_prior_diff.pop(fid, None)
self._ci_fix_attempts.pop(fid, None)
# Same-tier CI-fix budget FIRST (both ladder and single-coder): a red check
# is usually a fixable nit the current tier can correct once it sees the
# error — don't burn a stronger model on a one-line lint fix. The CI error +
# prior diff are already injected above, so the re-dispatch improves on the
# last try rather than repeating it.
attempts = self._ci_fix_attempts.get(fid, 0)
if attempts < self.ci_fix_max:
self._ci_fix_attempts[fid] = attempts + 1
store.requeue(fid)
log.info(
"[project_board] reconcile → same-tier CI-fix (attempt %d/%d): %s",
attempts + 1,
self.ci_fix_max,
fid,
)
return
# Same-tier budget exhausted. With a ladder, climb a model tier and reset the
# per-tier budget so the new rung gets its own fix attempts; without one, block.
if self.escalation_on:
nxt = store.escalate(fid, f"CI failed: {_ci_failure_reason(summary)}")
if not nxt:
_block(f"CI failing at the top model tier after {attempts} same-tier fix(es) — needs triage: {pr_url}")
await worktree.reap_feature_worktree(repo, self.root, fid)
log.warning("[project_board] reconcile → blocked (CI fails at top tier): %s", fid)
return
self._ci_fix_attempts.pop(fid, None) # fresh same-tier budget at the new rung
store.requeue(fid)
log.info("[project_board] reconcile → escalate to %s + re-dispatch (CI failed): %s", nxt, fid)
return
_block(f"CI still failing after {attempts} fix attempt(s) — needs triage: {pr_url}")
await worktree.reap_feature_worktree(repo, self.root, fid)
log.warning("[project_board] reconcile → blocked (CI fails, %d attempt(s) exhausted): %s", attempts, fid)
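# Hypothetical sketch, not part of this module: the _reconcile_ci budget
# policy as a pure function. Same-tier retries come first (ci_fix_max). Next
# comes one rung up an optional model ladder with a fresh per-tier budget,
# then a block once the top rung's budget is spent. The ladder list and the
# action labels are illustrative assumptions; the real code delegates the
# climb to store.escalate().
def _sketch_ci_policy(attempts: int, ci_fix_max: int, tier: str, ladder: list[str]) -> tuple[str, int, str]:
    """Return (action, new_attempts, new_tier) for one failing-CI reconcile."""
    if attempts < ci_fix_max:
        return "retry-same-tier", attempts + 1, tier
    if tier in ladder and ladder.index(tier) + 1 < len(ladder):
        # Climb one rung and reset the budget so the new tier gets its own tries.
        return "escalate", 0, ladder[ladder.index(tier) + 1]
    return "block", attempts, tier
# With ci_fix_max=1 and the ladder smart -> reasoning -> opus, six consecutive
# red CI runs produce: retry, escalate, retry, escalate, retry, block.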
async def _drive(self, feature: dict):
"""Drive one feature ready→in_review (or →blocked). `done` is set later by
the merge webhook. With per-tier coders configured, a *capability* failure
(coder errored / produced no diff) climbs the ladder; with a single coder
it blocks at once — no redundant tier dance."""
store = self._store()
fid = feature["id"]
repo = feature.get("repo") or "."
base = feature.get("base_branch") or self._store_kw.get("base_branch") or "main"
title = f"feat: {feature['title']}"
tier = store.current_tier(fid) if self.escalation_on else ""
retries = 0 # transient-failure retries at the current tier (reset on a climb)
wt = branch = None
keep_wt = False # reuse the worktree on a goal-fix retry (keep the impl; add tests)
try:
while True:
# Rebuild the prompt each attempt so a re-dispatch (CI bounce,
# goal-verify gap, or tier escalation) picks up the latest
# _ci_feedback + _ci_prior_diff. Fetch this area's distilled lessons
# from the KG (best-effort, async) and inject them — the flywheel READ.
lessons = await self._fetch_kg_lessons(feature)
prompt = self._build_prompt(feature, lessons=lessons)
coder_name = self.coders.get(tier, self.coder_name) if self.escalation_on else self.coder_name
coder = self._resolve_delegate(coder_name, "acp")
if coder is None:
store.flag_blocked(fid, f"coder delegate {coder_name!r} not configured/enabled")
return
try:
# How this attempt gets its worktree + coder result:
# • keep_wt → REUSE the kept worktree (impl intact), one re-dispatch.
# A goal-fix/gate-fix retry must not throw the implementation away —
# the coder only ADDS what the reviewer flagged (usually tests); a
# fresh rebuild makes it re-implement and never reach the tests (the
# bd-2fd/bd-3cj block).
# • coder.solve (ADR 0064 P2, opt-in) → the execution-grounded
# ladder over the feature's acceptance tests (coder_seam.py).
# Same "from-scratch build only" rule as max-mode: a carried-
# forward re-dispatch FIXES the existing diff with one coder.
# Only preempts max-mode when max_mode_n<=1 (_use_coder_solve) —
# a board already running Max-Mode keeps that behavior.
# • max-mode → N parallel candidates, judge, promote the winner (#21).
# ONLY for a from-scratch build: a carried-forward re-dispatch (a CI
# bounce / goal-fix / gate-fix — all signalled by _ci_feedback) FIXES
# the existing diff with one coder, so it must NOT re-fan-out N.
# • otherwise → one fresh worktree, one dispatch.
if keep_wt and wt is not None:
keep_wt = False # consume the reuse
self._inflight[fid] = (repo, wt, branch)
result = await worktree.dispatch_coder(coder, wt, prompt, timeout=self.coder_timeout or None)
elif self._use_coder_solve(feature) and not self._ci_feedback.get(fid):
files_to_modify = feature.get("files_to_modify") or []
fusion = (
self._resolve_delegate(self.coder_solve_fusion_delegate, "openai")
if self.coder_solve_fusion_delegate
else None
)
if fusion is not None:
# Gate BEFORE dispatch: fusion can't tool-call and returns
# whole-file replacements, so an oversized file risks a
# silent truncated rewrite (coder_seam.fusion_viable_for_files).
# Not viable ⇒ this dispatch just skips the fusion rung — the
# ladder still runs greedy/best-of-k/tree-search unchanged.
viable, reason = coder_seam.fusion_viable_for_files(
repo,
files_to_modify,
max_file_chars=self.coder_solve_fusion_max_file_chars,
max_total_chars=self.coder_solve_fusion_max_total_chars,
)
if not viable:
log.info("[project_board] %s fusion rung skipped for this dispatch: %s", fid, reason)
fusion = None
wt, branch, result = await coder_seam.dispatch(
task=prompt,
coder=coder,
repo=repo,
base=base,
root=self.root,
fid=fid,
dispatch_timeout=self.coder_timeout or None,
test_cmd=self.coder_solve_test_cmd,
test_timeout=self.coder_solve_test_timeout,
budget=self.coder_solve_budget,
k=self.coder_solve_k,
tree_depth=self.coder_solve_tree_depth,
record_gens=lambda n: store.record_gens_spent(fid, n),
fusion_delegate=fusion,
fusion_k=self.coder_solve_fusion_k,
files_to_modify=files_to_modify,
fusion_max_file_chars=self.coder_solve_fusion_max_file_chars,
)
self._inflight[fid] = (repo, wt, branch)
elif self.max_mode_n > 1 and not self._ci_feedback.get(fid):
wt, branch, result = await self._dispatch_max_mode(feature, coder, prompt, repo, base, fid)
self._inflight[fid] = (repo, wt, branch)
else:
wt, branch = await worktree.create_worktree(repo, base, fid, self.root)
self._inflight[fid] = (repo, wt, branch) # track for shutdown reaping
result = await worktree.dispatch_coder(
coder, wt, prompt, timeout=self.coder_timeout or None
) # reaps subprocess; CoderTimeout if it overruns
# Goal-verification gate: confirm the diff meets the acceptance
# criteria before opening a PR. A gap is a capability failure (the
# coder didn't deliver) → escalate/block, don't open the PR.
if self.goal_verify:
gap = await self._verify_goal(feature, wt, base, result or "")
if gap:
# A goal-verify gap (e.g. the coder skipped tests) is
# fixable by the SAME coder told what's missing — not a
# model-capability failure. Carry the gap (+ the rejected
# diff, stashed by _verify_goal) as feedback and re-dispatch
# the same tier, bounded by goal_fix_max, BEFORE escalating.
n = self._goal_fix_attempts.get(fid, 0)
if n < self.goal_fix_max:
self._goal_fix_attempts[fid] = n + 1
# KEEP the worktree (the impl is in its files); the coder
# only ADDS what the reviewer flagged. The diff is on disk,
# so don't also carry it as prompt text (redundant/confusing).
self._ci_prior_diff.pop(fid, None)
self._ci_feedback[fid] = (
"Your implementation from the previous attempt is ALREADY in this "
"worktree's files. A reviewer rejected it before it could open a PR "
f"for: {gap}. ADD what's missing to the existing files (usually the "
"tests) — do NOT rewrite or delete the working implementation. Then stop."
)
log.info(
"[project_board] %s goal-verify gap — re-dispatch %d/%d (tier=%s, keep worktree): %s",
fid,
n + 1,
self.goal_fix_max,
tier or "default",
gap,
)
keep_wt = True # reuse the worktree (impl intact) on the retry
continue
raise worktree.WorktreeError(f"goal verification failed: {gap}")
# Auto-fix lint/format before the PR — the coder can't run the repo's
# formatter (edit-only), so this clears trivial nits that would fail CI.
await self._run_fixups(wt)
# Pre-PR local gate: run the repo's real checks in the worktree and, on
# failure, hand the coder the actual output to fix IN-WORKTREE before a PR
# (and a CI round-trip) ever opens. Same-tier, keep-worktree, bounded by
# local_gate_max; on exhaustion open the PR anyway (CI is the backstop).
gate_out = await self._run_local_gate(wt)
if gate_out is not None:
n = self._gate_fix_attempts.get(fid, 0)
if n < self.local_gate_max:
self._gate_fix_attempts[fid] = n + 1
self._ci_prior_diff.pop(fid, None) # impl is on disk; don't echo it back
self._ci_feedback[fid] = (
"Your changes are ALREADY in this worktree's files, but the pre-PR "
"gate failed. FIX what it reports in the existing files, then stop — "
"the loop opens the PR. Do NOT rewrite working code. Gate output:\n\n" + gate_out
)
log.info(
"[project_board] %s pre-PR gate failed — re-dispatch %d/%d (tier=%s, keep worktree)",
fid,
n + 1,
self.local_gate_max,
tier or "default",
)
keep_wt = True
continue
log.warning(
"[project_board] %s pre-PR gate still failing after %d fix(es) — opening PR anyway (CI backstop)",
fid,
n,
)
pr_url = await worktree.open_pr(wt, branch, base=base, title=title, body=(result or "")[:4000])
except (worktree.NoChangesError, worktree.WorktreeError) as exc:
policy = classify(str(exc))
# A capability failure = the coder didn't deliver (no diff / dispatch
# error / timed out). Those are NOT transient-retried (re-running the
# same coder won't help) — they escalate a tier or block. Only true
# infra failures (push/fetch/gh network/rate-limit) get the backoff.
capability = (
isinstance(exc, (worktree.NoChangesError, worktree.CoderTimeout, coder_seam.SolveExhausted))
or str(exc).startswith("coder dispatch failed")
or str(exc).startswith("goal verification failed")
)
# 1. Transient infra → back off and retry the SAME tier (a re-dispatch
# off the latest base also clears a merge conflict).
if policy.retryable and not capability and retries < policy.max_attempts - 1:
retries += 1
log.info(
"[project_board] %s %s — retrying (attempt %d/%d) in %ss: %s",
fid,
policy.category,
retries + 1,
policy.max_attempts,
policy.base_delay_s,
exc,
)
await asyncio.sleep(policy.base_delay_s)
continue
# 2. Capability failure + a ladder → climb a model tier (fresh budget).
if self.escalation_on and capability:
nxt = store.escalate(fid, str(exc)[:200])
if nxt:
log.info("[project_board] %s escalating %s→%s: %s", fid, tier, nxt, exc)
tier = nxt
retries = 0
# Fresh goal-fix budget at the new tier — otherwise a tier that
# exhausted its goal-fix retries hands the next (stronger) tier a
# spent budget, so it blocks on its first gap without a real shot.
self._goal_fix_attempts.pop(fid, None)
self._gate_fix_attempts.pop(fid, None) # fresh local-gate budget too
continue
# 3. Terminal, or retries/ladder exhausted → Blocked.
log.warning("[project_board] %s blocked (%s): %s", fid, policy.category, exc)
store.flag_blocked(fid, f"{policy.category}: {exc}")
if wt:
await worktree.remove_worktree(repo, wt, branch or "")
self._inflight.pop(fid, None)
return
# Built + PR opened. The fleet PR-review pipeline reviews it on open;
# only dispatch an explicit review when configured to (review_dispatch).
log.info("[project_board] %s coder done (%d chars) → %s", fid, len(result or ""), pr_url)
store.open_review(fid, pr_url=pr_url)
self._goal_fix_attempts.pop(fid, None) # gate passed — reset the goal-fix budget
self._gate_fix_attempts.pop(fid, None) # and the local-gate budget
if self.review_dispatch:
await self._request_review(fid, pr_url)
# Keep the worktree (a CI-fail bounce re-dispatches); reaping happens
# on a terminal block above, and the coder subprocess is already reaped.
self._inflight.pop(fid, None) # built OK — not an interrupted build to reap
return
except BoardError as exc:
log.warning("[project_board] %s blocked (board): %s", fid, exc)
store.flag_blocked(fid, str(exc))
self._inflight.pop(fid, None)
except Exception as exc: # noqa: BLE001 — unexpected; block, don't crash the loop
log.exception("[project_board] %s unexpected failure", fid)
store.flag_blocked(fid, f"unexpected: {type(exc).__name__}: {exc}")
if wt:
await worktree.remove_worktree(repo, wt, branch or "")
self._inflight.pop(fid, None)
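# Hypothetical sketch, not part of this module: the three-way triage in
# _drive's `except (NoChangesError, WorktreeError)` branch as a pure function.
# `retryable` and `max_attempts` stand in for the fields of the policy that
# classify() returns. `next_tier` stands in for store.escalate()'s result and
# is None when no ladder is configured or the top tier was already reached.
def _sketch_triage(retryable: bool, max_attempts: int, capability: bool, retries: int, next_tier: str | None) -> str:
    # 1. Transient infra failure with retries left: back off, same tier.
    if retryable and not capability and retries < max_attempts - 1:
        return "retry"
    # 2. The coder didn't deliver and a stronger tier exists: escalate.
    if capability and next_tier:
        return "escalate"
    # 3. Terminal, or retries/ladder exhausted: block.
    return "block"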
async def _request_review(self, fid: str, pr_url: str):
"""Hand the PR to the reviewer (an a2a delegate, e.g. quinn). Best-effort:
a review-dispatch failure doesn't block the feature — CI + the merge
webhook are the gate; the reviewer is advisory signal."""
reviewer = self._resolve_delegate(self.reviewer_name, "a2a")
if reviewer is None:
log.info("[project_board] no reviewer %r configured — skipping review dispatch", self.reviewer_name)
return
from plugins.delegates.adapters import ADAPTERS
try:
msg = f"Please review this PR for correctness and acceptance: {pr_url}"
await ADAPTERS["a2a"].dispatch(reviewer, msg)
except Exception as exc: # noqa: BLE001 — fully best-effort: a review-dispatch
# failure (DelegateError, httpx/connection, anything) must NEVER block a
# feature whose PR already opened. CI + the merge webhook are the gate.
log.warning("[project_board] review dispatch for %s failed: %s", fid, exc)
# ── helpers ───────────────────────────────────────────────────────────────
def _use_coder_solve(self, feature: dict) -> bool:
"""The P2 board-seam dispatch decision (ADR 0064) — see coder_seam.py.
`coder_solve` is this repo's own opt-out valve (default on); the actual
grounding gate (coder plugin importable + acceptance criteria + a runnable
test command) lives in ``coder_seam.should_use_solve``.
Max-Mode takes precedence when both are configured (`max_mode_n>1`): a
board already relying on Max-Mode's judge-fallback (always ships a
best-effort PR) must not have that silently swapped for solve()'s harder
"block if nothing passes" behavior just because the separate `coder`
plugin became importable. Enabling coder.solve on such a board is a
deliberate config change (set `max_mode_n<=1`), never a side effect of
installing `coder` for something else."""
if not self.coder_solve:
return False
if self.max_mode_n > 1:
return False
return coder_seam.should_use_solve(feature, test_cmd=self.coder_solve_test_cmd)
def _resolve_delegate(self, name: str, expect_type: str):
"""Look up a live delegate by name from the delegates registry. Returns the
Delegate or None (not configured / wrong type / plugin disabled). Thin
wrapper — the real lookup is shared with api.py's test-rung route via
``coder_seam.resolve_delegate``."""
return coder_seam.resolve_delegate(name, expect_type)
async def _run_fixups(self, wt: str) -> None:
"""Run the repo's auto-fix command (``format_cmd``, e.g.
``ruff check --fix . && ruff format .``) in the worktree before opening the PR.
The coder is edit-only — it can't run the linter/formatter, so trivial lint/format
nits would otherwise fail CI and burn a bounce/escalation. Best-effort: no command
configured, or any error/timeout, just proceeds (CI is still the real lint gate)."""
if not self.format_cmd:
return
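try:
proc = await asyncio.create_subprocess_shell(
self.format_cmd,
cwd=wt,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
try:
await asyncio.wait_for(proc.communicate(), timeout=180)
except asyncio.TimeoutError:
# wait_for gives up on communicate() but does not stop the child: kill it
# so a hung formatter isn't left running in the worktree.
try:
proc.kill()
except ProcessLookupError:
pass
log.info("[project_board] fixups command timed out after 180s; proceeding (CI still gates)")
except Exception as exc: # noqa: BLE001 — best-effort; CI still gates lint
log.info("[project_board] fixups command failed (proceeding — CI still gates): %s", exc)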
async def _run_local_gate(self, wt: str) -> str | None:
"""Run the pre-PR local gate (``local_gate_cmd``) in the worktree.
Returns ``None`` when the gate passes (exit 0), when no gate is configured,
or when the gate itself couldn't run (timeout / unlaunchable command) — a
broken or flaky gate must never block otherwise-good work, so those degrade
to "pass" (CI is still the real gate). Returns the captured output (tail,
truncated to ``local_gate_output_chars``) on a CLEAN non-zero exit, so the
caller can hand it to the coder to fix."""
cmd = self.local_gate_cmd
if not cmd:
return None
try:
proc = await asyncio.create_subprocess_shell(
cmd,
cwd=wt,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
try:
out, _ = await asyncio.wait_for(proc.communicate(), timeout=self.local_gate_timeout)
except asyncio.TimeoutError:
try:
proc.kill()
except ProcessLookupError:
pass
log.warning("[project_board] pre-PR gate timed out (%ss) — treating as pass", self.local_gate_timeout)
return None
if proc.returncode == 0:
return None
text = (out or b"").decode("utf-8", "replace").strip()
if len(text) > self.local_gate_output_chars:
text = "…(truncated)…\n" + text[-self.local_gate_output_chars :]
return text or f"gate command exited {proc.returncode} with no output"
except Exception as exc: # noqa: BLE001 — a gate that can't run must not block
log.info("[project_board] pre-PR gate failed to run (treating as pass — CI still gates): %s", exc)
return None
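# Hypothetical standalone sketch of the gate pattern used by _run_local_gate,
# runnable outside the class. It runs a shell command, kills it on timeout
# and treats that as a pass, and returns None on exit 0. Otherwise it returns
# the tail of the combined stdout/stderr. Unlike the method above, it awaits
# proc.wait() after kill() to reap the process, and it does not catch launch
# errors. Caveat: kill() signals only the shell, so processes a shell command
# spawns may outlive it unless the command uses `exec` or its own process group.
import asyncio

async def _sketch_run_gate(cmd: str, cwd: str, timeout: float, max_chars: int) -> str | None:
    proc = await asyncio.create_subprocess_shell(
        cmd, cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
    )
    try:
        out, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            pass
        await proc.wait()  # reap the killed process
        return None  # a hung gate degrades to "pass"
    if proc.returncode == 0:
        return None
    text = (out or b"").decode("utf-8", "replace").strip()
    if len(text) > max_chars:
        text = "…(truncated)…\n" + text[-max_chars:]  # keep the tail, where errors usually are
    return text or f"gate command exited {proc.returncode} with no output"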
async def _verify_goal(self, feature: dict, wt: str, base: str, coder_reply: str = "") -> str | None:
"""Pre-PR gate — DETERMINISTIC: no LLM, no diff dump. The one thing it adds over
CI is requiring a test to EXIST for a code change (CI runs tests but can't require
their presence). So it just checks the changed-file LIST for a test file — cheap,
instant, and immune to the truncation that made the old "LLM eyeballs the diff"
version false-reject tests it couldn't see (smart/reasoning/opus each "failed" on
tests they'd actually written — tests sort LAST by path and fell off the cap, ~40
min of cycles wasted). CORRECTNESS is CI's job — it runs the tests the coder wrote;
a wrong diff fails CI and the CI-feedback edge bounces it back.
ESCAPE HATCH: not every code change needs a test (a pure refactor, config/docs-as-
code, a constant tweak). The coder — which saw the actual change — can declare
``NO_TEST_NEEDED: <reason>`` in its reply; we log the reason and pass, rather than
burning retries on a test that doesn't apply. Returns a gap string (→ re-dispatch/
escalate) or None. Fails OPEN on any error (never blocks a good PR on infra)."""
ac = (feature.get("acceptance_criteria") or "").strip()
if not ac: