Skip to content

Commit 65a61b0

Browse files
fix: prevent WebSocket reconnection storms and DO write exhaustion
Three-layer defense: API Worker: Cache-based rate limit per userId (10s cooldown) prevents DO from being woken during storms. Audit writes moved to waitUntil. ConnectionDO: Close replaced sockets with custom code 4009. Safety valve closes accumulated sockets (>3). Try-catch for "Exceeded" errors returns 503 instead of 500. Conditional storage writes skip unchanged userId/defaultModel/cachedModels. Plugin: Detect 429 via unexpected-response and respect Retry-After. Recognize close code 4009 as "replaced" — stop reconnecting. Delay backoff reset by 10s so flash-connects don't reset to 1s.
1 parent 89b8b0e commit 65a61b0

8 files changed

Lines changed: 222 additions & 66 deletions

File tree

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "botschat",
3-
"version": "0.1.14",
3+
"version": "0.1.15",
44
"description": "A self-hosted chat interface for OpenClaw AI agents",
55
"workspaces": [
66
"packages/*"

packages/api/src/do/connection-do.ts

Lines changed: 95 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,43 @@ export class ConnectionDO implements DurableObject {
3232
/** Browser sessions that report themselves in foreground (push notifications are suppressed). */
3333
private foregroundSessions = new Set<string>();
3434

35+
/** Timestamp of last accepted OpenClaw WebSocket (in-memory, no storage write). */
36+
private lastOpenClawAcceptedAt = 0;
37+
3538
constructor(state: DurableObjectState, env: Env) {
3639
this.state = state;
3740
this.env = env;
3841
}
3942

4043
/** Handle incoming HTTP requests (WebSocket upgrades). */
4144
async fetch(request: Request): Promise<Response> {
45+
try {
46+
return await this._fetch(request);
47+
} catch (err) {
48+
const msg = String(err);
49+
if (msg.includes("Exceeded")) {
50+
console.error("[DO] Storage limit exceeded:", msg);
51+
return new Response("Storage limit exceeded, retry later", {
52+
status: 503,
53+
headers: { "Retry-After": "300" },
54+
});
55+
}
56+
throw err;
57+
}
58+
}
59+
60+
private async _fetch(request: Request): Promise<Response> {
4261
const url = new URL(request.url);
4362

4463
// Route: /gateway/:accountId — OpenClaw plugin connects here
4564
if (url.pathname.startsWith("/gateway/")) {
46-
// Extract and store userId from the gateway path
4765
const userId = url.pathname.split("/gateway/")[1]?.split("?")[0];
4866
if (userId) {
49-
await this.state.storage.put("userId", userId);
67+
const stored = await this.state.storage.get<string>("userId");
68+
if (stored !== userId) {
69+
await this.state.storage.put("userId", userId);
70+
}
5071
}
51-
// Check if the API worker already verified the token against D1
5272
const preVerified = url.searchParams.get("verified") === "1";
5373
return this.handleOpenClawConnect(request, preVerified);
5474
}
@@ -92,22 +112,32 @@ export class ConnectionDO implements DurableObject {
92112

93113
/** Called when a WebSocket receives a message (wakes from hibernation). */
94114
async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): Promise<void> {
95-
const tag = this.getTag(ws);
96-
const data = typeof message === "string" ? message : new TextDecoder().decode(message);
97-
98-
let parsed: Record<string, unknown>;
99115
try {
100-
parsed = JSON.parse(data);
101-
} catch {
102-
return; // Ignore malformed JSON
103-
}
116+
const tag = this.getTag(ws);
117+
const data = typeof message === "string" ? message : new TextDecoder().decode(message);
104118

105-
if (tag === "openclaw") {
106-
// Message from OpenClaw → handle auth or forward to browsers
107-
await this.handleOpenClawMessage(ws, parsed);
108-
} else if (tag?.startsWith("browser:")) {
109-
// Message from browser → forward to OpenClaw
110-
await this.handleBrowserMessage(ws, parsed);
119+
let parsed: Record<string, unknown>;
120+
try {
121+
parsed = JSON.parse(data);
122+
} catch {
123+
return;
124+
}
125+
126+
if (tag === "openclaw") {
127+
await this.handleOpenClawMessage(ws, parsed);
128+
} else if (tag?.startsWith("browser:")) {
129+
await this.handleBrowserMessage(ws, parsed);
130+
}
131+
} catch (err) {
132+
const msg = String(err);
133+
if (msg.includes("Exceeded")) {
134+
console.error("[DO] Storage limit exceeded in webSocketMessage:", msg);
135+
try {
136+
ws.send(JSON.stringify({ type: "error", message: "Storage limit exceeded, retry later" }));
137+
} catch { /* socket may already be dead */ }
138+
return;
139+
}
140+
throw err;
111141
}
112142
}
113143

@@ -139,10 +169,31 @@ export class ConnectionDO implements DurableObject {
139169
return new Response("Expected WebSocket upgrade", { status: 426 });
140170
}
141171

172+
const now = Date.now();
173+
const cooldownMs = 30_000;
174+
if (now - this.lastOpenClawAcceptedAt < cooldownMs) {
175+
const retryAfter = Math.ceil((cooldownMs - (now - this.lastOpenClawAcceptedAt)) / 1000);
176+
return new Response("Too many connections, retry later", {
177+
status: 429,
178+
headers: { "Retry-After": String(retryAfter) },
179+
});
180+
}
181+
this.lastOpenClawAcceptedAt = now;
182+
183+
// Safety valve: if stale openclaw sockets accumulated (e.g. from
184+
// rapid reconnects that authenticated but then lost their edge
185+
// connection), close them all before accepting a new one.
186+
const existing = this.state.getWebSockets("openclaw");
187+
if (existing.length > 3) {
188+
console.warn(`[DO] Safety valve: ${existing.length} openclaw sockets, closing all`);
189+
for (const s of existing) {
190+
try { s.close(4009, "replaced"); } catch { /* dead */ }
191+
}
192+
}
193+
142194
const pair = new WebSocketPair();
143195
const [client, server] = [pair[0], pair[1]];
144196

145-
// Accept with Hibernation API, tag as "openclaw"
146197
this.state.acceptWebSocket(server, ["openclaw"]);
147198

148199
// If the API worker already verified the token against D1, mark as
@@ -187,31 +238,26 @@ export class ConnectionDO implements DurableObject {
187238
const isValid = attachment?.preVerified || await this.validatePairingToken(token);
188239

189240
if (isValid) {
190-
// Close any existing (potentially stale) openclaw sockets before
191-
// marking the new one as authenticated. Cloudflare's edge infra
192-
// terminates WebSocket connections every ~60 min (code 1006). The
193-
// plugin reconnects immediately, but the DO may not have detected
194-
// the old socket's death yet (no close frame → no webSocketClose
195-
// callback yet). Without this cleanup, getOpenClawSocket() could
196-
// return a stale/dead socket, silently dropping all messages.
241+
// Close ALL other openclaw sockets. Use custom code 4009 so
242+
// well-behaved plugins know they were replaced (not a crash)
243+
// and should NOT reconnect. The Worker-level rate limit (10s)
244+
// prevents the resulting close event from flooding the DO.
197245
const existingSockets = this.state.getWebSockets("openclaw");
246+
let closedCount = 0;
198247
for (const oldWs of existingSockets) {
199248
if (oldWs !== ws) {
200249
try {
201-
oldWs.close(1000, "replaced by new connection");
202-
} catch {
203-
// Socket may already be dead — ignore
204-
}
250+
oldWs.close(4009, "replaced");
251+
closedCount++;
252+
} catch { /* already dead */ }
205253
}
206254
}
207255

208256
ws.serializeAttachment({ ...attachment, authenticated: true });
209-
// Include userId so the plugin can derive the E2E key
210257
const userId = await this.state.storage.get<string>("userId");
211-
console.log(`[DO] auth.ok → userId=${userId}, closedStale=${existingSockets.length - 1}`);
258+
console.log(`[DO] auth.ok → userId=${userId}, closed=${closedCount}, total=${existingSockets.length}`);
212259
ws.send(JSON.stringify({ type: "auth.ok", userId }));
213-
// Store gateway default model from plugin auth
214-
if (msg.model) {
260+
if (msg.model && msg.model !== this.defaultModel) {
215261
this.defaultModel = msg.model as string;
216262
await this.state.storage.put("defaultModel", this.defaultModel);
217263
}
@@ -295,20 +341,24 @@ export class ConnectionDO implements DurableObject {
295341
await this.handleTaskScanResult(msg);
296342
}
297343

298-
// Handle models list from plugin — persist to storage and broadcast to browsers
299344
if (msg.type === "models.list") {
300-
this.cachedModels = (msg.models as Array<{ id: string; name: string; provider: string }>) ?? [];
301-
await this.state.storage.put("cachedModels", this.cachedModels);
302-
console.log(`[DO] Persisted ${this.cachedModels.length} models to storage, broadcasting connection.status`);
345+
const newModels = (msg.models as Array<{ id: string; name: string; provider: string }>) ?? [];
346+
const changed = JSON.stringify(newModels) !== JSON.stringify(this.cachedModels);
347+
this.cachedModels = newModels;
348+
if (changed) {
349+
await this.state.storage.put("cachedModels", this.cachedModels);
350+
console.log(`[DO] Persisted ${this.cachedModels.length} models to storage`);
351+
}
303352
this.broadcastToBrowsers(
304353
JSON.stringify({ type: "connection.status", openclawConnected: true, defaultModel: this.defaultModel, models: this.cachedModels }),
305354
);
306355
}
307356

308-
// Plugin applied BotsChat default model to OpenClaw config — update and broadcast
309357
if (msg.type === "defaultModel.updated" && typeof msg.model === "string") {
310-
this.defaultModel = msg.model;
311-
await this.state.storage.put("defaultModel", this.defaultModel);
358+
if (msg.model !== this.defaultModel) {
359+
this.defaultModel = msg.model;
360+
await this.state.storage.put("defaultModel", this.defaultModel);
361+
}
312362
this.broadcastToBrowsers(
313363
JSON.stringify({ type: "connection.status", openclawConnected: true, defaultModel: this.defaultModel, models: this.cachedModels }),
314364
);
@@ -1299,7 +1349,11 @@ export class ConnectionDO implements DurableObject {
12991349
.first<{ user_id: string }>();
13001350

13011351
const isValid = !!row;
1302-
await this.state.storage.put(cacheKey, { valid: isValid, cachedAt: Date.now() });
1352+
try {
1353+
await this.state.storage.put(cacheKey, { valid: isValid, cachedAt: Date.now() });
1354+
} catch {
1355+
// Non-critical — skip caching if storage is full
1356+
}
13031357
return isValid;
13041358
} catch (err) {
13051359
console.error("[DO] Failed to validate pairing token against D1:", err);

packages/api/src/index.ts

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,6 @@ async function verifyUserAccess(c: { req: { header: (n: string) => string | unde
365365
app.all("/api/gateway/:connId", async (c) => {
366366
let userId = c.req.param("connId");
367367

368-
// If connId is not a real user ID (e.g. "default"), resolve via token
369368
if (!userId.startsWith("u_")) {
370369
const token =
371370
c.req.query("token") ??
@@ -376,7 +375,6 @@ app.all("/api/gateway/:connId", async (c) => {
376375
return c.json({ error: "Token required for gateway connection" }, 401);
377376
}
378377

379-
// Look up user by pairing token (exclude revoked tokens)
380378
const row = await c.env.DB.prepare(
381379
"SELECT user_id FROM pairing_tokens WHERE token = ? AND revoked_at IS NULL",
382380
)
@@ -387,26 +385,57 @@ app.all("/api/gateway/:connId", async (c) => {
387385
return c.json({ error: "Invalid pairing token" }, 401);
388386
}
389387
userId = row.user_id;
388+
}
389+
390+
// --- Worker-level rate limit (Cache API) ---
391+
// Protects the DO from being woken up during reconnection storms.
392+
// The Cache API persists across Worker isolates within the same colo.
393+
const GATEWAY_COOLDOWN_S = 10;
394+
const cache = caches.default;
395+
const rateCacheUrl = `https://rate.internal/gateway/${userId}`;
396+
const rateCacheReq = new Request(rateCacheUrl);
397+
const rateCached = await cache.match(rateCacheReq);
398+
if (rateCached) {
399+
return c.text("Too many connections, retry later", 429, {
400+
"Retry-After": String(GATEWAY_COOLDOWN_S),
401+
});
402+
}
390403

391-
// Update audit fields: last_connected_at, last_ip, connection_count
404+
// Audit: update pairing token stats (only when not rate-limited)
405+
const token = c.req.query("token") ?? c.req.header("X-Pairing-Token");
406+
if (token) {
392407
const clientIp = c.req.header("CF-Connecting-IP") ?? c.req.header("X-Forwarded-For") ?? "unknown";
393-
await c.env.DB.prepare(
394-
`UPDATE pairing_tokens
395-
SET last_connected_at = unixepoch(), last_ip = ?, connection_count = connection_count + 1
396-
WHERE token = ?`,
397-
)
398-
.bind(clientIp, token)
399-
.run();
408+
c.executionCtx.waitUntil(
409+
c.env.DB.prepare(
410+
`UPDATE pairing_tokens
411+
SET last_connected_at = unixepoch(), last_ip = ?, connection_count = connection_count + 1
412+
WHERE token = ?`,
413+
).bind(clientIp, token).run(),
414+
);
400415
}
401416

402417
const doId = c.env.CONNECTION_DO.idFromName(userId);
403418
const stub = c.env.CONNECTION_DO.get(doId);
404419
const url = new URL(c.req.url);
405-
// Pass verified userId to DO — the API worker already validated the token
406-
// against D1 above, so DO can trust this.
407420
url.pathname = `/gateway/${userId}`;
408421
url.searchParams.set("verified", "1");
409-
return stub.fetch(new Request(url.toString(), c.req.raw));
422+
const doResp = await stub.fetch(new Request(url.toString(), c.req.raw));
423+
424+
// Cache the rate limit after the DO responds (success or rate-limited).
425+
// 101 = WebSocket accepted; 429 = DO's own rate limit.
426+
// Either way, prevent further DO wake-ups for GATEWAY_COOLDOWN_S.
427+
if (doResp.status === 101 || doResp.status === 429) {
428+
c.executionCtx.waitUntil(
429+
cache.put(
430+
rateCacheReq,
431+
new Response(null, {
432+
headers: { "Cache-Control": `public, max-age=${GATEWAY_COOLDOWN_S}` },
433+
}),
434+
),
435+
);
436+
}
437+
438+
return doResp;
410439
});
411440

412441
// Browser client connects to: /api/ws/:userId/:sessionId

packages/plugin/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@botschat/botschat",
3-
"version": "0.1.14",
3+
"version": "0.1.15",
44
"description": "BotsChat channel plugin for OpenClaw — connects your OpenClaw agent to the BotsChat cloud platform",
55
"type": "module",
66
"main": "dist/index.js",

0 commit comments

Comments
 (0)