Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions backend/internal/review/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Launcher interface {
Notify(ctx context.Context, handleID string, spec LaunchSpec) error
// Alive reports whether a reviewer pane is still running.
Alive(ctx context.Context, handleID string) (bool, error)
// Stop tears down a reviewer pane before replacing it with a fresh pass.
Stop(ctx context.Context, handleID string) error
}

// LaunchSpec is the engine's request to (re)launch a reviewer for one pass.
Expand All @@ -40,6 +42,7 @@ type LaunchSpec struct {
// satisfies it.
type reviewerRuntime interface {
Create(ctx context.Context, cfg ports.RuntimeConfig) (ports.RuntimeHandle, error)
Destroy(ctx context.Context, handle ports.RuntimeHandle) error
IsAlive(ctx context.Context, handle ports.RuntimeHandle) (bool, error)
SendMessage(ctx context.Context, handle ports.RuntimeHandle, message string) error
}
Expand Down Expand Up @@ -150,3 +153,10 @@ func (l *agentLauncher) Alive(ctx context.Context, handleID string) (bool, error
}
return l.runtime.IsAlive(ctx, ports.RuntimeHandle{ID: handleID})
}

func (l *agentLauncher) Stop(ctx context.Context, handleID string) error {
if handleID == "" {
return nil
}
return l.runtime.Destroy(ctx, ports.RuntimeHandle{ID: handleID})
}
19 changes: 19 additions & 0 deletions backend/internal/review/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type fakeRuntime struct {
createCfg ports.RuntimeConfig
sentMsg string
sentTo string
destroyed string
alive bool
}

