Skip to content

Commit f70339f

Browse files
authored
fix(code-reviews): recover stuck reviews (#3040)
* fix(code-reviews): recover stuck queued reviews * fix(review): avoid false cancel
1 parent 9b56707 commit f70339f

14 files changed

Lines changed: 921 additions & 31 deletions

apps/web/src/lib/code-reviews/client/code-review-worker-client.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'server-only';
22

33
import type { CodeReviewPayload } from '../triggers/prepare-review-payload';
44
import { CODE_REVIEW_WORKER_AUTH_TOKEN } from '@/lib/config.server';
5+
import * as z from 'zod';
56

67
// Fetch timeout in milliseconds
78
const FETCH_TIMEOUT_MS = 10000;
@@ -62,6 +63,23 @@ export type CancelReviewResponse = {
6263
reviewId: string;
6364
};
6465

66+
const ReviewStatusResponseSchema = z.object({
67+
reviewId: z.string(),
68+
status: z.enum(['queued', 'running', 'completed', 'failed', 'cancelled']),
69+
sessionId: z.string().optional(),
70+
cliSessionId: z.string().optional(),
71+
startedAt: z.string().optional(),
72+
completedAt: z.string().optional(),
73+
model: z.string().optional(),
74+
totalTokensIn: z.number().optional(),
75+
totalTokensOut: z.number().optional(),
76+
totalCost: z.number().optional(),
77+
errorMessage: z.string().optional(),
78+
terminalReason: z.string().optional(),
79+
});
80+
81+
export type ReviewStatusResponse = z.infer<typeof ReviewStatusResponseSchema>;
82+
6583
/**
6684
* Code Review Worker API Client
6785
* Handles all communication with the Cloudflare Worker for code reviews
@@ -147,6 +165,23 @@ class CodeReviewWorkerClient {
147165

148166
return response.json() as Promise<CancelReviewResponse>;
149167
}
168+
169+
async getReviewStatus(reviewId: string): Promise<ReviewStatusResponse | null> {
170+
const response = await fetchWithTimeout(`${this.baseUrl}/reviews/${reviewId}/status`, {
171+
headers: this.getHeaders(),
172+
});
173+
174+
if (response.status === 404) {
175+
return null;
176+
}
177+
178+
if (!response.ok) {
179+
const errorText = await response.text();
180+
throw new Error(`Failed to fetch review status: ${response.status} ${errorText}`);
181+
}
182+
183+
return ReviewStatusResponseSchema.parse(await response.json());
184+
}
150185
}
151186

152187
// Export a singleton instance

apps/web/src/lib/code-reviews/db/code-reviews.ts

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
microdollar_usage,
1212
microdollar_usage_metadata,
1313
} from '@kilocode/db/schema';
14-
import { eq, and, desc, count, ne, inArray, sql, sum, gte } from 'drizzle-orm';
14+
import { eq, and, desc, count, ne, inArray, sql, sum, gte, isNull } from 'drizzle-orm';
1515
import { captureException } from '@sentry/nextjs';
1616
import type { CreateReviewParams, CodeReviewStatus, ListReviewsParams, Owner } from '../core';
1717
import type { CloudAgentCodeReview } from '@kilocode/db/schema';
@@ -163,6 +163,101 @@ export async function updateCodeReviewStatus(
163163
}
164164
}
165165

166+
export async function updateCodeReviewStatusIfNonTerminal(
167+
reviewId: string,
168+
status: CodeReviewStatus,
169+
updates: {
170+
sessionId?: string;
171+
cliSessionId?: string;
172+
errorMessage?: string;
173+
terminalReason?: CodeReviewTerminalReason;
174+
startedAt?: Date;
175+
completedAt?: Date;
176+
agentVersion?: string;
177+
model?: string;
178+
totalTokensIn?: number;
179+
totalTokensOut?: number;
180+
totalCostMusd?: number;
181+
} = {}
182+
): Promise<boolean> {
183+
try {
184+
const updateData: Partial<typeof cloud_agent_code_reviews.$inferInsert> = {
185+
status,
186+
updated_at: new Date().toISOString(),
187+
};
188+
189+
if (updates.sessionId !== undefined) updateData.session_id = updates.sessionId;
190+
if (updates.cliSessionId !== undefined) updateData.cli_session_id = updates.cliSessionId;
191+
if (updates.errorMessage !== undefined) updateData.error_message = updates.errorMessage;
192+
if (updates.terminalReason !== undefined) updateData.terminal_reason = updates.terminalReason;
193+
if (updates.startedAt !== undefined) updateData.started_at = updates.startedAt.toISOString();
194+
if (updates.completedAt !== undefined) {
195+
updateData.completed_at = updates.completedAt.toISOString();
196+
}
197+
if (updates.agentVersion !== undefined) updateData.agent_version = updates.agentVersion;
198+
if (updates.model !== undefined) updateData.model = updates.model;
199+
if (updates.totalTokensIn !== undefined) updateData.total_tokens_in = updates.totalTokensIn;
200+
if (updates.totalTokensOut !== undefined) updateData.total_tokens_out = updates.totalTokensOut;
201+
if (updates.totalCostMusd !== undefined) updateData.total_cost_musd = updates.totalCostMusd;
202+
203+
if (status === 'running' && !updates.startedAt) {
204+
updateData.started_at = new Date().toISOString();
205+
}
206+
if (
207+
(status === 'completed' || status === 'failed' || status === 'cancelled') &&
208+
!updates.completedAt
209+
) {
210+
updateData.completed_at = new Date().toISOString();
211+
}
212+
213+
const updated = await db
214+
.update(cloud_agent_code_reviews)
215+
.set(updateData)
216+
.where(
217+
and(
218+
eq(cloud_agent_code_reviews.id, reviewId),
219+
inArray(cloud_agent_code_reviews.status, ['pending', 'queued', 'running'])
220+
)
221+
)
222+
.returning({ id: cloud_agent_code_reviews.id });
223+
224+
return updated.length > 0;
225+
} catch (error) {
226+
captureException(error, {
227+
tags: { operation: 'updateCodeReviewStatusIfNonTerminal' },
228+
extra: { reviewId, status, updates },
229+
});
230+
throw error;
231+
}
232+
}
233+
234+
export async function releaseQueuedReviewClaim(reviewId: string): Promise<boolean> {
235+
try {
236+
const released = await db
237+
.update(cloud_agent_code_reviews)
238+
.set({
239+
status: 'pending',
240+
updated_at: new Date().toISOString(),
241+
})
242+
.where(
243+
and(
244+
eq(cloud_agent_code_reviews.id, reviewId),
245+
eq(cloud_agent_code_reviews.status, 'queued'),
246+
isNull(cloud_agent_code_reviews.session_id)
247+
)
248+
)
249+
.returning({ id: cloud_agent_code_reviews.id });
250+
251+
return released.length > 0;
252+
} catch (error) {
253+
captureException(error, {
254+
tags: { operation: 'releaseQueuedReviewClaim' },
255+
extra: { reviewId },
256+
});
257+
throw error;
258+
}
259+
}
260+
166261
/**
167262
* Updates only usage-related columns on a code review, without touching status or timestamps.
168263
*/

apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.test.ts

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
const mockDispatchReview = jest.fn();
2+
const mockGetReviewStatus = jest.fn();
23
const mockGetAgentConfigForOwner = jest.fn();
34
const mockPrepareReviewPayload = jest.fn();
45

56
jest.mock('@/lib/code-reviews/client/code-review-worker-client', () => ({
67
codeReviewWorkerClient: {
78
dispatchReview: (...args: unknown[]) => mockDispatchReview(...args),
9+
getReviewStatus: (...args: unknown[]) => mockGetReviewStatus(...args),
810
},
911
}));
1012

@@ -58,6 +60,7 @@ describe('tryDispatchPendingReviews', () => {
5860

5961
beforeEach(() => {
6062
mockDispatchReview.mockResolvedValue(undefined);
63+
mockGetReviewStatus.mockResolvedValue(null);
6164
mockGetAgentConfigForOwner.mockResolvedValue({ id: 'test-agent-config', config: {} });
6265
mockPrepareReviewPayload.mockImplementation((params: { reviewId: string }) => ({
6366
reviewId: params.reviewId,
@@ -69,6 +72,7 @@ describe('tryDispatchPendingReviews', () => {
6972
.delete(cloud_agent_code_reviews)
7073
.where(eq(cloud_agent_code_reviews.repo_full_name, REPO));
7174
mockDispatchReview.mockReset();
75+
mockGetReviewStatus.mockReset();
7276
mockGetAgentConfigForOwner.mockReset();
7377
mockPrepareReviewPayload.mockReset();
7478
});
@@ -462,4 +466,130 @@ describe('tryDispatchPendingReviews', () => {
462466
expect.objectContaining({ reviewId: staleQueuedReview.id })
463467
);
464468
});
469+
470+
it('keeps a dispatch timeout claimed when the Worker status probe finds queued DO state', async () => {
471+
const recentTimestamp = minutesAgo(1);
472+
const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner;
473+
await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS);
474+
mockDispatchReview.mockRejectedValue(new Error('Request timeout after 10000ms'));
475+
mockGetReviewStatus.mockResolvedValue({ reviewId: 'unused', status: 'queued' });
476+
477+
const [review] = await db
478+
.insert(cloud_agent_code_reviews)
479+
.values(
480+
reviewValues({
481+
owner,
482+
status: 'pending',
483+
createdAt: recentTimestamp,
484+
updatedAt: recentTimestamp,
485+
})
486+
)
487+
.returning({ id: cloud_agent_code_reviews.id });
488+
489+
if (!review) {
490+
throw new Error('Expected review to be inserted');
491+
}
492+
493+
const result = await tryDispatchPendingReviews({
494+
type: 'user',
495+
id: testUser.id,
496+
userId: testUser.id,
497+
});
498+
499+
const storedReview = await db.query.cloud_agent_code_reviews.findFirst({
500+
where: eq(cloud_agent_code_reviews.id, review.id),
501+
});
502+
503+
expect(result).toEqual({
504+
dispatched: 1,
505+
pending: 0,
506+
activeCount: 1,
507+
});
508+
expect(mockGetReviewStatus).toHaveBeenCalledWith(review.id);
509+
expect(storedReview?.status).toBe('queued');
510+
});
511+
512+
it('releases a dispatch timeout claim when the Worker status probe finds no DO state', async () => {
513+
const recentTimestamp = minutesAgo(1);
514+
const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner;
515+
await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS);
516+
mockDispatchReview.mockRejectedValue(new Error('Request timeout after 10000ms'));
517+
mockGetReviewStatus.mockResolvedValue(null);
518+
519+
const [review] = await db
520+
.insert(cloud_agent_code_reviews)
521+
.values(
522+
reviewValues({
523+
owner,
524+
status: 'pending',
525+
createdAt: recentTimestamp,
526+
updatedAt: recentTimestamp,
527+
})
528+
)
529+
.returning({ id: cloud_agent_code_reviews.id });
530+
531+
if (!review) {
532+
throw new Error('Expected review to be inserted');
533+
}
534+
535+
const result = await tryDispatchPendingReviews({
536+
type: 'user',
537+
id: testUser.id,
538+
userId: testUser.id,
539+
});
540+
541+
const storedReview = await db.query.cloud_agent_code_reviews.findFirst({
542+
where: eq(cloud_agent_code_reviews.id, review.id),
543+
});
544+
545+
expect(result).toEqual({
546+
dispatched: 0,
547+
pending: 1,
548+
activeCount: 0,
549+
});
550+
expect(mockGetReviewStatus).toHaveBeenCalledWith(review.id);
551+
expect(storedReview?.status).toBe('pending');
552+
});
553+
554+
it('keeps a dispatch timeout claim when the Worker status probe also fails', async () => {
555+
const recentTimestamp = minutesAgo(1);
556+
const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner;
557+
await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS);
558+
mockDispatchReview.mockRejectedValue(new Error('Request timeout after 10000ms'));
559+
mockGetReviewStatus.mockRejectedValue(new Error('status probe timeout'));
560+
561+
const [review] = await db
562+
.insert(cloud_agent_code_reviews)
563+
.values(
564+
reviewValues({
565+
owner,
566+
status: 'pending',
567+
createdAt: recentTimestamp,
568+
updatedAt: recentTimestamp,
569+
})
570+
)
571+
.returning({ id: cloud_agent_code_reviews.id });
572+
573+
if (!review) {
574+
throw new Error('Expected review to be inserted');
575+
}
576+
577+
const result = await tryDispatchPendingReviews({
578+
type: 'user',
579+
id: testUser.id,
580+
userId: testUser.id,
581+
});
582+
583+
const storedReview = await db.query.cloud_agent_code_reviews.findFirst({
584+
where: eq(cloud_agent_code_reviews.id, review.id),
585+
});
586+
587+
expect(result).toEqual({
588+
dispatched: 0,
589+
pending: 1,
590+
activeCount: 0,
591+
});
592+
expect(mockGetReviewStatus).toHaveBeenCalledWith(review.id);
593+
expect(storedReview?.status).toBe('queued');
594+
});
465595
});

0 commit comments

Comments
 (0)