Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions services/cloud-agent-next/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ The recommended V2 flow is:
- `prepareSession` - Create a fully prepared session with workspace, git clone, and configuration
- `updateSession` - Update a prepared (not yet initiated) session
- `getSession` - Query session metadata (no secrets)
- `getMessageResult` - Poll lifecycle state and terminal assistant text for one submitted turn
- `initiateFromKilocodeSessionV2` - Start execution on a prepared session
- `sendMessageV2` - Send follow-up messages (output via `/stream` WebSocket)
- `deleteSession` - Delete a session and clean up resources
Expand All @@ -67,6 +68,28 @@ The recommended V2 flow is:

All endpoints require a kilocode api token except `/stream` which uses short lived ws tickets.

### Message Result Retrieval

Use the bearer-protected `GET /trpc/getMessageResult` query to poll one durably submitted Cloud Agent turn. Supply `cloudAgentSessionId` and the submitted `messageId`.

```text
GET /trpc/getMessageResult?input={"cloudAgentSessionId":"agent_<uuid>","messageId":"msg_<id>"}
Authorization: Bearer <kilocode-api-token>
```

The response includes only safe fields: `cloudAgentSessionId`, `messageId`, lifecycle status and timestamps, optional structured `completionSource`, `failure`, `gateResult`, and assistant text correlated to the selected submitted turn. It never returns prompts, tokens, callback details, or raw diagnostics.

| Stored lifecycle state | Public status |
|---|---|
| `queued` | `queued` |
| `accepted` | `running` |
| `completed` | `completed` |
| `failed` | `failed` |
| `interrupted` | `interrupted` |
| Pending-only compatibility row | `queued` |

The query returns `NOT_FOUND` for missing sessions, cross-user session lookups, and unknown message IDs.

## Usage Examples

