Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/kilo-chat/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ export type * from './types';
export type { KiloChatEvent, KiloChatEventName, KiloChatEventOf } from './events';
export * from './schemas';
export * from './webhook-schemas';
export type * from './rpc-types';
export * from './events';
export * from './route-helpers';
51 changes: 51 additions & 0 deletions packages/kilo-chat/src/rpc-types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Cross-service RPC contracts exposed by the kilo-chat WorkerEntrypoint.
//
// Producer: services/kilo-chat/src/index.ts (KiloChatService)
// Consumers: any worker with a service binding to kilo-chat
// (e.g. webhook-agent-ingest, kiloclaw)
//
// The kilo-chat producer imports these types directly. Consumers import
// them when declaring their service-binding shape (Cloudflare's wrangler
// types only emit a generic `Service` for service bindings; the precise
// RPC method shape is declared per-consumer alongside the binding).
//
// Keeping the contract in one shared package gives us compile-time drift
// detection: a change here breaks both producer and consumer in the same
// build.

// ── postMessageAsUser ──────────────────────────────────────────────

export type PostMessageAsUserCorrelation = {
triggerId?: string;
webhookRequestId?: string;
reason?: string;
};

export type PostMessageAsUserParams = {
userId: string;
sandboxId: string;
message: string;
// Origin identifier for diagnostics (e.g. "webhook", "onboarding-warmup").
// Logged so structured-log queries can attribute new conversations to a
// specific source.
source: string;
// Default true. Pass false to fail the call if the user has never opened
// a chat with this bot.
autoCreateConversation?: boolean;
correlation?: PostMessageAsUserCorrelation;
};

export type PostMessageAsUserOk = {
ok: true;
conversationId: string;
messageId: string;
conversationCreated: boolean;
};

export type PostMessageAsUserErr = {
ok: false;
code: 'invalid_request' | 'no_conversation' | 'forbidden' | 'internal';
error: string;
};

export type PostMessageAsUserResult = PostMessageAsUserOk | PostMessageAsUserErr;
233 changes: 26 additions & 207 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

219 changes: 219 additions & 0 deletions services/kilo-chat/src/__tests__/post-message-as-user.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import { env, createExecutionContext, waitOnExecutionContext } from 'cloudflare:test';
import { describe, it, expect, vi } from 'vitest';
import {
postMessageAsUser,
type PostMessageAsUserParams,
type PostMessageAsUserResult,
} from '../services/post-message-as-user';

/** Map of userId → set of sandbox IDs they own. */
const ownershipMap = new Map<string, Set<string>>();

vi.mock('../services/sandbox-ownership', () => ({
userOwnsSandbox: async (_env: Env, userId: string, sandboxId: string) =>
ownershipMap.get(userId)?.has(sandboxId) ?? false,
}));

vi.mock('../services/user-lookup', () => ({
resolveUserDisplayInfo: async () => new Map(),
}));

function grantSandbox(userId: string, sandboxId: string) {
if (!ownershipMap.has(userId)) ownershipMap.set(userId, new Set());
ownershipMap.get(userId)!.add(sandboxId);
}

function makeEnv(): Env {
return {
...env,
EVENT_SERVICE: {
fetch: env.EVENT_SERVICE.fetch.bind(env.EVENT_SERVICE),
connect: env.EVENT_SERVICE.connect.bind(env.EVENT_SERVICE),
pushEvent: async () => true,
},
} satisfies Env;
}

// Build a real ExecutionContext, run the callee, then drain pending
// waitUntil work before asserting so isolated-storage cleanup between tests
// does not race. Mirrors the pattern in helpers.ts.
async function runPost(
testEnv: Env,
params: PostMessageAsUserParams
): Promise<PostMessageAsUserResult> {
const ctx = createExecutionContext();
const result = await postMessageAsUser(testEnv, { waitUntil: p => ctx.waitUntil(p) }, params);
await waitOnExecutionContext(ctx);
return result;
}

describe('postMessageAsUser', () => {
it('auto-creates a conversation on first delivery and posts the message', async () => {
const userId = `user-${crypto.randomUUID()}`;
const sandboxId = `sandbox-${crypto.randomUUID()}`;
grantSandbox(userId, sandboxId);

const result = await runPost(makeEnv(), {
userId,
sandboxId,
message: 'webhook payload arrived',
source: 'webhook',
autoCreateConversation: true,
});

expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.conversationCreated).toBe(true);
expect(result.conversationId).toBeTruthy();
expect(result.messageId).toBeTruthy();
});

it('reuses the existing conversation on subsequent deliveries', async () => {
const userId = `user-${crypto.randomUUID()}`;
const sandboxId = `sandbox-${crypto.randomUUID()}`;
grantSandbox(userId, sandboxId);
const testEnv = makeEnv();

const first = await runPost(testEnv, {
userId,
sandboxId,
message: 'first',
source: 'webhook',
});
expect(first.ok).toBe(true);
if (!first.ok) return;
expect(first.conversationCreated).toBe(true);

const second = await runPost(testEnv, {
userId,
sandboxId,
message: 'second',
source: 'webhook',
});
expect(second.ok).toBe(true);
if (!second.ok) return;
expect(second.conversationCreated).toBe(false);
expect(second.conversationId).toBe(first.conversationId);
});

it('returns no_conversation when autoCreateConversation is false and none exists', async () => {
const userId = `user-${crypto.randomUUID()}`;
const sandboxId = `sandbox-${crypto.randomUUID()}`;
grantSandbox(userId, sandboxId);

const result = await runPost(makeEnv(), {
userId,
sandboxId,
message: 'should fail',
source: 'onboarding-warmup',
autoCreateConversation: false,
});

expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.code).toBe('no_conversation');
});

