Skip to content

Commit 67e24ea

Browse files
authored
fix(kiloclaw) webhook agent ingest chat delivery (#3140)
* fix(webhook-agent-ingest): restore webhook-to-chat delivery via kilo-chat RPC * fix(webhook-agent-ingest): close two reliability gaps in chat delivery * fix(webhook-agent-ingest): defense-in-depth + shared RPC contract types * fix(webhook-agent-ingest): address review feedback on chat delivery PR
1 parent 0abd6e2 commit 67e24ea

13 files changed

Lines changed: 888 additions & 241 deletions

File tree

packages/kilo-chat/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,6 @@ export type * from './types';
1616
export type { KiloChatEvent, KiloChatEventName, KiloChatEventOf } from './events';
1717
export * from './schemas';
1818
export * from './webhook-schemas';
19+
export type * from './rpc-types';
1920
export * from './events';
2021
export * from './route-helpers';
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Cross-service RPC contracts exposed by the kilo-chat WorkerEntrypoint.
2+
//
3+
// Producer: services/kilo-chat/src/index.ts (KiloChatService)
4+
// Consumers: any worker with a service binding to kilo-chat
5+
// (e.g. webhook-agent-ingest, kiloclaw)
6+
//
7+
// The kilo-chat producer imports these types directly. Consumers import
8+
// them when declaring their service-binding shape (Cloudflare's wrangler
9+
// types only emit a generic `Service` for service bindings; the precise
10+
// RPC method shape is declared per-consumer alongside the binding).
11+
//
12+
// Keeping the contract in one shared package gives us compile-time drift
13+
// detection: a change here breaks both producer and consumer in the same
14+
// build.
15+
16+
// ── postMessageAsUser ──────────────────────────────────────────────
17+
18+
export type PostMessageAsUserCorrelation = {
19+
triggerId?: string;
20+
webhookRequestId?: string;
21+
reason?: string;
22+
};
23+
24+
export type PostMessageAsUserParams = {
25+
userId: string;
26+
sandboxId: string;
27+
message: string;
28+
// Origin identifier for diagnostics (e.g. "webhook", "onboarding-warmup").
29+
// Logged so structured-log queries can attribute new conversations to a
30+
// specific source.
31+
source: string;
32+
// Default true. Pass false to fail the call if the user has never opened
33+
// a chat with this bot.
34+
autoCreateConversation?: boolean;
35+
correlation?: PostMessageAsUserCorrelation;
36+
};
37+
38+
export type PostMessageAsUserOk = {
39+
ok: true;
40+
conversationId: string;
41+
messageId: string;
42+
conversationCreated: boolean;
43+
};
44+
45+
export type PostMessageAsUserErr = {
46+
ok: false;
47+
code: 'invalid_request' | 'no_conversation' | 'forbidden' | 'internal';
48+
error: string;
49+
};
50+
51+
export type PostMessageAsUserResult = PostMessageAsUserOk | PostMessageAsUserErr;

pnpm-lock.yaml

Lines changed: 26 additions & 207 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
import { env, createExecutionContext, waitOnExecutionContext } from 'cloudflare:test';
2+
import { describe, it, expect, vi } from 'vitest';
3+
import {
4+
postMessageAsUser,
5+
type PostMessageAsUserParams,
6+
type PostMessageAsUserResult,
7+
} from '../services/post-message-as-user';
8+
9+
/** Map of userId → set of sandbox IDs they own. */
10+
const ownershipMap = new Map<string, Set<string>>();
11+
12+
vi.mock('../services/sandbox-ownership', () => ({
13+
userOwnsSandbox: async (_env: Env, userId: string, sandboxId: string) =>
14+
ownershipMap.get(userId)?.has(sandboxId) ?? false,
15+
}));
16+
17+
vi.mock('../services/user-lookup', () => ({
18+
resolveUserDisplayInfo: async () => new Map(),
19+
}));
20+
21+
function grantSandbox(userId: string, sandboxId: string) {
22+
if (!ownershipMap.has(userId)) ownershipMap.set(userId, new Set());
23+
ownershipMap.get(userId)!.add(sandboxId);
24+
}
25+
26+
function makeEnv(): Env {
27+
return {
28+
...env,
29+
EVENT_SERVICE: {
30+
fetch: env.EVENT_SERVICE.fetch.bind(env.EVENT_SERVICE),
31+
connect: env.EVENT_SERVICE.connect.bind(env.EVENT_SERVICE),
32+
pushEvent: async () => true,
33+
},
34+
} satisfies Env;
35+
}
36+
37+
// Build a real ExecutionContext, run the callee, then drain pending
38+
// waitUntil work before asserting so isolated-storage cleanup between tests
39+
// does not race. Mirrors the pattern in helpers.ts.
40+
async function runPost(
41+
testEnv: Env,
42+
params: PostMessageAsUserParams
43+
): Promise<PostMessageAsUserResult> {
44+
const ctx = createExecutionContext();
45+
const result = await postMessageAsUser(testEnv, { waitUntil: p => ctx.waitUntil(p) }, params);
46+
await waitOnExecutionContext(ctx);
47+
return result;
48+
}
49+
50+
describe('postMessageAsUser', () => {
51+
it('auto-creates a conversation on first delivery and posts the message', async () => {
52+
const userId = `user-${crypto.randomUUID()}`;
53+
const sandboxId = `sandbox-${crypto.randomUUID()}`;
54+
grantSandbox(userId, sandboxId);
55+
56+
const result = await runPost(makeEnv(), {
57+
userId,
58+
sandboxId,
59+
message: 'webhook payload arrived',
60+
source: 'webhook',
61+
autoCreateConversation: true,
62+
});
63+
64+
expect(result.ok).toBe(true);
65+
if (!result.ok) return;
66+
expect(result.conversationCreated).toBe(true);
67+
expect(result.conversationId).toBeTruthy();
68+
expect(result.messageId).toBeTruthy();
69+
});
70+
71+
it('reuses the existing conversation on subsequent deliveries', async () => {
72+
const userId = `user-${crypto.randomUUID()}`;
73+
const sandboxId = `sandbox-${crypto.randomUUID()}`;
74+
grantSandbox(userId, sandboxId);
75+
const testEnv = makeEnv();
76+
77+
const first = await runPost(testEnv, {
78+
userId,
79+
sandboxId,
80+
message: 'first',
81+
source: 'webhook',
82+
});
83+
expect(first.ok).toBe(true);
84+
if (!first.ok) return;
85+
expect(first.conversationCreated).toBe(true);
86+
87+
const second = await runPost(testEnv, {
88+
userId,
89+
sandboxId,
90+
message: 'second',
91+
source: 'webhook',
92+
});
93+
expect(second.ok).toBe(true);
94+
if (!second.ok) return;
95+
expect(second.conversationCreated).toBe(false);
96+
expect(second.conversationId).toBe(first.conversationId);
97+
});
98+
99+
it('returns no_conversation when autoCreateConversation is false and none exists', async () => {
100+
const userId = `user-${crypto.randomUUID()}`;
101+
const sandboxId = `sandbox-${crypto.randomUUID()}`;
102+
grantSandbox(userId, sandboxId);
103+
104+
const result = await runPost(makeEnv(), {
105+
userId,
106+
sandboxId,
107+
message: 'should fail',
108+
source: 'onboarding-warmup',
109+
autoCreateConversation: false,
110+
});
111+
112+
expect(result.ok).toBe(false);
113+
if (result.ok) return;
114+
expect(result.code).toBe('no_conversation');
115+
});
116+
117+
it('surfaces forbidden when the user does not own the sandbox', async () => {
118+
const userId = `user-${crypto.randomUUID()}`;
119+
const sandboxId = `sandbox-${crypto.randomUUID()}`;
120+
// intentionally NOT granting ownership
121+
122+
const result = await runPost(makeEnv(), {
123+
userId,
124+
sandboxId,
125+
message: 'should be forbidden',
126+
source: 'webhook',
127+
autoCreateConversation: true,
128+
});
129+
130+
expect(result.ok).toBe(false);
131+
if (result.ok) return;
132+
expect(result.code).toBe('forbidden');
133+
});
134+
135+
it('rejects forbidden even when a stale conversation already exists', async () => {
136+
const userId = `user-${crypto.randomUUID()}`;
137+
const sandboxId = `sandbox-${crypto.randomUUID()}`;
138+
const testEnv = makeEnv();
139+
140+
// Seed: grant ownership long enough to create a conversation.
141+
grantSandbox(userId, sandboxId);
142+
const seed = await runPost(testEnv, {
143+
userId,
144+
sandboxId,
145+
message: 'seed message',
146+
source: 'webhook',
147+
});
148+
expect(seed.ok).toBe(true);
149+
150+
// Revoke ownership (simulates instance destroyed / reassigned). The
151+
// conversation still exists in MEMBERSHIP_DO, so the existing-conversation
152+
// path runs without the create-time ownership check.
153+
ownershipMap.get(userId)?.delete(sandboxId);
154+
155+
const result = await runPost(testEnv, {
156+
userId,
157+
sandboxId,
158+
message: 'should now be forbidden',
159+
source: 'webhook',
160+
});
161+
162+
expect(result.ok).toBe(false);
163+
if (result.ok) return;
164+
expect(result.code).toBe('forbidden');
165+
});
166+
167+
it('rejects empty messages with invalid_request before any side effects', async () => {
168+
const userId = `user-${crypto.randomUUID()}`;
169+
const sandboxId = `sandbox-${crypto.randomUUID()}`;
170+
grantSandbox(userId, sandboxId);
171+
172+
const result = await runPost(makeEnv(), {
173+
userId,
174+
sandboxId,
175+
message: ' ',
176+
source: 'webhook',
177+
autoCreateConversation: true,
178+
});
179+
180+
expect(result.ok).toBe(false);
181+
if (result.ok) return;
182+
expect(result.code).toBe('invalid_request');
183+
});
184+
185+
it('rejects messages exceeding the chat content limit with invalid_request', async () => {
186+
const userId = `user-${crypto.randomUUID()}`;
187+
const sandboxId = `sandbox-${crypto.randomUUID()}`;
188+
grantSandbox(userId, sandboxId);
189+
190+
const result = await runPost(makeEnv(), {
191+
userId,
192+
sandboxId,
193+
// Larger than MESSAGE_TEXT_MAX_CHARS (8000) so the public schema rejects it.
194+
message: 'x'.repeat(9000),
195+
source: 'webhook',
196+
autoCreateConversation: true,
197+
});
198+
199+
expect(result.ok).toBe(false);
200+
if (result.ok) return;
201+
expect(result.code).toBe('invalid_request');
202+
});
203+
204+
it('accepts correlation metadata without throwing', async () => {
205+
const userId = `user-${crypto.randomUUID()}`;
206+
const sandboxId = `sandbox-${crypto.randomUUID()}`;
207+
grantSandbox(userId, sandboxId);
208+
209+
const result = await runPost(makeEnv(), {
210+
userId,
211+
sandboxId,
212+
message: 'with correlation',
213+
source: 'webhook',
214+
correlation: { triggerId: 'trig-1', webhookRequestId: 'req-1' },
215+
});
216+
217+
expect(result.ok).toBe(true);
218+
});
219+
});

services/kilo-chat/src/index.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { withDORetry } from '@kilocode/worker-utils';
55
import { cors } from 'hono/cors';
66
import { useWorkersLogger } from 'workers-tagged-logger';
77
import type { MiddlewareHandler } from 'hono';
8-
import { logger } from './util/logger';
8+
import { logger, withLogTags } from './util/logger';
99
import { formatError } from '@kilocode/worker-utils';
1010
import { authMiddleware } from './auth';
1111
import { botAuthMiddleware } from './auth-bot';
@@ -25,6 +25,11 @@ import {
2525
} from './routes/handler';
2626
import { registerBotRoutes } from './routes/bot-messages';
2727
import { registerSandboxReadRoutes } from './routes/sandbox-reads';
28+
import {
29+
postMessageAsUser,
30+
type PostMessageAsUserParams,
31+
type PostMessageAsUserResult,
32+
} from './services/post-message-as-user';
2833
export { MembershipDO } from './do/membership-do';
2934
export { ConversationDO } from './do/conversation-do';
3035
export { SandboxStatusDO } from './do/sandbox-status-do';
@@ -99,8 +104,34 @@ export class KiloChatService extends WorkerEntrypoint<Env> {
99104
return app.fetch(request, this.env, this.ctx);
100105
}
101106

107+
/**
108+
* Internal RPC: post a message into the user-bot conversation on behalf
109+
* of the user. Used by webhook-agent-ingest for webhook-to-chat delivery
110+
* and reusable for other internal flows (e.g. onboarding warmup).
111+
*
112+
* Auto-creates the conversation by default if the user has never opened
113+
* one. Pass `autoCreateConversation: false` to fail when none exists.
114+
*/
115+
async postMessageAsUser(params: PostMessageAsUserParams): Promise<PostMessageAsUserResult> {
116+
// Wrap in withLogTags so logger.setTags inside the helper actually
117+
// propagates. Without an active context (HTTP middleware or wrap),
118+
// setTags is a silent no-op for AsyncLocalStorage-backed loggers.
119+
return await withLogTags({ source: 'kilo-chat-rpc:postMessageAsUser' }, () =>
120+
postMessageAsUser(this.env, { waitUntil: p => this.ctx.waitUntil(p) }, params)
121+
);
122+
}
123+
102124
async destroySandboxData(
103125
sandboxId: string
126+
): Promise<{ ok: boolean; conversationsDeleted: number; failedConversations: string[] }> {
127+
return await withLogTags({ source: 'kilo-chat-rpc:destroySandboxData' }, () => {
128+
logger.setTags({ sandboxId });
129+
return this.destroySandboxDataImpl(sandboxId);
130+
});
131+
}
132+
133+
private async destroySandboxDataImpl(
134+
sandboxId: string
104135
): Promise<{ ok: boolean; conversationsDeleted: number; failedConversations: string[] }> {
105136
const botId = `bot:kiloclaw:${sandboxId}`;
106137
// Discover all conversations for this sandbox, paginating through all results.

0 commit comments

Comments
 (0)