Skip to content

Commit d9db303

Browse files
feat(security-agent): move manual workflows into workers (#3312)
Co-authored-by: kiloconnect[bot] <240665456+kiloconnect[bot]@users.noreply.github.com>
1 parent 46563ad commit d9db303

83 files changed

Lines changed: 9522 additions & 3126 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/web/.env.development.local.example

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ AUTO_FIX_URL=http://localhost:8792
1616
# @url cloudflare-auto-triage-infra
1717
AUTO_TRIAGE_URL=http://localhost:8791
1818

19+
# @url cloudflare-security-sync
20+
SECURITY_SYNC_WORKER_URL=http://localhost:8812
21+
22+
# @url cloudflare-security-auto-analysis
23+
SECURITY_AUTO_ANALYSIS_WORKER_URL=http://localhost:8797
24+
1925
# @url cloudflare-app-builder
2026
APP_BUILDER_URL=http://localhost:8790
2127

apps/web/src/app/api/internal/security-analysis-callback/[findingId]/route.test.ts

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,18 @@ jest.mock('@/lib/utils.server', () => ({
105105
}));
106106

107107
jest.mock('@/lib/drizzle', () => {
108+
let selectedTable: unknown;
108109
const chain = {
109-
from: jest.fn().mockReturnThis(),
110+
from: jest.fn((table: unknown) => {
111+
selectedTable = table;
112+
return chain;
113+
}),
110114
where: jest.fn().mockReturnThis(),
111-
limit: jest.fn(() => mockDbSelect()),
115+
limit: jest.fn(() =>
116+
(selectedTable as { __name?: string } | undefined)?.__name === 'security_analysis_queue'
117+
? Promise.resolve([{ claimToken: 'attempt-token-123' }])
118+
: mockDbSelect()
119+
),
112120
};
113121
return {
114122
db: {
@@ -119,16 +127,25 @@ jest.mock('@/lib/drizzle', () => {
119127

120128
jest.mock('@kilocode/db/schema', () => ({
121129
kilocode_users: { id: 'id' },
130+
security_analysis_queue: {
131+
__name: 'security_analysis_queue',
132+
claim_token: 'claim_token',
133+
finding_id: 'finding_id',
134+
queue_status: 'queue_status',
135+
},
122136
}));
123137

124138
jest.mock('drizzle-orm', () => ({
139+
and: jest.fn(),
125140
eq: jest.fn(),
141+
inArray: jest.fn(),
126142
}));
127143

128144
// --- Helpers ---
129145

130146
const CALLBACK_SECRET = 'test-callback-token-secret';
131147
const FINDING_ID = 'finding-abc-123';
148+
const ATTEMPT_TOKEN = 'attempt-token-123';
132149
let defaultCallbackToken: string;
133150

134151
function makeRequest(
@@ -137,6 +154,9 @@ function makeRequest(
137154
callbackToken: string | null = defaultCallbackToken
138155
): NextRequest {
139156
return {
157+
nextUrl: new URL(
158+
`https://app.kilo.ai/api/internal/security-analysis-callback/${findingId}?attempt=${ATTEMPT_TOKEN}`
159+
),
140160
headers: {
141161
get: (name: string) => {
142162
if (name === 'X-Callback-Token') return callbackToken;
@@ -248,7 +268,7 @@ beforeEach(async () => {
248268
defaultCallbackToken = await deriveCallbackToken({
249269
secret: CALLBACK_SECRET,
250270
scope: 'security-analysis-callback',
251-
resourceParts: [FINDING_ID],
271+
resourceParts: [FINDING_ID, ATTEMPT_TOKEN],
252272
});
253273
mockUpdateAnalysisStatus.mockResolvedValue(true);
254274
mockTransitionAutoAnalysisQueueFromCallback.mockResolvedValue(undefined);
@@ -292,7 +312,7 @@ describe('POST /api/internal/security-analysis-callback/[findingId]', () => {
292312
const callbackToken = await deriveCallbackToken({
293313
secret: CALLBACK_SECRET,
294314
scope: 'security-analysis-callback',
295-
resourceParts: ['different-finding'],
315+
resourceParts: ['different-finding', ATTEMPT_TOKEN],
296316
});
297317
const req = makeRequest(FINDING_ID, completedPayload, callbackToken);
298318
const response = await POST(req, makeParams(FINDING_ID));
@@ -366,6 +386,7 @@ describe('POST /api/internal/security-analysis-callback/[findingId]', () => {
366386
expect(body.message).toBe('Superseded finding ignored');
367387
expect(mockTransitionAutoAnalysisQueueFromCallback).toHaveBeenCalledWith({
368388
findingId: FINDING_ID,
389+
attemptToken: ATTEMPT_TOKEN,
369390
toStatus: 'completed',
370391
failureCode: 'SKIPPED_NO_LONGER_ELIGIBLE',
371392
});

apps/web/src/app/api/internal/security-analysis-callback/[findingId]/route.ts

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import { fetchSessionSnapshot } from '@/lib/session-ingest-client';
1717
import { trackSecurityAgentAnalysisCompleted } from '@/lib/security-agent/posthog-tracking';
1818
import { generateApiToken } from '@/lib/tokens';
1919
import { db } from '@/lib/drizzle';
20-
import { kilocode_users } from '@kilocode/db/schema';
21-
import { eq } from 'drizzle-orm';
20+
import { kilocode_users, security_analysis_queue } from '@kilocode/db/schema';
21+
import { and, eq, inArray } from 'drizzle-orm';
2222
import { z } from 'zod';
2323
import { verifyCallbackToken } from '@kilocode/worker-utils/callback-token';
2424
import { logExceptInTest, sentryLogger } from '@/lib/utils.server';
@@ -32,6 +32,8 @@ import {
3232
DEFAULT_SECURITY_AGENT_TRIAGE_MODEL,
3333
} from '@/lib/security-agent/core/constants';
3434

35+
// Compatibility-only callback ingress retained for explicit rollback routing.
36+
// Durable default ingress lives in the security-auto-analysis Worker.
3537
const log = sentryLogger('security-agent:callback', 'info');
3638
const warn = sentryLogger('security-agent:callback', 'warning');
3739
const logError = sentryLogger('security-agent:callback', 'error');
@@ -82,14 +84,18 @@ export async function POST(
8284
) {
8385
try {
8486
const { findingId } = await params;
87+
const attemptToken = req.nextUrl.searchParams.get('attempt');
88+
if (!attemptToken) {
89+
return NextResponse.json({ error: 'Missing callback attempt token' }, { status: 400 });
90+
}
8591
const callbackToken = req.headers.get('X-Callback-Token');
8692
const validCallbackToken =
8793
!!CALLBACK_TOKEN_SECRET &&
8894
(await verifyCallbackToken({
8995
token: callbackToken,
9096
secret: CALLBACK_TOKEN_SECRET,
9197
scope: 'security-analysis-callback',
92-
resourceParts: [findingId],
98+
resourceParts: [findingId, attemptToken],
9399
}));
94100
if (!validCallbackToken) {
95101
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
@@ -117,6 +123,28 @@ export async function POST(
117123
return NextResponse.json({ error: 'Finding not found' }, { status: 404 });
118124
}
119125

126+
const [activeAttempt] = await db
127+
.select({ claimToken: security_analysis_queue.claim_token })
128+
.from(security_analysis_queue)
129+
.where(
130+
and(
131+
eq(security_analysis_queue.finding_id, findingId),
132+
inArray(security_analysis_queue.queue_status, ['pending', 'running'])
133+
)
134+
)
135+
.limit(1);
136+
if (
137+
(finding.analysis_status === 'pending' || finding.analysis_status === 'running') &&
138+
activeAttempt?.claimToken !== attemptToken
139+
) {
140+
warn('Ignoring stale auto-analysis callback due to attempt mismatch', {
141+
findingId,
142+
callbackAttemptToken: attemptToken,
143+
activeAttemptToken: activeAttempt?.claimToken ?? null,
144+
});
145+
return NextResponse.json({ success: true, message: 'Stale callback ignored' });
146+
}
147+
120148
const sessionMismatch =
121149
(payload.cloudAgentSessionId &&
122150
finding.session_id &&
@@ -158,6 +186,7 @@ export async function POST(
158186
});
159187
await transitionAutoAnalysisQueueFromCallback({
160188
findingId,
189+
attemptToken,
161190
toStatus: 'completed',
162191
failureCode: 'SKIPPED_NO_LONGER_ELIGIBLE',
163192
});
@@ -184,9 +213,9 @@ export async function POST(
184213
after(async () => {
185214
try {
186215
if (payload.status === 'completed') {
187-
await handleAnalysisCompleted(findingId, payload, finding);
216+
await handleAnalysisCompleted(findingId, attemptToken, payload, finding);
188217
} else if (payload.status === 'failed' || payload.status === 'interrupted') {
189-
await handleAnalysisFailed(findingId, payload, finding);
218+
await handleAnalysisFailed(findingId, attemptToken, payload, finding);
190219
} else {
191220
const unknownStatus = payload.status as string;
192221
logError('Unknown callback status received, marking as failed', {
@@ -202,6 +231,7 @@ export async function POST(
202231
}
203232
await transitionAutoAnalysisQueueFromCallback({
204233
findingId,
234+
attemptToken,
205235
toStatus: 'failed',
206236
failureCode: 'STATE_GUARD_REJECTED',
207237
errorMessage: `Unknown callback status: ${unknownStatus}`,
@@ -255,6 +285,7 @@ function readAnalysisContext(analysis: SecurityFindingAnalysis | null | undefine
255285

256286
async function handleAnalysisCompleted(
257287
findingId: string,
288+
attemptToken: string,
258289
payload: ExecutionCallbackPayload,
259290
finding: Awaited<ReturnType<typeof getSecurityFindingById>> & {}
260291
) {
@@ -281,6 +312,7 @@ async function handleAnalysisCompleted(
281312
}
282313
await transitionAutoAnalysisQueueFromCallback({
283314
findingId,
315+
attemptToken,
284316
toStatus: 'failed',
285317
failureCode: 'STATE_GUARD_REJECTED',
286318
errorMessage: 'Cannot process callback — triggeredByUserId missing from analysis context',
@@ -300,6 +332,7 @@ async function handleAnalysisCompleted(
300332
}
301333
await transitionAutoAnalysisQueueFromCallback({
302334
findingId,
335+
attemptToken,
303336
toStatus: 'failed',
304337
failureCode: 'STATE_GUARD_REJECTED',
305338
errorMessage: 'Callback missing kiloSessionId — cannot retrieve analysis result',
@@ -365,6 +398,7 @@ async function handleAnalysisCompleted(
365398
}
366399
await transitionAutoAnalysisQueueFromCallback({
367400
findingId,
401+
attemptToken,
368402
toStatus: 'failed',
369403
failureCode: 'START_CALL_AMBIGUOUS',
370404
errorMessage: 'Analysis completed but result could not be retrieved from ingest service',
@@ -404,6 +438,7 @@ async function handleAnalysisCompleted(
404438
}
405439
await transitionAutoAnalysisQueueFromCallback({
406440
findingId,
441+
attemptToken,
407442
toStatus: 'failed',
408443
failureCode: 'STATE_GUARD_REJECTED',
409444
errorMessage: `User ${triggeredByUserId} not found — cannot run Tier 3 extraction`,
@@ -449,6 +484,7 @@ async function handleAnalysisCompleted(
449484
});
450485
await transitionAutoAnalysisQueueFromCallback({
451486
findingId,
487+
attemptToken,
452488
toStatus: 'failed',
453489
failureCode: 'START_CALL_AMBIGUOUS',
454490
errorMessage: error instanceof Error ? error.message : String(error),
@@ -458,17 +494,23 @@ async function handleAnalysisCompleted(
458494

459495
const updatedFinding = await getSecurityFindingById(findingId);
460496
if (updatedFinding?.analysis_status === 'completed') {
461-
await transitionAutoAnalysisQueueFromCallback({ findingId, toStatus: 'completed' });
497+
await transitionAutoAnalysisQueueFromCallback({
498+
findingId,
499+
attemptToken,
500+
toStatus: 'completed',
501+
});
462502
} else if (updatedFinding?.analysis_status === 'failed') {
463503
await transitionAutoAnalysisQueueFromCallback({
464504
findingId,
505+
attemptToken,
465506
toStatus: 'failed',
466507
failureCode: 'START_CALL_AMBIGUOUS',
467508
errorMessage: updatedFinding.analysis_error ?? undefined,
468509
});
469510
} else {
470511
await transitionAutoAnalysisQueueFromCallback({
471512
findingId,
513+
attemptToken,
472514
toStatus: 'failed',
473515
failureCode: 'STATE_GUARD_REJECTED',
474516
errorMessage: `Unexpected post-finalize state: ${updatedFinding?.analysis_status ?? 'finding_not_found'}`,
@@ -478,6 +520,7 @@ async function handleAnalysisCompleted(
478520

479521
async function handleAnalysisFailed(
480522
findingId: string,
523+
attemptToken: string,
481524
payload: ExecutionCallbackPayload,
482525
finding: Awaited<ReturnType<typeof getSecurityFindingById>> & {}
483526
) {
@@ -521,6 +564,7 @@ async function handleAnalysisFailed(
521564
}
522565
await transitionAutoAnalysisQueueFromCallback({
523566
findingId,
567+
attemptToken,
524568
toStatus: 'failed',
525569
failureCode: callbackFailure.failureCode,
526570
errorMessage,

apps/web/src/components/security-agent/FindingDetailDialog.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import type { SecurityFinding } from '@kilocode/db/schema';
2929
import { useTRPC } from '@/lib/trpc/utils';
3030
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
3131
import Link from 'next/link';
32+
import { toast } from 'sonner';
33+
import { manualAnalysisAdmissionCopy } from './manual-analysis-admission-copy';
3234

3335
type Severity = 'critical' | 'high' | 'medium' | 'low';
3436

@@ -106,6 +108,7 @@ export function FindingDetailDialog({
106108
const startOrgAnalysisMutation = useMutation(
107109
trpc.organizations.securityAgent.startAnalysis.mutationOptions({
108110
onSuccess: async () => {
111+
toast.success(manualAnalysisAdmissionCopy.successTitle);
109112
await queryClient.invalidateQueries();
110113
},
111114
})
@@ -115,6 +118,7 @@ export function FindingDetailDialog({
115118
const startUserAnalysisMutation = useMutation(
116119
trpc.securityAgent.startAnalysis.mutationOptions({
117120
onSuccess: async () => {
121+
toast.success(manualAnalysisAdmissionCopy.successTitle);
118122
await queryClient.invalidateQueries();
119123
},
120124
})

0 commit comments

Comments
 (0)