Expand All @@ -57,6 +58,10 @@ func (f *fakeRuntime) Create(_ context.Context, cfg ports.RuntimeConfig) (ports.
func (f *fakeRuntime) IsAlive(_ context.Context, _ ports.RuntimeHandle) (bool, error) {
return f.alive, nil
}
func (f *fakeRuntime) Destroy(_ context.Context, handle ports.RuntimeHandle) error {
f.destroyed = handle.ID
return nil
}
func (f *fakeRuntime) SendMessage(_ context.Context, handle ports.RuntimeHandle, msg string) error {
f.sentTo = handle.ID
f.sentMsg = msg
Expand Down Expand Up @@ -136,6 +141,20 @@ func TestLauncherAlive(t *testing.T) {
}
}

func TestLauncherStopDestroysHandle(t *testing.T) {
rt := &fakeRuntime{}
l := NewLauncher(fakeReviewerResolver{ok: true}, rt)
if err := l.Stop(context.Background(), "review-mer-1"); err != nil {
t.Fatalf("Stop: %v", err)
}
if rt.destroyed != "review-mer-1" {
t.Fatalf("destroyed = %q, want review-mer-1", rt.destroyed)
}
if err := l.Stop(context.Background(), ""); err != nil {
t.Fatalf("empty Stop: %v", err)
}
}

func TestLauncherSpawnNoAdapter(t *testing.T) {
l := NewLauncher(fakeReviewerResolver{ok: false}, &fakeRuntime{})
if _, err := l.Spawn(context.Background(), launchSpec()); err == nil || !strings.Contains(err.Error(), "no reviewer adapter") {
Expand Down
43 changes: 41 additions & 2 deletions backend/internal/review/review.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (e *Engine) Trigger(ctx stdctx.Context, workerID domain.SessionID) (Trigger

var created []domain.ReviewRun
batchID := ""
stoppedStaleReviewer := false
for _, reviewState := range reviews {
if reviewState.Status != ReviewStateNeedsReview && reviewState.Status != ReviewStateChangesRequested {
continue
Expand All @@ -225,9 +226,13 @@ func (e *Engine) Trigger(ctx stdctx.Context, workerID domain.SessionID) (Trigger
}
}
}
if _, err := e.store.SupersedeStaleRunningReviewRuns(ctx, workerID, reviewState.PRURL, reviewState.TargetSHA, "superseded by a review trigger for a newer commit"); err != nil {
stale, err := e.store.SupersedeStaleRunningReviewRuns(ctx, workerID, reviewState.PRURL, reviewState.TargetSHA, "superseded by a review trigger for a newer commit")
if err != nil {
return TriggerResult{}, err
}
if stale > 0 {
stoppedStaleReviewer = true
}
if batchID == "" {
batchID = e.newID()
}
Expand Down Expand Up @@ -277,7 +282,11 @@ func (e *Engine) Trigger(ctx stdctx.Context, workerID domain.SessionID) (Trigger
if err != nil {
return TriggerResult{}, failRuns(0, err)
}
if alive {
if alive && stoppedStaleReviewer {
if err := e.launcher.Stop(ctx, reviewRow.ReviewerHandleID); err != nil {
return TriggerResult{}, failRuns(0, fmt.Errorf("stop stale reviewer: %w", err))
}
} else if alive {
handleID = reviewRow.ReviewerHandleID
}
}
Expand Down Expand Up @@ -367,13 +376,43 @@ func (e *Engine) List(ctx stdctx.Context, workerID domain.SessionID) (SessionRev
} else if ok {
handle = review.ReviewerHandleID
}
if handle != "" && e.launcher != nil {
alive, err := e.launcher.Alive(ctx, handle)
if err != nil {
return SessionReviews{}, err
}
if !alive {
runs, err = e.failRunningRuns(ctx, runs, "reviewer stopped before submitting review")
if err != nil {
return SessionReviews{}, err
}
}
}
prs, err := e.prs.ListPRsBySession(ctx, workerID)
if err != nil {
return SessionReviews{}, err
}
return SessionReviews{ReviewerHandleID: handle, Runs: runs, Reviews: Plan(prs, runs)}, nil
}

func (e *Engine) failRunningRuns(ctx stdctx.Context, runs []domain.ReviewRun, body string) ([]domain.ReviewRun, error) {
for i := range runs {
if runs[i].Status != domain.ReviewRunRunning {
continue
}
updated, err := e.store.UpdateReviewRunResult(ctx, runs[i].ID, domain.ReviewRunFailed, domain.VerdictNone, body, "")
if err != nil {
return nil, err
}
if updated {
runs[i].Status = domain.ReviewRunFailed
runs[i].Verdict = domain.VerdictNone
runs[i].Body = body
}
}
return runs, nil
}

// reviewerHarness resolves which harness reviews the worker's PR: a configured
// reviewer wins, otherwise the worker's own harness is reused (falling back to
// claude-code), per domain.ResolveReviewerHarness.
Expand Down
41 changes: 38 additions & 3 deletions backend/internal/review/review_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ type fakeLauncher struct {
spawned bool
spawnCount int
notified bool
stopped bool
stopHandle string
gotSpec LaunchSpec
gotHandle string
specs []LaunchSpec
Expand Down Expand Up @@ -171,6 +173,12 @@ func (f *fakeLauncher) Notify(_ context.Context, handleID string, spec LaunchSpe
func (f *fakeLauncher) Alive(_ context.Context, _ string) (bool, error) {
return f.alive || f.spawned, nil
}
func (f *fakeLauncher) Stop(_ context.Context, handleID string) error {
f.stopped = true
f.stopHandle = handleID
f.alive = false
return nil
}

func liveWorker() domain.SessionRecord {
return domain.SessionRecord{
Expand Down Expand Up @@ -377,7 +385,7 @@ func TestTriggerNotifiesLiveReviewerOnNewCommit(t *testing.T) {
}
}

func TestTriggerSupersedesOlderRunningRunOnNewCommit(t *testing.T) {
func TestTriggerStopsStaleReviewerBeforeReplacingOlderRunningRun(t *testing.T) {
store := &fakeStore{
review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"},
runs: []domain.ReviewRun{{ID: "run-old", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha0", Status: domain.ReviewRunRunning}},
Expand All @@ -395,8 +403,11 @@ func TestTriggerSupersedesOlderRunningRunOnNewCommit(t *testing.T) {
if old := store.runs[0]; old.ID != "run-old" || old.Status != domain.ReviewRunFailed {
t.Fatalf("expected older running run to be failed, got %+v", old)
}
if !launcher.notified || launcher.spawned {
t.Fatalf("expected live reviewer pane reused for new commit: %+v", launcher)
if !launcher.stopped || launcher.stopHandle != "review-mer-1" {
t.Fatalf("expected stale live reviewer pane stopped before replacement: %+v", launcher)
}
if !launcher.spawned || launcher.notified {
t.Fatalf("expected fresh spawn after stopping stale reviewer: %+v", launcher)
}
}

Expand Down Expand Up @@ -605,3 +616,27 @@ func TestListReturnsHandleAndRuns(t *testing.T) {
t.Fatalf("list = %+v", got)
}
}

func TestListMarksRunningRunsFailedWhenReviewerStopped(t *testing.T) {
store := &fakeStore{
review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"},
runs: []domain.ReviewRun{{
ID: "run-1", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1",
Status: domain.ReviewRunRunning, Verdict: domain.VerdictNone,
}},
}
eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, &fakeLauncher{alive: false})
got, err := eng.List(context.Background(), "mer-1")
if err != nil {
t.Fatalf("List: %v", err)
}
if len(got.Runs) != 1 || got.Runs[0].Status != domain.ReviewRunFailed {
t.Fatalf("runs = %+v, want failed running run", got.Runs)
}
if got.Reviews[0].Status == ReviewStateRunning {
t.Fatalf("review state should not remain running after stopped reviewer: %+v", got.Reviews[0])
}
if !strings.Contains(store.runs[0].Body, "reviewer stopped") {
t.Fatalf("failed body = %q", store.runs[0].Body)
}
}
2 changes: 1 addition & 1 deletion backend/internal/service/review/review.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (s *Service) deliverableRuns(ctx context.Context, workerID domain.SessionID
if run.Status != domain.ReviewRunComplete || run.Verdict != domain.VerdictChangesRequested || run.DeliveredAt != nil {
continue
}
if run.BatchID != "" && currentHeads[run.PRURL] != run.TargetSHA {
if currentHeads[run.PRURL] != run.TargetSHA {
continue
}
deliverable = append(deliverable, run)
Expand Down
33 changes: 31 additions & 2 deletions backend/internal/service/review/review_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ func (f *fakeReducer) ApplyReviewBatch(_ context.Context, _ domain.SessionID, ba

func TestSubmitPersistsThenAppliesThenStampsDelivered(t *testing.T) {
now := time.Unix(100, 0).UTC()
st := &fakeStore{ok: true, run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}}
st := &fakeStore{
ok: true,
run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning},
prs: []domain.PullRequest{{URL: "pr1", HeadSHA: "sha1"}},
}
reducer := &fakeReducer{outcome: lifecycle.ReviewDeliverySent}
svc := New(nil, st, WithLifecycleReducer(reducer), WithClock(func() time.Time { return now }))

Expand All @@ -135,6 +139,27 @@ func TestSubmitPersistsThenAppliesThenStampsDelivered(t *testing.T) {
}
}

func TestSubmitSkipsStaleChangesRequestedForSupersededHead(t *testing.T) {
st := &fakeStore{
ok: true,
run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "old", Status: domain.ReviewRunRunning},
prs: []domain.PullRequest{{URL: "pr1", HeadSHA: "new"}},
}
reducer := &fakeReducer{outcome: lifecycle.ReviewDeliverySent}
svc := New(nil, st, WithLifecycleReducer(reducer))

run, err := svc.Submit(context.Background(), "mer-1", "run-1", domain.VerdictChangesRequested, "stale fix", "987")
if err != nil {
t.Fatalf("Submit: %v", err)
}
if run.Status != domain.ReviewRunComplete || run.DeliveredAt != nil {
t.Fatalf("stale review should be complete but undelivered: %+v", run)
}
if reducer.calls != 0 || reducer.batchCalls != 0 || st.markCalls != 0 {
t.Fatalf("stale review should not notify or mark delivered: calls=%d batch=%d mark=%d", reducer.calls, reducer.batchCalls, st.markCalls)
}
}

func TestSubmitBatchRunDoesNotWaitForOtherRunningRuns(t *testing.T) {
now := time.Unix(100, 0).UTC()
st := &fakeStore{
Expand Down Expand Up @@ -229,7 +254,11 @@ func TestSubmitBatchApprovedOnlySendsNothing(t *testing.T) {

func TestSubmitDeliveryFailureLeavesCompletedUndeliveredForRetry(t *testing.T) {
sendErr := errors.New("dead pane")
st := &fakeStore{ok: true, run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}}
st := &fakeStore{
ok: true,
run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning},
prs: []domain.PullRequest{{URL: "pr1", HeadSHA: "sha1"}},
}
reducer := &fakeReducer{err: sendErr}
svc := New(nil, st, WithLifecycleReducer(reducer))

Expand Down
Loading