Skip to content

Commit 7c2d1b4

Browse files
committed
fix(security-agent): settle callbacks and stale queue rows
1 parent 28eec6e commit 7c2d1b4

15 files changed

Lines changed: 1470 additions & 71 deletions

File tree

apps/web/src/app/api/internal/security-analysis-callback/[findingId]/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import {
3030
DEFAULT_SECURITY_AGENT_TRIAGE_MODEL,
3131
} from '@/lib/security-agent/core/constants';
3232

33+
// Compatibility-only callback ingress retained for explicit rollback routing.
34+
// Durable default ingress lives in the security-auto-analysis Worker.
3335
const log = sentryLogger('security-agent:callback', 'info');
3436
const warn = sentryLogger('security-agent:callback', 'warning');
3537
const logError = sentryLogger('security-agent:callback', 'error');

services/security-auto-analysis/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pnpm --filter cloudflare-security-auto-analysis exec wrangler queues list
6565
- `pending` is stale after 15 minutes
6666
- `running` is stale after 2 hours
6767

68-
> **Note:** The dispatcher reconciles stale rows before enqueueing due owners: stale `pending` rows return to `queued`, and stale `running` rows become terminal `failed` rows with `RUN_LOST`. Diagnostic queries below remain useful for verification and incident review.
68+
> **Note:** The dispatcher reconciles stale rows before enqueueing due owners: queue rows are first healed to match findings whose state has already advanced, remaining stale `pending` rows return to `queued`, and stale `running` rows become terminal `failed` rows with `RUN_LOST` only while the finding still reports `running`. Diagnostic queries below remain useful for verification and incident review.
6969
7070
### Failure codes
7171

@@ -205,8 +205,8 @@ Do not clear the block until credits are restored. After top-up, clear the block
205205

206206
**Callback routing:**
207207

208-
- `SECURITY_ANALYSIS_CALLBACK_ROUTING_MODE=worker` targets `${SECURITY_ANALYSIS_CALLBACK_WORKER_BASE_URL}/internal/security-analysis-callback/:findingId`; base URL must be reachable from `cloud-agent-next`.
209-
- `SECURITY_ANALYSIS_CALLBACK_ROUTING_MODE=web` targets `${SECURITY_ANALYSIS_CALLBACK_WEB_BASE_URL}/api/internal/security-analysis-callback/:findingId`; this is default callback path and keeps `cloud-agent-next` domain-blind.
208+
- `SECURITY_ANALYSIS_CALLBACK_ROUTING_MODE=worker` is the default and targets `${SECURITY_ANALYSIS_CALLBACK_WORKER_BASE_URL}/internal/security-analysis-callback/:findingId`; the base URL must be reachable from `cloud-agent-next`. Worker ingress validates, enqueues callback finalization, then returns `202`.
209+
- `SECURITY_ANALYSIS_CALLBACK_ROUTING_MODE=web` targets `${SECURITY_ANALYSIS_CALLBACK_WEB_BASE_URL}/api/internal/security-analysis-callback/:findingId`; this is compatibility-only rollback routing while legacy callback traffic drains, not the durable default.
210210

211211
**Owner-scoped stop** (surgical):
212212

services/security-auto-analysis/src/analysis-start-lifecycle.integration.test.ts

Lines changed: 206 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ import { randomUUID } from 'crypto';
33
import { createDrizzleClient } from '@kilocode/db/client';
44
import { kilocode_users, security_analysis_queue, security_findings } from '@kilocode/db/schema';
55
import { eq, inArray } from 'drizzle-orm';
6-
import { transitionAnalysisStartLifecycle } from './analysis-start-lifecycle.js';
6+
import {
7+
transitionAnalysisCallbackLifecycle,
8+
transitionAnalysisStartLifecycle,
9+
} from './analysis-start-lifecycle.js';
710
import type { SecurityFindingAnalysis } from './types.js';
811

912
const connectionString =
@@ -82,6 +85,208 @@ describe('analysis start lifecycle durable transitions', () => {
8285
expect(queueRows).toEqual([{ status: 'completed' }]);
8386
});
8487

