From 33df516db0f907e9fabb318f800852522db609e6 Mon Sep 17 00:00:00 2001 From: Evgeny Shurakov Date: Tue, 2 Jun 2026 15:50:03 +0200 Subject: [PATCH] feat(cloud-agent-next): add exact message result polling --- services/cloud-agent-next/README.md | 23 + .../src/persistence/CloudAgentSession.ts | 39 ++ services/cloud-agent-next/src/router.test.ts | 106 ++++ .../src/router/handlers/session-management.ts | 39 ++ .../src/router/schemas.test.ts | 107 ++++ .../cloud-agent-next/src/router/schemas.ts | 95 ++++ .../src/session/message-result.test.ts | 219 ++++++++ .../src/session/message-result.ts | 116 ++++ .../src/session/queries/events.ts | 39 ++ .../src/session/session-message-state.ts | 144 +++-- .../session/message-result.test.ts | 516 ++++++++++++++++++ 11 files changed, 1384 insertions(+), 59 deletions(-) create mode 100644 services/cloud-agent-next/src/session/message-result.test.ts create mode 100644 services/cloud-agent-next/src/session/message-result.ts create mode 100644 services/cloud-agent-next/test/integration/session/message-result.test.ts diff --git a/services/cloud-agent-next/README.md b/services/cloud-agent-next/README.md index 7ce35ee1a9..b823644133 100644 --- a/services/cloud-agent-next/README.md +++ b/services/cloud-agent-next/README.md @@ -56,6 +56,7 @@ The recommended V2 flow is: - `prepareSession` - Create a fully prepared session with workspace, git clone, and configuration - `updateSession` - Update a prepared (not yet initiated) session - `getSession` - Query session metadata (no secrets) +- `getMessageResult` - Poll lifecycle state and terminal assistant text for one submitted turn - `initiateFromKilocodeSessionV2` - Start execution on a prepared session - `sendMessageV2` - Send follow-up messages (output via `/stream` WebSocket) - `deleteSession` - Delete a session and clean up resources @@ -67,6 +68,28 @@ The recommended V2 flow is: All endpoints require a kilocode api token except `/stream` which uses short lived ws tickets. +### Message Result Retrieval + +Use the bearer-protected `GET /trpc/getMessageResult` query to poll one durably submitted Cloud Agent turn. Supply `cloudAgentSessionId` and the submitted `messageId`. + +```text +GET /trpc/getMessageResult?input={"cloudAgentSessionId":"agent_","messageId":"msg_"} +Authorization: Bearer +``` + +The response includes only safe fields: `cloudAgentSessionId`, `messageId`, lifecycle status and timestamps, optional structured `completionSource`, `failure`, `gateResult`, and assistant text correlated to the selected submitted turn. It never returns prompts, tokens, callback details, or raw diagnostics. + +| Stored lifecycle state | Public status | +|---|---| +| `queued` | `queued` | +| `accepted` | `running` | +| `completed` | `completed` | +| `failed` | `failed` | +| `interrupted` | `interrupted` | +| Pending-only compatibility row | `queued` | + +The query returns `NOT_FOUND` for missing sessions, cross-user session lookups, and unknown message IDs. + ## Usage Examples ### TypeScript Client (Recommended) diff --git a/services/cloud-agent-next/src/persistence/CloudAgentSession.ts b/services/cloud-agent-next/src/persistence/CloudAgentSession.ts index e3f6ae6318..9f6c92ecf0 100644 --- a/services/cloud-agent-next/src/persistence/CloudAgentSession.ts +++ b/services/cloud-agent-next/src/persistence/CloudAgentSession.ts @@ -108,6 +108,10 @@ import { createMessageSettlementOutbox, type MessageSettlementOutbox, } from '../session/message-settlement-outbox.js'; +import { + resolveSessionMessageResult, + type MessageResultRPCResponse, +} from '../session/message-result.js'; import { createAgentRuntime, type AgentRuntime, @@ -979,6 +983,41 @@ export class CloudAgentSession extends DurableObject { return this.eventQueries.getLatestAssistantMessage(sessionId, metadata.auth.kiloSessionId); } + async getMessageResult(messageId: string): Promise { + const metadata = await this.getMetadata(); + if (!metadata) return { type: 'session-not-found' }; + + const resolved = await resolveSessionMessageResult(this.ctx.storage, messageId); + if (!resolved) return { type: 'message-not-found' }; + if (resolved.type === 'state-invalid') return resolved; + + const sessionId = await this.requireSessionId(); + const assistantMessage = + metadata.auth.kiloSessionId && resolved.assistantLookup + ? this.eventQueries.getAssistantMessageById( + sessionId, + metadata.auth.kiloSessionId, + resolved.assistantLookup.messageId, + resolved.assistantLookup.parentMessageId + ) + : null; + const assistant = assistantMessage + ? { + messageId: assistantMessage.info.id, + text: extractAssistantTextFromParts(assistantMessage.parts) || undefined, + } + : undefined; + + return { + type: 'found', + result: { + cloudAgentSessionId: sessionId, + ...resolved.result, + ...(assistant ? { assistant } : {}), + }, + }; + } + private async getLatestAssistantMessageText(): Promise { try { const message = await this.getLatestAssistantMessage(); diff --git a/services/cloud-agent-next/src/router.test.ts b/services/cloud-agent-next/src/router.test.ts index 163ace1f3e..e698c9eade 100644 --- a/services/cloud-agent-next/src/router.test.ts +++ b/services/cloud-agent-next/src/router.test.ts @@ -91,6 +91,7 @@ type MockSessionStub = { getActiveExecutionId?: ReturnType; getExecution?: ReturnType; getLatestAssistantMessage?: ReturnType; + getMessageResult?: ReturnType; createTerminal?: ReturnType; resizeTerminal?: ReturnType; closeTerminal?: ReturnType; @@ -1505,6 +1506,111 @@ describe('router sessionId validation', () => { }); }); +describe('getMessageResult procedure', () => { + const sessionId: SessionId = 'agent_12345678-1234-1234-1234-123456789abc'; + const messageId = 'msg_018f1e2d3c4bAbCdEfGhIjKlMn'; + let mockContext: TRPCContext; + let caller: ReturnType; + let cloudAgentSession: MockCAS; + let mockGetMessageResult: ReturnType; + + beforeEach(() => { + vi.clearAllMocks(); + mockGetMessageResult = vi.fn().mockResolvedValue({ + type: 'found', + result: { + cloudAgentSessionId: sessionId, + messageId, + status: 'completed', + createdAt: 1, + terminalAt: 2, + assistant: { messageId: 'assistant_done', text: 'done' }, + }, + }); + mockContext = { + userId: 'test-user-123', + authToken: 'test-token', + botId: undefined, + request: {} as Request, + env: { + CLOUD_AGENT_SESSION: { + idFromName: vi.fn((id: string) => ({ id })), + get: vi.fn(() => ({ getMessageResult: mockGetMessageResult })), + } as unknown as TRPCContext['env']['CLOUD_AGENT_SESSION'], + } as unknown as TRPCContext['env'], + }; + cloudAgentSession = mockContext.env.CLOUD_AGENT_SESSION as unknown as MockCAS; + caller = appRouter.createCaller(mockContext); + }); + + it('returns an ownership-isolated safe exact message result with one Durable Object RPC', async () => { + await expect( + caller.getMessageResult({ cloudAgentSessionId: sessionId, messageId }) + ).resolves.toEqual({ + cloudAgentSessionId: sessionId, + messageId, + status: 'completed', + createdAt: 1, + terminalAt: 2, + assistant: { messageId: 'assistant_done', text: 'done' }, + }); + expect(cloudAgentSession.idFromName).toHaveBeenCalledWith(`test-user-123:${sessionId}`); + expect(mockGetMessageResult).toHaveBeenCalledOnce(); + expect(mockGetMessageResult).toHaveBeenCalledWith(messageId); + }); + + it('returns Session not found when the Durable Object has no metadata', async () => { + mockGetMessageResult.mockResolvedValue({ type: 'session-not-found' }); + await expect( + caller.getMessageResult({ cloudAgentSessionId: sessionId, messageId }) + ).rejects.toMatchObject({ code: 'NOT_FOUND', message: 'Session not found' }); + }); + + it('returns Message not found for an unknown message ID', async () => { + mockGetMessageResult.mockResolvedValue({ type: 'message-not-found' }); + await expect( + caller.getMessageResult({ cloudAgentSessionId: sessionId, messageId }) + ).rejects.toMatchObject({ code: 'NOT_FOUND', message: 'Message not found' }); + }); + + it('fails closed when persisted message state is invalid', async () => { + mockGetMessageResult.mockResolvedValue({ type: 'state-invalid' }); + await expect( + caller.getMessageResult({ cloudAgentSessionId: sessionId, messageId }) + ).rejects.toMatchObject({ + code: 'INTERNAL_SERVER_ERROR', + message: 'Message result unavailable', + }); + }); + + it('requires authentication', async () => { + const unauthenticatedCaller = appRouter.createCaller({ + ...mockContext, + userId: undefined, + authToken: undefined, + } as unknown as TRPCContext); + await expect( + unauthenticatedCaller.getMessageResult({ cloudAgentSessionId: sessionId, messageId }) + ).rejects.toThrow('Authentication required'); + }); + + it('rejects extra sensitive RPC response fields at the output boundary', async () => { + mockGetMessageResult.mockResolvedValue({ + type: 'found', + result: { + cloudAgentSessionId: sessionId, + messageId, + status: 'failed', + createdAt: 1, + error: 'private raw error', + }, + }); + await expect( + caller.getMessageResult({ cloudAgentSessionId: sessionId, messageId }) + ).rejects.toThrow(); + }); +}); + describe('router terminal procedures', () => { it('creates a terminal through the session Durable Object', async () => { const createTerminal = vi.fn().mockResolvedValue({ diff --git a/services/cloud-agent-next/src/router/handlers/session-management.ts b/services/cloud-agent-next/src/router/handlers/session-management.ts index db20b4cb95..356c718bc7 100644 --- a/services/cloud-agent-next/src/router/handlers/session-management.ts +++ b/services/cloud-agent-next/src/router/handlers/session-management.ts @@ -18,12 +18,15 @@ import { GetSessionOutput, GetSessionHealthInput, GetSessionHealthOutput, + GetMessageResultInput, + GetMessageResultOutput, GetLatestAssistantMessageInput, GetLatestAssistantMessageOutput, } from '../schemas.js'; import { readProfileBundle } from '../../session-profile.js'; import type { CloudAgentSession } from '../../persistence/CloudAgentSession.js'; import type { CloudAgentSessionState } from '../../persistence/types.js'; +import type { MessageResultRPCResponse } from '../../session/message-result.js'; function publicRepositoryFields(metadata: CloudAgentSessionState): { githubRepo?: string; @@ -402,6 +405,42 @@ export function createSessionManagementHandlers() { }); }), + getMessageResult: protectedProcedure + .input(GetMessageResultInput) + .output(GetMessageResultOutput) + .query(async ({ input, ctx }) => { + return withLogTags({ source: 'getMessageResult' }, async () => { + const sessionId = input.cloudAgentSessionId as SessionId; + const { userId, env } = ctx; + const doKey = `${userId}:${sessionId}`; + const getStub = () => + env.CLOUD_AGENT_SESSION.get(env.CLOUD_AGENT_SESSION.idFromName(doKey)); + + const response = await withDORetry< + DurableObjectStub, + MessageResultRPCResponse + >( + getStub, + async stub => await stub.getMessageResult(input.messageId), + 'getMessageResult' + ); + if (response.type === 'session-not-found') { + throw new TRPCError({ code: 'NOT_FOUND', message: 'Session not found' }); + } + if (response.type === 'message-not-found') { + throw new TRPCError({ code: 'NOT_FOUND', message: 'Message not found' }); + } + if (response.type === 'state-invalid') { + throw new TRPCError({ + code: 'INTERNAL_SERVER_ERROR', + message: 'Message result unavailable', + }); + } + + return response.result; + }); + }), + getLatestAssistantMessage: protectedProcedure .input(GetLatestAssistantMessageInput) .output(GetLatestAssistantMessageOutput) diff --git a/services/cloud-agent-next/src/router/schemas.test.ts b/services/cloud-agent-next/src/router/schemas.test.ts index 83f8095dff..2cfb745aa1 100644 --- a/services/cloud-agent-next/src/router/schemas.test.ts +++ b/services/cloud-agent-next/src/router/schemas.test.ts @@ -1,6 +1,8 @@ import { describe, expect, it } from 'vitest'; import { ExecutionResponse, + GetMessageResultInput, + GetMessageResultOutput, GetSessionOutput, InitiateFromPreparedSessionInput, LegacyExecutionResponse, @@ -331,6 +333,111 @@ describe('message ID schema validation', () => { }); }); +describe('getMessageResult contract', () => { + const baseOutput = { + cloudAgentSessionId: validSessionId, + messageId: validMessageId, + status: 'completed' as const, + createdAt: 1, + }; + + it('requires an exact lookup input while rejecting invalid or unknown fields', () => { + expect(GetMessageResultInput.safeParse({ cloudAgentSessionId: validSessionId }).success).toBe( + false + ); + expect( + GetMessageResultInput.safeParse({ + cloudAgentSessionId: validSessionId, + messageId: validMessageId, + }).success + ).toBe(true); + expect(GetMessageResultInput.safeParse({ cloudAgentSessionId: 'agent_invalid' }).success).toBe( + false + ); + expect( + GetMessageResultInput.safeParse({ cloudAgentSessionId: validSessionId, messageId: 'msg_bad' }) + .success + ).toBe(false); + expect( + GetMessageResultInput.safeParse({ cloudAgentSessionId: validSessionId, unknown: true }) + .success + ).toBe(false); + }); + + it('accepts public statuses and allowlisted structured result fields', () => { + for (const status of ['queued', 'running', 'completed', 'failed', 'interrupted']) { + expect(GetMessageResultOutput.safeParse({ ...baseOutput, status }).success).toBe(true); + } + expect( + GetMessageResultOutput.safeParse({ + ...baseOutput, + queuedAt: 2, + acceptedAt: 3, + terminalAt: 4, + completionSource: 'assistant_message_event', + gateResult: 'fail', + assistant: { messageId: 'assistant_1', text: 'safe answer' }, + }).success + ).toBe(true); + expect( + GetMessageResultOutput.safeParse({ + ...baseOutput, + status: 'failed', + queuedAt: 2, + acceptedAt: 3, + terminalAt: 4, + completionSource: 'wrapper_failure', + failure: { stage: 'agent_activity', code: 'assistant_error', attempts: 2 }, + }).success + ).toBe(true); + }); + + it('fails closed on contradictory lifecycle result fields', () => { + for (const output of [ + { ...baseOutput, status: 'queued', acceptedAt: 2 }, + { ...baseOutput, status: 'queued', terminalAt: 2 }, + { ...baseOutput, status: 'running', completionSource: 'assistant_message_event' }, + { ...baseOutput, status: 'queued', failure: { attempts: 1 } }, + { ...baseOutput, status: 'failed', assistant: { messageId: 'assistant_1', text: 'wrong' } }, + { ...baseOutput, status: 'interrupted', gateResult: 'fail' }, + ]) { + expect(GetMessageResultOutput.safeParse(output).success).toBe(false); + } + }); + + it('fails closed on extra top-level and nested fields', () => { + for (const extra of [ + { error: 'token' }, + { failureReason: 'token' }, + { callbackTarget: { url: 'https://example.com', headers: { Authorization: 'token' } } }, + ]) { + expect(GetMessageResultOutput.safeParse({ ...baseOutput, ...extra }).success).toBe(false); + } + expect( + GetMessageResultOutput.safeParse({ + ...baseOutput, + status: 'failed', + failure: { attempts: -1 }, + }).success + ).toBe(false); + expect( + GetMessageResultOutput.safeParse({ + ...baseOutput, + status: 'failed', + failure: { error: 'token' }, + }).success + ).toBe(false); + expect( + GetMessageResultOutput.safeParse({ ...baseOutput, assistant: { text: 'missing identity' } }) + .success + ).toBe(false); + expect( + GetMessageResultOutput.safeParse({ ...baseOutput, assistant: { parts: [], info: {} } }) + .success + ).toBe(false); + }); +}); + describe('API output schemas omit executionId', () => { it('StartSessionOutput rejects executionId', () => { const result = StartSessionOutput.strict().safeParse({ diff --git a/services/cloud-agent-next/src/router/schemas.ts b/services/cloud-agent-next/src/router/schemas.ts index 9e86896e99..b82632e582 100644 --- a/services/cloud-agent-next/src/router/schemas.ts +++ b/services/cloud-agent-next/src/router/schemas.ts @@ -18,6 +18,11 @@ import { } from '../persistence/schemas.js'; import { AgentModeSchema, BUILTIN_AGENT_MODES, Limits } from '../schema.js'; import { MESSAGE_ID_FORMAT_DESCRIPTION, MESSAGE_ID_PATTERN } from '../session/message-id.js'; +import { + SessionMessageCompletionSourceSchema, + SessionMessageFailureCodeSchema, + SessionMessageFailureStageSchema, +} from '../session/session-message-state.js'; // Re-export schemas from types.ts and persistence/schemas.ts for convenience export { sessionIdSchema, githubRepoSchema, gitUrlSchema, envVarsSchema }; @@ -851,6 +856,96 @@ export const GetSessionOutput = z.object({ export type GetSessionResponse = z.infer; +export const GetMessageResultInput = z + .object({ + cloudAgentSessionId: sessionIdSchema.describe('Cloud-agent session ID to inspect'), + messageId: MessageIdSchema.describe('Exact submitted message ID to inspect'), + }) + .strict(); + +export const GetMessageResultOutput = z + .object({ + cloudAgentSessionId: sessionIdSchema, + messageId: MessageIdSchema, + status: z.enum(['queued', 'running', 'completed', 'failed', 'interrupted']), + createdAt: z.number(), + queuedAt: z.number().optional(), + acceptedAt: z.number().optional(), + terminalAt: z.number().optional(), + completionSource: SessionMessageCompletionSourceSchema.optional(), + failure: z + .object({ + stage: SessionMessageFailureStageSchema.optional(), + code: SessionMessageFailureCodeSchema.optional(), + attempts: z.number().int().nonnegative().optional(), + }) + .strict() + .optional(), + gateResult: z.enum(['pass', 'fail']).optional(), + assistant: z + .object({ + messageId: z.string(), + text: z.string().optional(), + }) + .strict() + .optional(), + }) + .strict() + .superRefine((result, ctx) => { + const isTerminal = + result.status === 'completed' || + result.status === 'failed' || + result.status === 'interrupted'; + if (result.status === 'queued' && result.acceptedAt !== undefined) { + ctx.addIssue({ + code: 'custom', + message: 'Queued results cannot include acceptedAt', + path: ['acceptedAt'], + }); + } + if (!isTerminal && result.terminalAt !== undefined) { + ctx.addIssue({ + code: 'custom', + message: 'Active results cannot include terminalAt', + path: ['terminalAt'], + }); + } + if (!isTerminal && result.completionSource !== undefined) { + ctx.addIssue({ + code: 'custom', + message: 'Active results cannot include completionSource', + path: ['completionSource'], + }); + } + if ( + result.status !== 'failed' && + result.status !== 'interrupted' && + result.failure !== undefined + ) { + ctx.addIssue({ + code: 'custom', + message: 'Only failed or interrupted results can include failure details', + path: ['failure'], + }); + } + if (result.status !== 'completed' && result.gateResult !== undefined) { + ctx.addIssue({ + code: 'custom', + message: 'Only completed results can include gateResult', + path: ['gateResult'], + }); + } + if (result.status !== 'completed' && result.assistant !== undefined) { + ctx.addIssue({ + code: 'custom', + message: 'Only completed results can include an assistant response', + path: ['assistant'], + }); + } + }); + +export type GetMessageResultResponse = z.infer; + export const GetLatestAssistantMessageInput = z.object({ cloudAgentSessionId: sessionIdSchema.describe('Cloud-agent session ID to inspect'), }); diff --git a/services/cloud-agent-next/src/session/message-result.test.ts b/services/cloud-agent-next/src/session/message-result.test.ts new file mode 100644 index 0000000000..4a0bf643f7 --- /dev/null +++ b/services/cloud-agent-next/src/session/message-result.test.ts @@ -0,0 +1,219 @@ +import { describe, expect, it } from 'vitest'; +import { + createPendingSessionMessage, + storePendingSessionMessage, + type SessionQueueStorage, +} from './pending-messages.js'; +import { resolveSessionMessageResult } from './message-result.js'; +import { + putSessionMessageState, + type SessionMessageState, + type SessionMessageStorage, +} from './session-message-state.js'; + +type FakeStorage = SessionMessageStorage & + SessionQueueStorage & { + store: Map; + }; + +function createFakeStorage(): FakeStorage { + const store = new Map(); + return { + store, + async get(key: string): Promise { + return store.get(key) as T | undefined; + }, + async put(key: string, value: unknown): Promise { + store.set(key, value); + }, + async delete(keys: string | string[]): Promise { + for (const key of typeof keys === 'string' ? [keys] : keys) store.delete(key); + }, + async list(options: { prefix: string }): Promise> { + const entries = new Map(); + for (const [key, value] of store.entries()) { + if (key.startsWith(options.prefix)) entries.set(key, value as T); + } + return entries; + }, + }; +} + +const messageA = 'msg_0123456789abAAAAAAAAAAAAAA'; + +function lifecycleState( + messageId: string, + overrides: Partial = {} +): SessionMessageState { + return { + messageId, + status: 'queued', + prompt: 'secret prompt', + createdAt: 1, + queuedAt: 1, + ...overrides, + }; +} + +async function storePending( + storage: FakeStorage, + messageId: string, + createdAt: number, + overrides: Parameters[0] extends infer Params + ? Partial + : never = {} +): Promise { + await storePendingSessionMessage( + storage, + createPendingSessionMessage({ + messageId, + role: 'user', + content: 'secret pending prompt', + createdAt, + ...overrides, + }) + ); +} + +describe('resolveSessionMessageResult', () => { + it('returns an allowlisted exact lifecycle projection and maps accepted to running', async () => { + const storage = createFakeStorage(); + await putSessionMessageState( + storage, + lifecycleState(messageA, { + status: 'accepted', + acceptedAt: 2, + admissionSnapshot: { + turn: { type: 'prompt', messageId: messageA, prompt: 'private admission prompt' }, + agent: { mode: 'code', model: 'private-model' }, + }, + callbackTarget: { url: 'https://example.com', headers: { Authorization: 'secret' } }, + error: 'private raw error', + failureReason: 'private failure reason', + }) + ); + + await expect(resolveSessionMessageResult(storage, messageA)).resolves.toEqual({ + type: 'found', + result: { + messageId: messageA, + status: 'running', + createdAt: 1, + queuedAt: 1, + acceptedAt: 2, + }, + }); + }); + + it('returns a queued recovery projection for exact pending-only rows', async () => { + const storage = createFakeStorage(); + await storePending(storage, messageA, 3, { + lastFlushError: 'private flush error', + callbackSnapshot: { + required: true, + target: { url: 'https://example.com', headers: { Authorization: 'secret' } }, + }, + }); + + await expect(resolveSessionMessageResult(storage, messageA)).resolves.toEqual({ + type: 'found', + result: { + messageId: messageA, + status: 'queued', + createdAt: 3, + queuedAt: 3, + }, + }); + }); + + it('returns undefined for an unknown exact message', async () => { + const storage = createFakeStorage(); + await expect(resolveSessionMessageResult(storage, messageA)).resolves.toBeUndefined(); + }); + + it('prefers lifecycle state over a duplicate pending row', async () => { + const storage = createFakeStorage(); + await storePending(storage, messageA, 20); + await putSessionMessageState( + storage, + lifecycleState(messageA, { status: 'completed', createdAt: 10, terminalAt: 30 }) + ); + + await expect(resolveSessionMessageResult(storage, messageA)).resolves.toMatchObject({ + result: { messageId: messageA, status: 'completed', createdAt: 10 }, + }); + }); + + it('returns an exact assistant lookup for completed lifecycle rows with assistant identity', async () => { + const storage = createFakeStorage(); + await putSessionMessageState( + storage, + lifecycleState(messageA, { + status: 'completed', + assistantMessageId: 'assistant_exact', + terminalAt: 4, + }) + ); + + await expect(resolveSessionMessageResult(storage, messageA)).resolves.toMatchObject({ + assistantLookup: { + type: 'message-id', + messageId: 'assistant_exact', + parentMessageId: messageA, + }, + }); + }); + + it('omits assistant lookup for completed lifecycle rows without assistant identity', async () => { + const storage = createFakeStorage(); + await putSessionMessageState( + storage, + lifecycleState(messageA, { status: 'completed', terminalAt: 4 }) + ); + + await expect(resolveSessionMessageResult(storage, messageA)).resolves.toEqual({ + type: 'found', + result: { + messageId: messageA, + status: 'completed', + createdAt: 1, + queuedAt: 1, + terminalAt: 4, + }, + }); + }); + + it('projects only safe structured terminal fields without assistant lookup for failures', async () => { + const storage = createFakeStorage(); + await putSessionMessageState( + storage, + lifecycleState(messageA, { + status: 'failed', + terminalAt: 4, + completionSource: 'wrapper_failure', + failureStage: 'agent_activity', + failureCode: 'assistant_error', + attempts: 2, + error: 'private raw error', + failureReason: 'private reason', + terminalEffects: { + event: 'pending', + callback: { disposition: 'pending', allowWithoutObservedIdle: false }, + }, + }) + ); + + await expect(resolveSessionMessageResult(storage, messageA)).resolves.toEqual({ + type: 'found', + result: { + messageId: messageA, + status: 'failed', + createdAt: 1, + queuedAt: 1, + terminalAt: 4, + completionSource: 'wrapper_failure', + failure: { stage: 'agent_activity', code: 'assistant_error', attempts: 2 }, + }, + }); + }); +}); diff --git a/services/cloud-agent-next/src/session/message-result.ts b/services/cloud-agent-next/src/session/message-result.ts new file mode 100644 index 0000000000..de2c82d7e8 --- /dev/null +++ b/services/cloud-agent-next/src/session/message-result.ts @@ -0,0 +1,116 @@ +import { + findPendingSessionMessageByMessageId, + type PendingSessionMessage, + type SessionQueueStorage, +} from './pending-messages.js'; +import { + lookupSessionMessageState, + type SessionMessageCompletionSource, + type SessionMessageFailureCode, + type SessionMessageFailureStage, + type SessionMessageState, + type SessionMessageStorage, +} from './session-message-state.js'; + +export type SafeMessageResult = { + messageId: string; + status: 'queued' | 'running' | 'completed' | 'failed' | 'interrupted'; + createdAt: number; + queuedAt?: number; + acceptedAt?: number; + terminalAt?: number; + completionSource?: SessionMessageCompletionSource; + failure?: { + stage?: SessionMessageFailureStage; + code?: SessionMessageFailureCode; + attempts?: number; + }; + gateResult?: 'pass' | 'fail'; +}; + +export type SafeMessageResultResponse = SafeMessageResult & { + cloudAgentSessionId: string; + assistant?: { + messageId: string; + text?: string; + }; +}; + +export type MessageResultRPCResponse = + | { type: 'session-not-found' } + | { type: 'message-not-found' } + | { type: 'state-invalid' } + | { type: 'found'; result: SafeMessageResultResponse }; + +type AssistantLookup = { type: 'message-id'; messageId: string; parentMessageId: string }; + +type ResolvedSessionMessageResult = + | { type: 'state-invalid' } + | { type: 'found'; result: SafeMessageResult; assistantLookup?: AssistantLookup }; + +type MessageResultStorage = SessionMessageStorage & SessionQueueStorage; + +function projectFailure(state: SessionMessageState): SafeMessageResult['failure'] { + if ( + state.failureStage === undefined && + state.failureCode === undefined && + state.attempts === undefined + ) { + return undefined; + } + return { + stage: state.failureStage, + code: state.failureCode, + attempts: state.attempts, + }; +} + +function projectLifecycleState(state: SessionMessageState): ResolvedSessionMessageResult { + const failure = projectFailure(state); + const assistantLookup: AssistantLookup | undefined = + state.status === 'completed' && state.assistantMessageId + ? { + type: 'message-id', + messageId: state.assistantMessageId, + parentMessageId: state.messageId, + } + : undefined; + return { + type: 'found', + result: { + messageId: state.messageId, + status: state.status === 'accepted' ? 'running' : state.status, + createdAt: state.createdAt, + ...(state.queuedAt === undefined ? {} : { queuedAt: state.queuedAt }), + ...(state.acceptedAt === undefined ? {} : { acceptedAt: state.acceptedAt }), + ...(state.terminalAt === undefined ? {} : { terminalAt: state.terminalAt }), + ...(state.completionSource === undefined ? {} : { completionSource: state.completionSource }), + ...(failure === undefined ? {} : { failure }), + ...(state.gateResult === undefined ? {} : { gateResult: state.gateResult }), + }, + ...(assistantLookup ? { assistantLookup } : {}), + }; +} + +function projectPendingMessage(message: PendingSessionMessage): ResolvedSessionMessageResult { + return { + type: 'found', + result: { + messageId: message.messageId, + status: 'queued', + createdAt: message.createdAt, + queuedAt: message.createdAt, + }, + }; +} + +export async function resolveSessionMessageResult( + storage: MessageResultStorage, + messageId: string +): Promise { + const lifecycle = await lookupSessionMessageState(storage, messageId); + if (lifecycle.type === 'invalid') return { type: 'state-invalid' }; + if (lifecycle.type === 'found') return projectLifecycleState(lifecycle.state); + const pending = await findPendingSessionMessageByMessageId(storage, messageId); + return pending ? projectPendingMessage(pending) : undefined; +} diff --git a/services/cloud-agent-next/src/session/queries/events.ts b/services/cloud-agent-next/src/session/queries/events.ts index edda00ade5..622cdfccbc 100644 --- a/services/cloud-agent-next/src/session/queries/events.ts +++ b/services/cloud-agent-next/src/session/queries/events.ts @@ -320,6 +320,45 @@ export function createEventQueries(db: DrizzleSqliteDODatabase, rawSql: SqlStora return buildLatestAssistantMessage(sessionId, rawSql, messageRow); }, + getAssistantMessageById( + sessionId: string, + kiloSessionId: string, + assistantMessageId: string, + parentMessageId: string + ): LatestAssistantMessage | null { + const messageRow = parseStoredEventRow( + rawSql + .exec( + ` + SELECT id, execution_id, session_id, stream_event_type, payload, timestamp + FROM events + WHERE session_id = ? + AND stream_event_type = 'kilocode' + AND entity_id = ? + AND json_extract(payload, '$.event') = 'message.updated' + AND json_extract(payload, '$.properties.info.id') = ? + AND json_extract(payload, '$.properties.info.role') = 'assistant' + AND json_extract(payload, '$.properties.info.sessionID') = ? + AND json_extract(payload, '$.properties.info.parentID') = ? + AND ( + json_extract(payload, '$.properties.info.time.completed') IS NOT NULL + OR json_extract(payload, '$.properties.info.error') IS NOT NULL + ) + LIMIT 1 + `, + sessionId, + `message/${assistantMessageId}`, + assistantMessageId, + kiloSessionId, + parentMessageId + ) + .toArray()[0] + ); + if (!messageRow) return null; + + return buildLatestAssistantMessage(sessionId, rawSql, messageRow); + }, + getAssistantMessageForUserMessage( sessionId: string, kiloSessionId: string, diff --git a/services/cloud-agent-next/src/session/session-message-state.ts b/services/cloud-agent-next/src/session/session-message-state.ts index 2ebb262807..821d11e9bf 100644 --- a/services/cloud-agent-next/src/session/session-message-state.ts +++ b/services/cloud-agent-next/src/session/session-message-state.ts @@ -10,18 +10,63 @@ const SESSION_MESSAGE_STATE_PREFIX = 'session_message:'; export type SessionMessageStatus = 'queued' | 'accepted' | 'completed' | 'failed' | 'interrupted'; -export type SessionMessageCompletionSource = - | 'assistant_message_event' - | 'manual_compact_summarize' - | 'idle_reconciliation' - | 'wrapper_failure' - | 'interrupt' - | 'delivery_failure'; - -export type SessionMessageFailureStage = NonNullable< - CloudAgentRunStateReport['run']['failureStage'] ->; -export type SessionMessageFailureCode = NonNullable; +export const SessionMessageCompletionSourceSchema = z.enum([ + 'assistant_message_event', + 'manual_compact_summarize', + 'idle_reconciliation', + 'wrapper_failure', + 'interrupt', + 'delivery_failure', +]); +export type SessionMessageCompletionSource = z.infer; + +type AssertTrue = T; +type CloudAgentRunFailureStage = NonNullable; +type CloudAgentRunFailureCode = NonNullable; + +export const SessionMessageFailureStageSchema = z.enum([ + 'pre_dispatch', + 'post_dispatch_no_activity', + 'agent_activity', + 'interruption', + 'unknown', +] as const satisfies readonly CloudAgentRunFailureStage[]); +export type SessionMessageFailureStage = + AssertTrue< + CloudAgentRunFailureStage extends z.infer + ? true + : false + > extends true + ? z.infer + : never; + +export const SessionMessageFailureCodeSchema = z.enum([ + 'sandbox_connect_failed', + 'workspace_setup_failed', + 'kilo_server_failed', + 'wrapper_start_failed', + 'invalid_delivery_request', + 'session_metadata_missing', + 'model_missing', + 'delivery_failure_unknown', + 'wrapper_disconnected', + 'wrapper_no_output', + 'wrapper_ping_timeout', + 'wrapper_error_before_activity', + 'assistant_error', + 'wrapper_error_after_activity', + 'missing_assistant_reply', + 'user_interrupt', + 'container_shutdown', + 'system_interrupt', + 'unclassified', +] as const satisfies readonly CloudAgentRunFailureCode[]); +export type SessionMessageFailureCode = + AssertTrue< + CloudAgentRunFailureCode extends z.infer ? true : false + > extends true + ? z.infer + : never; export type SessionMessageDispatchAcceptanceKind = 'observed' | 'inferred_from_terminal'; export type LegacyAdmissionConstraints = { @@ -165,48 +210,9 @@ export const SessionMessageStateSchema = z wrapperRunId: z.string().optional(), assistantMessageId: z.string().optional(), assistantCompletedAt: z.number().optional(), - completionSource: z - .enum([ - 'assistant_message_event', - 'manual_compact_summarize', - 'idle_reconciliation', - 'wrapper_failure', - 'interrupt', - 'delivery_failure', - ]) - .optional(), - failureStage: z - .enum([ - 'pre_dispatch', - 'post_dispatch_no_activity', - 'agent_activity', - 'interruption', - 'unknown', - ]) - .optional(), - failureCode: z - .enum([ - 'sandbox_connect_failed', - 'workspace_setup_failed', - 'kilo_server_failed', - 'wrapper_start_failed', - 'invalid_delivery_request', - 'session_metadata_missing', - 'model_missing', - 'delivery_failure_unknown', - 'wrapper_disconnected', - 'wrapper_no_output', - 'wrapper_ping_timeout', - 'wrapper_error_before_activity', - 'assistant_error', - 'wrapper_error_after_activity', - 'missing_assistant_reply', - 'user_interrupt', - 'container_shutdown', - 'system_interrupt', - 'unclassified', - ]) - .optional(), + completionSource: SessionMessageCompletionSourceSchema.optional(), + failureStage: SessionMessageFailureStageSchema.optional(), + failureCode: SessionMessageFailureCodeSchema.optional(), error: z.string().optional(), failureReason: z.string().optional(), attempts: z.number().int().nonnegative().optional(), @@ -357,21 +363,41 @@ function normalizeParsedSessionMessageState( return { ...currentState, legacyAdmissionConstraints: constraints }; } -export async function getSessionMessageState( +export type SessionMessageStateLookup = + | { type: 'missing' } + | { type: 'invalid' } + | { type: 'found'; state: SessionMessageState }; + +export async function lookupSessionMessageState( storage: SessionMessageStorage, messageId: string -): Promise { +): Promise { const raw = await storage.get(sessionMessageStateKey(messageId)); - if (raw === undefined) return undefined; + if (raw === undefined) return { type: 'missing' }; const result = SessionMessageStateSchema.safeParse(raw); if (!result.success) { console.warn('Invalid session message state', { messageId, issues: result.error.issues, }); - return undefined; + return { type: 'invalid' }; } - return normalizeParsedSessionMessageState(result.data); + if (result.data.messageId !== messageId) { + console.warn('Mismatched session message state identity', { + messageId, + storedMessageId: result.data.messageId, + }); + return { type: 'invalid' }; + } + return { type: 'found', state: normalizeParsedSessionMessageState(result.data) }; +} + +export async function getSessionMessageState( + storage: SessionMessageStorage, + messageId: string +): Promise { + const lookup = await lookupSessionMessageState(storage, messageId); + return lookup.type === 'found' ? lookup.state : undefined; } export async function putSessionMessageState( diff --git a/services/cloud-agent-next/test/integration/session/message-result.test.ts b/services/cloud-agent-next/test/integration/session/message-result.test.ts new file mode 100644 index 0000000000..3a04ebdb4f --- /dev/null +++ b/services/cloud-agent-next/test/integration/session/message-result.test.ts @@ -0,0 +1,516 @@ +import { env, listDurableObjectIds, runInDurableObject } from 'cloudflare:test'; +import { drizzle } from 'drizzle-orm/durable-sqlite'; +import { beforeEach, describe, expect, it } from 'vitest'; +import type { CloudAgentSession } from '../../../src/persistence/CloudAgentSession.js'; +import { createEventQueries } from '../../../src/session/queries/events.js'; +import { + createPendingSessionMessage, + createPendingSessionMessageFromIntent, + storePendingSessionMessage, +} from '../../../src/session/pending-messages.js'; +import { + putSessionMessageState, + type SessionMessageState, +} from '../../../src/session/session-message-state.js'; +import { registerReadySession } from '../../helpers/session-setup.js'; + +const kiloSessionId = 'ses_message_result'; +const messageA = 'msg_0123456789abAAAAAAAAAAAAAA'; +const messageB = 'msg_0123456789abBBBBBBBBBBBBBB'; + +async function seedAssistantMessageWithParent( + state: DurableObjectState, + sessionId: string, + input: { + messageId: string; + parentId: string; + text: string; + timestamp: number; + kiloSessionId?: string; + } +): Promise { + const events = createEventQueries(drizzle(state.storage, { logger: false }), state.storage.sql); + events.upsert({ + executionId: 'exc_message_result', + sessionId, + streamEventType: 'kilocode', + payload: JSON.stringify({ + event: 'message.updated', + properties: { + info: { + id: input.messageId, + role: 'assistant', + sessionID: input.kiloSessionId ?? kiloSessionId, + parentID: input.parentId, + time: { completed: input.timestamp }, + }, + }, + }), + timestamp: input.timestamp, + entityId: `message/${input.messageId}`, + }); + events.upsert({ + executionId: 'exc_message_result', + sessionId, + streamEventType: 'kilocode', + payload: JSON.stringify({ + event: 'message.part.updated', + properties: { + part: { + id: `part_${input.messageId}`, + messageID: input.messageId, + sessionID: input.kiloSessionId ?? kiloSessionId, + type: 'text', + text: input.text, + }, + }, + }), + timestamp: input.timestamp + 1, + entityId: `part/${input.messageId}/part_${input.messageId}`, + }); +} + +async function registerSession(instance: CloudAgentSession, sessionId: string, userId: string) { + await registerReadySession(instance, { + sessionId, + userId, + kiloSessionId, + prompt: 'initial prompt', + mode: 'code', + model: 'test-model', + kilocodeToken: 'private-kilo-token', + }); +} + +function lifecycleState( + messageId: string, + overrides: Partial = {} +): SessionMessageState { + return { + messageId, + status: 'queued', + prompt: 'private prompt', + createdAt: 1, + queuedAt: 1, + ...overrides, + }; +} + +describe('CloudAgentSession.getMessageResult', () => { + beforeEach(async () => { + const ids = await listDurableObjectIds(env.CLOUD_AGENT_SESSION); + await Promise.all( + ids.map(id => + runInDurableObject(env.CLOUD_AGENT_SESSION.get(id), instance => + instance.ctx.storage.deleteAll() + ) + ) + ); + }); + + it('returns session-not-found when metadata is absent', async () => { + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName('user_message_result_missing:agent_message_result_missing') + ); + + await expect(stub.getMessageResult(messageA)).resolves.toEqual({ type: 'session-not-found' }); + }); + + it('returns message-not-found when the exact message ID is absent', async () => { + const userId = 'user_message_result_unknown'; + const sessionId = 'agent_message_result_unknown'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async instance => { + await registerSession(instance, sessionId, userId); + return instance.getMessageResult(messageA); + }); + + expect(result).toEqual({ type: 'message-not-found' }); + }); + + it('returns correlated assistant text for an exact completed turn', async () => { + const userId = 'user_message_result_exact'; + const sessionId = 'agent_message_result_exact'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async (instance, state) => { + await registerSession(instance, sessionId, userId); + await putSessionMessageState( + instance.ctx.storage, + lifecycleState(messageA, { + status: 'completed', + acceptedAt: 2, + terminalAt: 3, + assistantMessageId: 'assistant_exact', + completionSource: 'assistant_message_event', + }) + ); + await seedAssistantMessageWithParent(state, sessionId, { + messageId: 'assistant_exact', + parentId: messageA, + text: 'Completed answer', + timestamp: 10, + }); + await seedAssistantMessageWithParent(state, sessionId, { + messageId: 'assistant_duplicate', + parentId: messageA, + text: 'Wrong duplicate answer', + timestamp: 20, + }); + return instance.getMessageResult(messageA); + }); + + expect(result).toEqual({ + type: 'found', + result: { + cloudAgentSessionId: sessionId, + messageId: messageA, + status: 'completed', + createdAt: 1, + queuedAt: 1, + acceptedAt: 2, + terminalAt: 3, + completionSource: 'assistant_message_event', + assistant: { messageId: 'assistant_exact', text: 'Completed answer' }, + }, + }); + }); + + it('omits assistant text when a persisted assistant ID belongs to another turn', async () => { + const userId = 'user_message_result_mismatched_parent'; + const sessionId = 'agent_message_result_mismatched_parent'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async (instance, state) => { + await registerSession(instance, sessionId, userId); + await putSessionMessageState( + instance.ctx.storage, + lifecycleState(messageA, { + status: 'completed', + terminalAt: 3, + assistantMessageId: 'assistant_wrong_parent', + }) + ); + await seedAssistantMessageWithParent(state, sessionId, { + messageId: 'assistant_wrong_parent', + parentId: messageB, + text: 'Wrong parent answer', + timestamp: 10, + }); + return instance.getMessageResult(messageA); + }); + + expect(result).toEqual({ + type: 'found', + result: { + cloudAgentSessionId: sessionId, + messageId: messageA, + status: 'completed', + createdAt: 1, + queuedAt: 1, + terminalAt: 3, + }, + }); + }); + + it('omits assistant text when a persisted assistant ID belongs to another Kilo session', async () => { + const userId = 'user_message_result_mismatched_kilo_session'; + const sessionId = 'agent_message_result_mismatched_kilo_session'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async (instance, state) => { + await registerSession(instance, sessionId, userId); + await putSessionMessageState( + instance.ctx.storage, + lifecycleState(messageA, { + status: 'completed', + terminalAt: 3, + assistantMessageId: 'assistant_wrong_kilo_session', + }) + ); + await seedAssistantMessageWithParent(state, sessionId, { + messageId: 'assistant_wrong_kilo_session', + parentId: messageA, + text: 'Wrong Kilo session answer', + timestamp: 10, + kiloSessionId: 'ses_child', + }); + return instance.getMessageResult(messageA); + }); + + expect(result).toEqual({ + type: 'found', + result: { + cloudAgentSessionId: sessionId, + messageId: messageA, + status: 'completed', + createdAt: 1, + queuedAt: 1, + terminalAt: 3, + }, + }); + }); + + it('returns a queued recovery result for pending-only compatibility rows', async () => { + const userId = 'user_message_result_pending'; + const sessionId = 'agent_message_result_pending'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async instance => { + await registerSession(instance, sessionId, userId); + await storePendingSessionMessage( + instance.ctx.storage, + createPendingSessionMessage({ + messageId: messageA, + role: 'user', + content: 'private pending prompt', + createdAt: 5, + lastFlushError: 'private pending failure', + }) + ); + return instance.getMessageResult(messageA); + }); + + expect(result).toEqual({ + type: 'found', + result: { + cloudAgentSessionId: sessionId, + messageId: messageA, + status: 'queued', + createdAt: 5, + queuedAt: 5, + }, + }); + }); + + it('fails closed when a corrupt lifecycle row has pending compatibility residue', async () => { + const userId = 'user_message_result_corrupt'; + const sessionId = 'agent_message_result_corrupt'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async instance => { + await registerSession(instance, sessionId, userId); + await storePendingSessionMessage( + instance.ctx.storage, + createPendingSessionMessage({ + messageId: messageA, + role: 'user', + content: 'private pending prompt', + createdAt: 5, + }) + ); + await instance.ctx.storage.put(`session_message:${messageA}`, { + messageId: 'invalid', + status: 'queued', + prompt: 'corrupt lifecycle prompt', + createdAt: 1, + }); + return instance.getMessageResult(messageA); + }); + + expect(result).toEqual({ type: 'state-invalid' }); + }); + + it('fails closed when a lifecycle row embeds another valid message ID', async () => { + const userId = 'user_message_result_mismatched_identity'; + const sessionId = 'agent_message_result_mismatched_identity'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async instance => { + await registerSession(instance, sessionId, userId); + await instance.ctx.storage.put(`session_message:${messageA}`, lifecycleState(messageB)); + return instance.getMessageResult(messageA); + }); + + expect(result).toEqual({ type: 'state-invalid' }); + }); + + it('returns a safe exact result for current pending rows', async () => { + const userId = 'user_message_result_current_pending'; + const sessionId = 'agent_message_result_current_pending'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + const token = 'private-current-pending-token'; + + const result = await runInDurableObject(stub, async instance => { + await registerSession(instance, sessionId, userId); + const pending = createPendingSessionMessageFromIntent( + { + turn: { type: 'prompt', messageId: messageB, prompt: token }, + agent: { mode: 'code', model: 'test-model' }, + }, + 6, + { + required: true, + target: { url: 'https://example.com', headers: { Authorization: token } }, + } + ); + await storePendingSessionMessage(instance.ctx.storage, { ...pending, lastFlushError: token }); + return instance.getMessageResult(messageB); + }); + + expect(result).toEqual({ + type: 'found', + result: { + cloudAgentSessionId: sessionId, + messageId: messageB, + status: 'queued', + createdAt: 6, + queuedAt: 6, + }, + }); + expect(JSON.stringify(result)).not.toContain(token); + }); + + it('does not expose assistant text for a queued turn', async () => { + const userId = 'user_message_result_queued'; + const sessionId = 'agent_message_result_queued'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async (instance, state) => { + await registerSession(instance, sessionId, userId); + await putSessionMessageState(instance.ctx.storage, lifecycleState(messageA)); + await seedAssistantMessageWithParent(state, sessionId, { + messageId: 'assistant_stale', + parentId: messageA, + text: 'Stale answer', + timestamp: 10, + }); + return instance.getMessageResult(messageA); + }); + + expect(result).toEqual({ + type: 'found', + result: { + cloudAgentSessionId: sessionId, + messageId: messageA, + status: 'queued', + createdAt: 1, + queuedAt: 1, + }, + }); + }); + + it('omits assistant text for completed rows without an assistant message ID', async () => { + const userId = 'user_message_result_parent'; + const sessionId = 'agent_message_result_parent'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async (instance, state) => { + await registerSession(instance, sessionId, userId); + await putSessionMessageState( + instance.ctx.storage, + lifecycleState(messageA, { status: 'completed', terminalAt: 3 }) + ); + await seedAssistantMessageWithParent(state, sessionId, { + messageId: 'assistant_selected', + parentId: messageA, + text: 'Selected answer', + timestamp: 10, + }); + await seedAssistantMessageWithParent(state, sessionId, { + messageId: 'assistant_latest', + parentId: messageB, + text: 'Wrong latest answer', + timestamp: 20, + }); + return instance.getMessageResult(messageA); + }); + + expect(result).toEqual({ + type: 'found', + result: { + cloudAgentSessionId: sessionId, + messageId: messageA, + status: 'completed', + createdAt: 1, + queuedAt: 1, + terminalAt: 3, + }, + }); + }); + + it('never exposes sensitive persisted diagnostics', async () => { + const userId = 'user_message_result_safe'; + const sessionId = 'agent_message_result_safe'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + const token = 'private-token-like-text'; + + const result = await runInDurableObject(stub, async instance => { + await registerSession(instance, sessionId, userId); + await putSessionMessageState( + instance.ctx.storage, + lifecycleState(messageA, { + status: 'failed', + prompt: token, + admissionSnapshot: { + turn: { type: 'prompt', messageId: messageA, prompt: token }, + agent: { mode: 'code', model: 'test-model' }, + }, + error: token, + failureReason: token, + callbackTarget: { url: 'https://example.com', headers: { Authorization: token } }, + callbackLastError: token, + terminalEffects: { + event: 'pending', + callback: { disposition: 'pending', allowWithoutObservedIdle: false }, + }, + }) + ); + return instance.getMessageResult(messageA); + }); + + expect(JSON.stringify(result)).not.toContain(token); + }); + + it('returns allowlisted structured failure fields', async () => { + const userId = 'user_message_result_failure'; + const sessionId = 'agent_message_result_failure'; + const stub = env.CLOUD_AGENT_SESSION.get( + env.CLOUD_AGENT_SESSION.idFromName(`${userId}:${sessionId}`) + ); + + const result = await runInDurableObject(stub, async instance => { + await registerSession(instance, sessionId, userId); + await putSessionMessageState( + instance.ctx.storage, + lifecycleState(messageA, { + status: 'failed', + terminalAt: 3, + completionSource: 'wrapper_failure', + failureStage: 'agent_activity', + failureCode: 'assistant_error', + attempts: 2, + }) + ); + return instance.getMessageResult(messageA); + }); + + expect(result).toMatchObject({ + type: 'found', + result: { failure: { stage: 'agent_activity', code: 'assistant_error', attempts: 2 } }, + }); + }); +});