it('surfaces forbidden when the user does not own the sandbox', async () => {
const userId = `user-${crypto.randomUUID()}`;
const sandboxId = `sandbox-${crypto.randomUUID()}`;
// intentionally NOT granting ownership

const result = await runPost(makeEnv(), {
userId,
sandboxId,
message: 'should be forbidden',
source: 'webhook',
autoCreateConversation: true,
});

expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.code).toBe('forbidden');
});

it('rejects forbidden even when a stale conversation already exists', async () => {
const userId = `user-${crypto.randomUUID()}`;
const sandboxId = `sandbox-${crypto.randomUUID()}`;
const testEnv = makeEnv();

// Seed: grant ownership long enough to create a conversation.
grantSandbox(userId, sandboxId);
const seed = await runPost(testEnv, {
userId,
sandboxId,
message: 'seed message',
source: 'webhook',
});
expect(seed.ok).toBe(true);

// Revoke ownership (simulates instance destroyed / reassigned). The
// conversation still exists in MEMBERSHIP_DO, so the existing-conversation
// path runs without the create-time ownership check.
ownershipMap.get(userId)?.delete(sandboxId);

const result = await runPost(testEnv, {
userId,
sandboxId,
message: 'should now be forbidden',
source: 'webhook',
});

expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.code).toBe('forbidden');
});

it('rejects empty messages with invalid_request before any side effects', async () => {
const userId = `user-${crypto.randomUUID()}`;
const sandboxId = `sandbox-${crypto.randomUUID()}`;
grantSandbox(userId, sandboxId);

const result = await runPost(makeEnv(), {
userId,
sandboxId,
message: ' ',
source: 'webhook',
autoCreateConversation: true,
});

expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.code).toBe('invalid_request');
});

it('rejects messages exceeding the chat content limit with invalid_request', async () => {
const userId = `user-${crypto.randomUUID()}`;
const sandboxId = `sandbox-${crypto.randomUUID()}`;
grantSandbox(userId, sandboxId);

const result = await runPost(makeEnv(), {
userId,
sandboxId,
// Larger than MESSAGE_TEXT_MAX_CHARS (8000) so the public schema rejects it.
message: 'x'.repeat(9000),
source: 'webhook',
autoCreateConversation: true,
});

expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.code).toBe('invalid_request');
});

it('accepts correlation metadata without throwing', async () => {
const userId = `user-${crypto.randomUUID()}`;
const sandboxId = `sandbox-${crypto.randomUUID()}`;
grantSandbox(userId, sandboxId);

const result = await runPost(makeEnv(), {
userId,
sandboxId,
message: 'with correlation',
source: 'webhook',
correlation: { triggerId: 'trig-1', webhookRequestId: 'req-1' },
});

expect(result.ok).toBe(true);
});
});
33 changes: 32 additions & 1 deletion services/kilo-chat/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { withDORetry } from '@kilocode/worker-utils';
import { cors } from 'hono/cors';
import { useWorkersLogger } from 'workers-tagged-logger';
import type { MiddlewareHandler } from 'hono';
import { logger } from './util/logger';
import { logger, withLogTags } from './util/logger';
import { formatError } from '@kilocode/worker-utils';
import { authMiddleware } from './auth';
import { botAuthMiddleware } from './auth-bot';
Expand All @@ -25,6 +25,11 @@ import {
} from './routes/handler';
import { registerBotRoutes } from './routes/bot-messages';
import { registerSandboxReadRoutes } from './routes/sandbox-reads';
import {
postMessageAsUser,
type PostMessageAsUserParams,
type PostMessageAsUserResult,
} from './services/post-message-as-user';
export { MembershipDO } from './do/membership-do';
export { ConversationDO } from './do/conversation-do';
export { SandboxStatusDO } from './do/sandbox-status-do';
Expand Down Expand Up @@ -99,8 +104,34 @@ export class KiloChatService extends WorkerEntrypoint<Env> {
return app.fetch(request, this.env, this.ctx);
}

/**
* Internal RPC: post a message into the user-bot conversation on behalf
* of the user. Used by webhook-agent-ingest for webhook-to-chat delivery
* and reusable for other internal flows (e.g. onboarding warmup).
*
* Auto-creates the conversation by default if the user has never opened
* one. Pass `autoCreateConversation: false` to fail when none exists.
*/
async postMessageAsUser(params: PostMessageAsUserParams): Promise<PostMessageAsUserResult> {
// Wrap in withLogTags so logger.setTags inside the helper actually
// propagates. Without an active context (HTTP middleware or wrap),
// setTags is a silent no-op for AsyncLocalStorage-backed loggers.
return await withLogTags({ source: 'kilo-chat-rpc:postMessageAsUser' }, () =>
postMessageAsUser(this.env, { waitUntil: p => this.ctx.waitUntil(p) }, params)
);
}

async destroySandboxData(
sandboxId: string
): Promise<{ ok: boolean; conversationsDeleted: number; failedConversations: string[] }> {
return await withLogTags({ source: 'kilo-chat-rpc:destroySandboxData' }, () => {
logger.setTags({ sandboxId });
return this.destroySandboxDataImpl(sandboxId);
});
}

private async destroySandboxDataImpl(
sandboxId: string
): Promise<{ ok: boolean; conversationsDeleted: number; failedConversations: string[] }> {
const botId = `bot:kiloclaw:${sandboxId}`;
// Discover all conversations for this sandbox, paginating through all results.
Expand Down
Loading