88+
it('terminalizes completed callbacks with queue and finding state settled together', async () => {
89+
const findingId = await insertFinding('callback-completed', 'running');
90+
await insertQueueClaim({
91+
findingId,
92+
claimToken: 'callback-completed-claim',
93+
jobId: 'callback-completed-job',
94+
queueStatus: 'running',
95+
});
96+
const analysis = createAnalysis('callback-completed');
97+
98+
await expect(
99+
transitionAnalysisCallbackLifecycle(client.db as never, {
100+
findingId,
101+
outcome: {
102+
type: 'completed',
103+
analysis,
104+
},
105+
})
106+
).resolves.toEqual({ status: 'completed' });
107+
108+
const findingRows = await client.db
109+
.select({
110+
analysisStatus: security_findings.analysis_status,
111+
analysis: security_findings.analysis,
112+
})
113+
.from(security_findings)
114+
.where(eq(security_findings.id, findingId));
115+
expect(findingRows).toEqual([
116+
expect.objectContaining({
117+
analysisStatus: 'completed',
118+
analysis: expect.objectContaining({ correlationId: analysis.correlationId }),
119+
}),
120+
]);
121+
122+
const queueRows = await client.db
123+
.select({
124+
status: security_analysis_queue.queue_status,
125+
failureCode: security_analysis_queue.failure_code,
126+
})
127+
.from(security_analysis_queue)
128+
.where(eq(security_analysis_queue.finding_id, findingId));
129+
expect(queueRows).toEqual([{ status: 'completed', failureCode: null }]);
130+
});
131+
132+
it('terminalizes failed callbacks with queue and finding failure state settled together', async () => {
133+
const findingId = await insertFinding('callback-failed', 'running');
134+
await insertQueueClaim({
135+
findingId,
136+
claimToken: 'callback-failed-claim',
137+
jobId: 'callback-failed-job',
138+
queueStatus: 'running',
139+
});
140+
141+
await expect(
142+
transitionAnalysisCallbackLifecycle(client.db as never, {
143+
findingId,
144+
outcome: {
145+
type: 'failed',
146+
errorMessage: 'upstream 503',
147+
failureCode: 'UPSTREAM_5XX',
148+
},
149+
})
150+
).resolves.toEqual({ status: 'failed' });
151+
152+
const findingRows = await client.db
153+
.select({
154+
analysisStatus: security_findings.analysis_status,
155+
analysisError: security_findings.analysis_error,
156+
})
157+
.from(security_findings)
158+
.where(eq(security_findings.id, findingId));
159+
expect(findingRows).toEqual([{ analysisStatus: 'failed', analysisError: 'upstream 503' }]);
160+
161+
const queueRows = await client.db
162+
.select({
163+
status: security_analysis_queue.queue_status,
164+
failureCode: security_analysis_queue.failure_code,
165+
lastError: security_analysis_queue.last_error_redacted,
166+
})
167+
.from(security_analysis_queue)
168+
.where(eq(security_analysis_queue.finding_id, findingId));
169+
expect(queueRows).toEqual([
170+
{ status: 'failed', failureCode: 'UPSTREAM_5XX', lastError: 'upstream 503' },
171+
]);
172+
});
173+
174+
it('clears superseded callback capacity while settling its queue row', async () => {
175+
const findingId = await insertFinding('callback-superseded', 'running');
176+
await client.db
177+
.update(security_findings)
178+
.set({ ignored_reason: 'superseded:canonical-finding' })
179+
.where(eq(security_findings.id, findingId));
180+
await insertQueueClaim({
181+
findingId,
182+
claimToken: 'callback-superseded-claim',
183+
jobId: 'callback-superseded-job',
184+
queueStatus: 'running',
185+
});
186+
187+
await expect(
188+
transitionAnalysisCallbackLifecycle(client.db as never, {
189+
findingId,
190+
outcome: { type: 'superseded' },
191+
})
192+
).resolves.toEqual({ status: 'superseded' });
193+
194+
const findingRows = await client.db
195+
.select({ analysisStatus: security_findings.analysis_status })
196+
.from(security_findings)
197+
.where(eq(security_findings.id, findingId));
198+
expect(findingRows).toEqual([{ analysisStatus: null }]);
199+
200+
const queueRows = await client.db
201+
.select({
202+
status: security_analysis_queue.queue_status,
203+
failureCode: security_analysis_queue.failure_code,
204+
})
205+
.from(security_analysis_queue)
206+
.where(eq(security_analysis_queue.finding_id, findingId));
207+
expect(queueRows).toEqual([{ status: 'completed', failureCode: 'SKIPPED_NO_LONGER_ELIGIBLE' }]);
208+
});
209+
210+
it('settles completion races that find the callback superseded at terminal write time', async () => {
211+
const findingId = await insertFinding('callback-superseded-completion-race', 'running');
212+
await client.db
213+
.update(security_findings)
214+
.set({ ignored_reason: 'superseded:replacement-finding' })
215+
.where(eq(security_findings.id, findingId));
216+
await insertQueueClaim({
217+
findingId,
218+
claimToken: 'callback-superseded-completion-race-claim',
219+
jobId: 'callback-superseded-completion-race-job',
220+
queueStatus: 'running',
221+
});
222+
223+
await expect(
224+
transitionAnalysisCallbackLifecycle(client.db as never, {
225+
findingId,
226+
outcome: {
227+
type: 'completed',
228+
analysis: createAnalysis('callback-superseded-completion-race'),
229+
},
230+
})
231+
).resolves.toEqual({ status: 'superseded' });
232+
233+
const findingRows = await client.db
234+
.select({ analysisStatus: security_findings.analysis_status })
235+
.from(security_findings)
236+
.where(eq(security_findings.id, findingId));
237+
expect(findingRows).toEqual([{ analysisStatus: null }]);
238+
239+
const queueRows = await client.db
240+
.select({
241+
status: security_analysis_queue.queue_status,
242+
failureCode: security_analysis_queue.failure_code,
243+
})
244+
.from(security_analysis_queue)
245+
.where(eq(security_analysis_queue.finding_id, findingId));
246+
expect(queueRows).toEqual([{ status: 'completed', failureCode: 'SKIPPED_NO_LONGER_ELIGIBLE' }]);
247+
});
248+
249+
it('heals stale running queue state on retried already-terminal completed callbacks', async () => {
250+
const findingId = await insertFinding('callback-partial-completion', 'running');
251+
await client.db
252+
.update(security_findings)
253+
.set({ analysis_status: 'completed' })
254+
.where(eq(security_findings.id, findingId));
255+
await insertQueueClaim({
256+
findingId,
257+
claimToken: 'callback-partial-completion-claim',
258+
jobId: 'callback-partial-completion-job',
259+
queueStatus: 'running',
260+
});
261+
262+
await expect(
263+
transitionAnalysisCallbackLifecycle(client.db as never, {
264+
findingId,
265+
outcome: {
266+
type: 'already-terminal',
267+
findingStatus: 'completed',
268+
failureCode: null,
269+
errorMessage: null,
270+
},
271+
})
272+
).resolves.toEqual({ status: 'already-terminal' });
273+
274+
const findingRows = await client.db
275+
.select({ analysisStatus: security_findings.analysis_status })
276+
.from(security_findings)
277+
.where(eq(security_findings.id, findingId));
278+
expect(findingRows).toEqual([{ analysisStatus: 'completed' }]);
279+
280+
const queueRows = await client.db
281+
.select({
282+
status: security_analysis_queue.queue_status,
283+
failureCode: security_analysis_queue.failure_code,
284+
})
285+
.from(security_analysis_queue)
286+
.where(eq(security_analysis_queue.finding_id, findingId));
287+
expect(queueRows).toEqual([{ status: 'completed', failureCode: null }]);
288+
});
289+
85290
it('promotes scheduled sandbox starts to running without leaving the queue pending', async () => {
86291
const findingId = await insertFinding('scheduled-sandbox-running');
87292
const queueRowId = await insertQueueClaim({

0 commit comments

Comments
 (0)