diff --git a/backend/internal/review/launcher.go b/backend/internal/review/launcher.go index d9639f0bd9..f56071a5ce 100644 --- a/backend/internal/review/launcher.go +++ b/backend/internal/review/launcher.go @@ -21,6 +21,8 @@ type Launcher interface { Notify(ctx context.Context, handleID string, spec LaunchSpec) error // Alive reports whether a reviewer pane is still running. Alive(ctx context.Context, handleID string) (bool, error) + // Stop tears down a reviewer pane before replacing it with a fresh pass. + Stop(ctx context.Context, handleID string) error } // LaunchSpec is the engine's request to (re)launch a reviewer for one pass. @@ -40,6 +42,7 @@ type LaunchSpec struct { // satisfies it. type reviewerRuntime interface { Create(ctx context.Context, cfg ports.RuntimeConfig) (ports.RuntimeHandle, error) + Destroy(ctx context.Context, handle ports.RuntimeHandle) error IsAlive(ctx context.Context, handle ports.RuntimeHandle) (bool, error) SendMessage(ctx context.Context, handle ports.RuntimeHandle, message string) error } @@ -150,3 +153,10 @@ func (l *agentLauncher) Alive(ctx context.Context, handleID string) (bool, error } return l.runtime.IsAlive(ctx, ports.RuntimeHandle{ID: handleID}) } + +func (l *agentLauncher) Stop(ctx context.Context, handleID string) error { + if handleID == "" { + return nil + } + return l.runtime.Destroy(ctx, ports.RuntimeHandle{ID: handleID}) +} diff --git a/backend/internal/review/launcher_test.go b/backend/internal/review/launcher_test.go index 0717841ec0..5ff9b74176 100644 --- a/backend/internal/review/launcher_test.go +++ b/backend/internal/review/launcher_test.go @@ -47,6 +47,7 @@ type fakeRuntime struct { createCfg ports.RuntimeConfig sentMsg string sentTo string + destroyed string alive bool } @@ -57,6 +58,10 @@ func (f *fakeRuntime) Create(_ context.Context, cfg ports.RuntimeConfig) (ports. func (f *fakeRuntime) IsAlive(_ context.Context, _ ports.RuntimeHandle) (bool, error) { return f.alive, nil } +func (f *fakeRuntime) Destroy(_ context.Context, handle ports.RuntimeHandle) error { + f.destroyed = handle.ID + return nil +} func (f *fakeRuntime) SendMessage(_ context.Context, handle ports.RuntimeHandle, msg string) error { f.sentTo = handle.ID f.sentMsg = msg @@ -136,6 +141,20 @@ func TestLauncherAlive(t *testing.T) { } } +func TestLauncherStopDestroysHandle(t *testing.T) { + rt := &fakeRuntime{} + l := NewLauncher(fakeReviewerResolver{ok: true}, rt) + if err := l.Stop(context.Background(), "review-mer-1"); err != nil { + t.Fatalf("Stop: %v", err) + } + if rt.destroyed != "review-mer-1" { + t.Fatalf("destroyed = %q, want review-mer-1", rt.destroyed) + } + if err := l.Stop(context.Background(), ""); err != nil { + t.Fatalf("empty Stop: %v", err) + } +} + func TestLauncherSpawnNoAdapter(t *testing.T) { l := NewLauncher(fakeReviewerResolver{ok: false}, &fakeRuntime{}) if _, err := l.Spawn(context.Background(), launchSpec()); err == nil || !strings.Contains(err.Error(), "no reviewer adapter") { diff --git a/backend/internal/review/review.go b/backend/internal/review/review.go index 46d47b67cf..dac71aa58e 100644 --- a/backend/internal/review/review.go +++ b/backend/internal/review/review.go @@ -207,6 +207,7 @@ func (e *Engine) Trigger(ctx stdctx.Context, workerID domain.SessionID) (Trigger var created []domain.ReviewRun batchID := "" + stoppedStaleReviewer := false for _, reviewState := range reviews { if reviewState.Status != ReviewStateNeedsReview && reviewState.Status != ReviewStateChangesRequested { continue @@ -225,9 +226,13 @@ func (e *Engine) Trigger(ctx stdctx.Context, workerID domain.SessionID) (Trigger } } } - if _, err := e.store.SupersedeStaleRunningReviewRuns(ctx, workerID, reviewState.PRURL, reviewState.TargetSHA, "superseded by a review trigger for a newer commit"); err != nil { + stale, err := e.store.SupersedeStaleRunningReviewRuns(ctx, workerID, reviewState.PRURL, reviewState.TargetSHA, "superseded by a review trigger for a newer commit") + if err != nil { return TriggerResult{}, err } + if stale > 0 { + stoppedStaleReviewer = true + } if batchID == "" { batchID = e.newID() } @@ -277,7 +282,11 @@ func (e *Engine) Trigger(ctx stdctx.Context, workerID domain.SessionID) (Trigger if err != nil { return TriggerResult{}, failRuns(0, err) } - if alive { + if alive && stoppedStaleReviewer { + if err := e.launcher.Stop(ctx, reviewRow.ReviewerHandleID); err != nil { + return TriggerResult{}, failRuns(0, fmt.Errorf("stop stale reviewer: %w", err)) + } + } else if alive { handleID = reviewRow.ReviewerHandleID } } @@ -367,6 +376,18 @@ func (e *Engine) List(ctx stdctx.Context, workerID domain.SessionID) (SessionRev } else if ok { handle = review.ReviewerHandleID } + if handle != "" && e.launcher != nil { + alive, err := e.launcher.Alive(ctx, handle) + if err != nil { + return SessionReviews{}, err + } + if !alive { + runs, err = e.failRunningRuns(ctx, runs, "reviewer stopped before submitting review") + if err != nil { + return SessionReviews{}, err + } + } + } prs, err := e.prs.ListPRsBySession(ctx, workerID) if err != nil { return SessionReviews{}, err @@ -374,6 +395,24 @@ func (e *Engine) List(ctx stdctx.Context, workerID domain.SessionID) (SessionRev return SessionReviews{ReviewerHandleID: handle, Runs: runs, Reviews: Plan(prs, runs)}, nil } +func (e *Engine) failRunningRuns(ctx stdctx.Context, runs []domain.ReviewRun, body string) ([]domain.ReviewRun, error) { + for i := range runs { + if runs[i].Status != domain.ReviewRunRunning { + continue + } + updated, err := e.store.UpdateReviewRunResult(ctx, runs[i].ID, domain.ReviewRunFailed, domain.VerdictNone, body, "") + if err != nil { + return nil, err + } + if updated { + runs[i].Status = domain.ReviewRunFailed + runs[i].Verdict = domain.VerdictNone + runs[i].Body = body + } + } + return runs, nil +} + // reviewerHarness resolves which harness reviews the worker's PR: a configured // reviewer wins, otherwise the worker's own harness is reused (falling back to // claude-code), per domain.ResolveReviewerHarness. diff --git a/backend/internal/review/review_test.go b/backend/internal/review/review_test.go index 455ae690de..d483684c94 100644 --- a/backend/internal/review/review_test.go +++ b/backend/internal/review/review_test.go @@ -144,6 +144,8 @@ type fakeLauncher struct { spawned bool spawnCount int notified bool + stopped bool + stopHandle string gotSpec LaunchSpec gotHandle string specs []LaunchSpec @@ -171,6 +173,12 @@ func (f *fakeLauncher) Notify(_ context.Context, handleID string, spec LaunchSpe func (f *fakeLauncher) Alive(_ context.Context, _ string) (bool, error) { return f.alive || f.spawned, nil } +func (f *fakeLauncher) Stop(_ context.Context, handleID string) error { + f.stopped = true + f.stopHandle = handleID + f.alive = false + return nil +} func liveWorker() domain.SessionRecord { return domain.SessionRecord{ @@ -377,7 +385,7 @@ func TestTriggerNotifiesLiveReviewerOnNewCommit(t *testing.T) { } } -func TestTriggerSupersedesOlderRunningRunOnNewCommit(t *testing.T) { +func TestTriggerStopsStaleReviewerBeforeReplacingOlderRunningRun(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, runs: []domain.ReviewRun{{ID: "run-old", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha0", Status: domain.ReviewRunRunning}}, @@ -395,8 +403,11 @@ func TestTriggerSupersedesOlderRunningRunOnNewCommit(t *testing.T) { if old := store.runs[0]; old.ID != "run-old" || old.Status != domain.ReviewRunFailed { t.Fatalf("expected older running run to be failed, got %+v", old) } - if !launcher.notified || launcher.spawned { - t.Fatalf("expected live reviewer pane reused for new commit: %+v", launcher) + if !launcher.stopped || launcher.stopHandle != "review-mer-1" { + t.Fatalf("expected stale live reviewer pane stopped before replacement: %+v", launcher) + } + if !launcher.spawned || launcher.notified { + t.Fatalf("expected fresh spawn after stopping stale reviewer: %+v", launcher) } } @@ -605,3 +616,27 @@ func TestListReturnsHandleAndRuns(t *testing.T) { t.Fatalf("list = %+v", got) } } + +func TestListMarksRunningRunsFailedWhenReviewerStopped(t *testing.T) { + store := &fakeStore{ + review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, + runs: []domain.ReviewRun{{ + ID: "run-1", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1", + Status: domain.ReviewRunRunning, Verdict: domain.VerdictNone, + }}, + } + eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, &fakeLauncher{alive: false}) + got, err := eng.List(context.Background(), "mer-1") + if err != nil { + t.Fatalf("List: %v", err) + } + if len(got.Runs) != 1 || got.Runs[0].Status != domain.ReviewRunFailed { + t.Fatalf("runs = %+v, want failed running run", got.Runs) + } + if got.Reviews[0].Status == ReviewStateRunning { + t.Fatalf("review state should not remain running after stopped reviewer: %+v", got.Reviews[0]) + } + if !strings.Contains(store.runs[0].Body, "reviewer stopped") { + t.Fatalf("failed body = %q", store.runs[0].Body) + } +} diff --git a/backend/internal/service/review/review.go b/backend/internal/service/review/review.go index 61f09e3b79..9d04abca89 100644 --- a/backend/internal/service/review/review.go +++ b/backend/internal/service/review/review.go @@ -255,7 +255,7 @@ func (s *Service) deliverableRuns(ctx context.Context, workerID domain.SessionID if run.Status != domain.ReviewRunComplete || run.Verdict != domain.VerdictChangesRequested || run.DeliveredAt != nil { continue } - if run.BatchID != "" && currentHeads[run.PRURL] != run.TargetSHA { + if currentHeads[run.PRURL] != run.TargetSHA { continue } deliverable = append(deliverable, run) diff --git a/backend/internal/service/review/review_test.go b/backend/internal/service/review/review_test.go index 259c47f56c..1d25d0cdb8 100644 --- a/backend/internal/service/review/review_test.go +++ b/backend/internal/service/review/review_test.go @@ -116,7 +116,11 @@ func (f *fakeReducer) ApplyReviewBatch(_ context.Context, _ domain.SessionID, ba func TestSubmitPersistsThenAppliesThenStampsDelivered(t *testing.T) { now := time.Unix(100, 0).UTC() - st := &fakeStore{ok: true, run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}} + st := &fakeStore{ + ok: true, + run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}, + prs: []domain.PullRequest{{URL: "pr1", HeadSHA: "sha1"}}, + } reducer := &fakeReducer{outcome: lifecycle.ReviewDeliverySent} svc := New(nil, st, WithLifecycleReducer(reducer), WithClock(func() time.Time { return now })) @@ -135,6 +139,27 @@ func TestSubmitPersistsThenAppliesThenStampsDelivered(t *testing.T) { } } +func TestSubmitSkipsStaleChangesRequestedForSupersededHead(t *testing.T) { + st := &fakeStore{ + ok: true, + run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "old", Status: domain.ReviewRunRunning}, + prs: []domain.PullRequest{{URL: "pr1", HeadSHA: "new"}}, + } + reducer := &fakeReducer{outcome: lifecycle.ReviewDeliverySent} + svc := New(nil, st, WithLifecycleReducer(reducer)) + + run, err := svc.Submit(context.Background(), "mer-1", "run-1", domain.VerdictChangesRequested, "stale fix", "987") + if err != nil { + t.Fatalf("Submit: %v", err) + } + if run.Status != domain.ReviewRunComplete || run.DeliveredAt != nil { + t.Fatalf("stale review should be complete but undelivered: %+v", run) + } + if reducer.calls != 0 || reducer.batchCalls != 0 || st.markCalls != 0 { + t.Fatalf("stale review should not notify or mark delivered: calls=%d batch=%d mark=%d", reducer.calls, reducer.batchCalls, st.markCalls) + } +} + func TestSubmitBatchRunDoesNotWaitForOtherRunningRuns(t *testing.T) { now := time.Unix(100, 0).UTC() st := &fakeStore{ @@ -229,7 +254,11 @@ func TestSubmitBatchApprovedOnlySendsNothing(t *testing.T) { func TestSubmitDeliveryFailureLeavesCompletedUndeliveredForRetry(t *testing.T) { sendErr := errors.New("dead pane") - st := &fakeStore{ok: true, run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}} + st := &fakeStore{ + ok: true, + run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}, + prs: []domain.PullRequest{{URL: "pr1", HeadSHA: "sha1"}}, + } reducer := &fakeReducer{err: sendErr} svc := New(nil, st, WithLifecycleReducer(reducer))