### TypeScript Client (Recommended)
Expand Down
39 changes: 39 additions & 0 deletions services/cloud-agent-next/src/persistence/CloudAgentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ import {
createMessageSettlementOutbox,
type MessageSettlementOutbox,
} from '../session/message-settlement-outbox.js';
import {
resolveSessionMessageResult,
type MessageResultRPCResponse,
} from '../session/message-result.js';
import {
createAgentRuntime,
type AgentRuntime,
Expand Down Expand Up @@ -979,6 +983,41 @@ export class CloudAgentSession extends DurableObject<WorkerEnv> {
return this.eventQueries.getLatestAssistantMessage(sessionId, metadata.auth.kiloSessionId);
}

async getMessageResult(messageId: string): Promise<MessageResultRPCResponse> {
const metadata = await this.getMetadata();
if (!metadata) return { type: 'session-not-found' };

const resolved = await resolveSessionMessageResult(this.ctx.storage, messageId);
if (!resolved) return { type: 'message-not-found' };
if (resolved.type === 'state-invalid') return resolved;

const sessionId = await this.requireSessionId();
const assistantMessage =
metadata.auth.kiloSessionId && resolved.assistantLookup
? this.eventQueries.getAssistantMessageById(
sessionId,
metadata.auth.kiloSessionId,
resolved.assistantLookup.messageId,
resolved.assistantLookup.parentMessageId
)
: null;
const assistant = assistantMessage
? {
messageId: assistantMessage.info.id,
text: extractAssistantTextFromParts(assistantMessage.parts) || undefined,
}
: undefined;

return {
type: 'found',
result: {
cloudAgentSessionId: sessionId,
...resolved.result,
...(assistant ? { assistant } : {}),
},
};
}

private async getLatestAssistantMessageText(): Promise<string | undefined> {
try {
const message = await this.getLatestAssistantMessage();
Expand Down
106 changes: 106 additions & 0 deletions services/cloud-agent-next/src/router.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type MockSessionStub = {
getActiveExecutionId?: ReturnType<typeof vi.fn>;
getExecution?: ReturnType<typeof vi.fn>;
getLatestAssistantMessage?: ReturnType<typeof vi.fn>;
getMessageResult?: ReturnType<typeof vi.fn>;
createTerminal?: ReturnType<typeof vi.fn>;
resizeTerminal?: ReturnType<typeof vi.fn>;
closeTerminal?: ReturnType<typeof vi.fn>;
Expand Down Expand Up @@ -1505,6 +1506,111 @@ describe('router sessionId validation', () => {
});
});

describe('getMessageResult procedure', () => {
const sessionId: SessionId = 'agent_12345678-1234-1234-1234-123456789abc';
const messageId = 'msg_018f1e2d3c4bAbCdEfGhIjKlMn';
let mockContext: TRPCContext;
let caller: ReturnType<typeof appRouter.createCaller>;
let cloudAgentSession: MockCAS;
let mockGetMessageResult: ReturnType<typeof vi.fn>;

beforeEach(() => {
vi.clearAllMocks();
mockGetMessageResult = vi.fn().mockResolvedValue({
type: 'found',
result: {
cloudAgentSessionId: sessionId,
messageId,
status: 'completed',
createdAt: 1,
terminalAt: 2,
assistant: { messageId: 'assistant_done', text: 'done' },
},
});
mockContext = {
userId: 'test-user-123',
authToken: 'test-token',
botId: undefined,
request: {} as Request,
env: {
CLOUD_AGENT_SESSION: {
idFromName: vi.fn((id: string) => ({ id })),
get: vi.fn(() => ({ getMessageResult: mockGetMessageResult })),
} as unknown as TRPCContext['env']['CLOUD_AGENT_SESSION'],
} as unknown as TRPCContext['env'],
};
cloudAgentSession = mockContext.env.CLOUD_AGENT_SESSION as unknown as MockCAS;
caller = appRouter.createCaller(mockContext);
});

it('returns an ownership-isolated safe exact message result with one Durable Object RPC', async () => {
await expect(
caller.getMessageResult({ cloudAgentSessionId: sessionId, messageId })
).resolves.toEqual({
cloudAgentSessionId: sessionId,
messageId,
status: 'completed',
createdAt: 1,
terminalAt: 2,
assistant: { messageId: 'assistant_done', text: 'done' },
});
expect(cloudAgentSession.idFromName).toHaveBeenCalledWith(`test-user-123:${sessionId}`);
expect(mockGetMessageResult).toHaveBeenCalledOnce();
expect(mockGetMessageResult).toHaveBeenCalledWith(messageId);
});

it('returns Session not found when the Durable Object has no metadata', async () => {
mockGetMessageResult.mockResolvedValue({ type: 'session-not-found' });
await expect(
caller.getMessageResult({ cloudAgentSessionId: sessionId, messageId })
).rejects.toMatchObject({ code: 'NOT_FOUND', message: 'Session not found' });
});

it('returns Message not found for an unknown message ID', async () => {
mockGetMessageResult.mockResolvedValue({ type: 'message-not-found' });
await expect(
caller.getMessageResult({ cloudAgentSessionId: sessionId, messageId })
).rejects.toMatchObject({ code: 'NOT_FOUND', message: 'Message not found' });
});

it('fails closed when persisted message state is invalid', async () => {
mockGetMessageResult.mockResolvedValue({ type: 'state-invalid' });
await expect(
caller.getMessageResult({ cloudAgentSessionId: sessionId, messageId })
).rejects.toMatchObject({
code: 'INTERNAL_SERVER_ERROR',
message: 'Message result unavailable',
});
});

it('requires authentication', async () => {
const unauthenticatedCaller = appRouter.createCaller({
...mockContext,
userId: undefined,
authToken: undefined,
} as unknown as TRPCContext);
await expect(
unauthenticatedCaller.getMessageResult({ cloudAgentSessionId: sessionId, messageId })
).rejects.toThrow('Authentication required');
});

it('rejects extra sensitive RPC response fields at the output boundary', async () => {
mockGetMessageResult.mockResolvedValue({
type: 'found',
result: {
cloudAgentSessionId: sessionId,
messageId,
status: 'failed',
createdAt: 1,
error: 'private raw error',
},
});
await expect(
caller.getMessageResult({ cloudAgentSessionId: sessionId, messageId })
).rejects.toThrow();
});
});

describe('router terminal procedures', () => {
it('creates a terminal through the session Durable Object', async () => {
const createTerminal = vi.fn().mockResolvedValue({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ import {
GetSessionOutput,
GetSessionHealthInput,
GetSessionHealthOutput,
GetMessageResultInput,
GetMessageResultOutput,
GetLatestAssistantMessageInput,
GetLatestAssistantMessageOutput,
} from '../schemas.js';
import { readProfileBundle } from '../../session-profile.js';
import type { CloudAgentSession } from '../../persistence/CloudAgentSession.js';
import type { CloudAgentSessionState } from '../../persistence/types.js';
import type { MessageResultRPCResponse } from '../../session/message-result.js';

function publicRepositoryFields(metadata: CloudAgentSessionState): {
githubRepo?: string;
Expand Down Expand Up @@ -402,6 +405,42 @@ export function createSessionManagementHandlers() {
});
}),

getMessageResult: protectedProcedure
.input(GetMessageResultInput)
.output(GetMessageResultOutput)
.query(async ({ input, ctx }) => {
return withLogTags({ source: 'getMessageResult' }, async () => {
const sessionId = input.cloudAgentSessionId as SessionId;
const { userId, env } = ctx;
const doKey = `${userId}:${sessionId}`;
const getStub = () =>
env.CLOUD_AGENT_SESSION.get(env.CLOUD_AGENT_SESSION.idFromName(doKey));

const response = await withDORetry<
DurableObjectStub<CloudAgentSession>,
MessageResultRPCResponse
>(
getStub,
async stub => await stub.getMessageResult(input.messageId),
'getMessageResult'
);
if (response.type === 'session-not-found') {
throw new TRPCError({ code: 'NOT_FOUND', message: 'Session not found' });
}
if (response.type === 'message-not-found') {
throw new TRPCError({ code: 'NOT_FOUND', message: 'Message not found' });
}
if (response.type === 'state-invalid') {
throw new TRPCError({
code: 'INTERNAL_SERVER_ERROR',
message: 'Message result unavailable',
});
}

return response.result;
});
}),

getLatestAssistantMessage: protectedProcedure
.input(GetLatestAssistantMessageInput)
.output(GetLatestAssistantMessageOutput)
Expand Down
107 changes: 107 additions & 0 deletions services/cloud-agent-next/src/router/schemas.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { describe, expect, it } from 'vitest';
import {
ExecutionResponse,
GetMessageResultInput,
GetMessageResultOutput,
GetSessionOutput,
InitiateFromPreparedSessionInput,
LegacyExecutionResponse,
Expand Down Expand Up @@ -331,6 +333,111 @@ describe('message ID schema validation', () => {
});
});

describe('getMessageResult contract', () => {
const baseOutput = {
cloudAgentSessionId: validSessionId,
messageId: validMessageId,
status: 'completed' as const,
createdAt: 1,
};

it('requires an exact lookup input while rejecting invalid or unknown fields', () => {
expect(GetMessageResultInput.safeParse({ cloudAgentSessionId: validSessionId }).success).toBe(
false
);
expect(
GetMessageResultInput.safeParse({
cloudAgentSessionId: validSessionId,
messageId: validMessageId,
}).success
).toBe(true);
expect(GetMessageResultInput.safeParse({ cloudAgentSessionId: 'agent_invalid' }).success).toBe(
false
);
expect(
GetMessageResultInput.safeParse({ cloudAgentSessionId: validSessionId, messageId: 'msg_bad' })
.success
).toBe(false);
expect(
GetMessageResultInput.safeParse({ cloudAgentSessionId: validSessionId, unknown: true })
.success
).toBe(false);
});

it('accepts public statuses and allowlisted structured result fields', () => {
for (const status of ['queued', 'running', 'completed', 'failed', 'interrupted']) {
expect(GetMessageResultOutput.safeParse({ ...baseOutput, status }).success).toBe(true);
}
expect(
GetMessageResultOutput.safeParse({
...baseOutput,
queuedAt: 2,
acceptedAt: 3,
terminalAt: 4,
completionSource: 'assistant_message_event',
gateResult: 'fail',
assistant: { messageId: 'assistant_1', text: 'safe answer' },
}).success
).toBe(true);
expect(
GetMessageResultOutput.safeParse({
...baseOutput,
status: 'failed',
queuedAt: 2,
acceptedAt: 3,
terminalAt: 4,
completionSource: 'wrapper_failure',
failure: { stage: 'agent_activity', code: 'assistant_error', attempts: 2 },
}).success
).toBe(true);
});

it('fails closed on contradictory lifecycle result fields', () => {
for (const output of [
{ ...baseOutput, status: 'queued', acceptedAt: 2 },
{ ...baseOutput, status: 'queued', terminalAt: 2 },
{ ...baseOutput, status: 'running', completionSource: 'assistant_message_event' },
{ ...baseOutput, status: 'queued', failure: { attempts: 1 } },
{ ...baseOutput, status: 'failed', assistant: { messageId: 'assistant_1', text: 'wrong' } },
{ ...baseOutput, status: 'interrupted', gateResult: 'fail' },
]) {
expect(GetMessageResultOutput.safeParse(output).success).toBe(false);
}
});

it('fails closed on extra top-level and nested fields', () => {
for (const extra of [
{ error: 'token' },
{ failureReason: 'token' },
{ callbackTarget: { url: 'https://example.com', headers: { Authorization: 'token' } } },
]) {
expect(GetMessageResultOutput.safeParse({ ...baseOutput, ...extra }).success).toBe(false);
}
expect(
GetMessageResultOutput.safeParse({
...baseOutput,
status: 'failed',
failure: { attempts: -1 },
}).success
).toBe(false);
expect(
GetMessageResultOutput.safeParse({
...baseOutput,
status: 'failed',
failure: { error: 'token' },
}).success
).toBe(false);
expect(
GetMessageResultOutput.safeParse({ ...baseOutput, assistant: { text: 'missing identity' } })
.success
).toBe(false);
expect(
GetMessageResultOutput.safeParse({ ...baseOutput, assistant: { parts: [], info: {} } })
.success
).toBe(false);
});
});

describe('API output schemas omit executionId', () => {
it('StartSessionOutput rejects executionId', () => {
const result = StartSessionOutput.strict().safeParse({
Expand Down
Loading
Loading