Skip to content

Commit 71629c2

Browse files
grrowlclaude
andcommitted
test(server): 5 wire-hardening tests + LimitsTestDO subclass
Adds tests/wire-hardening.test.ts pinning the ADR-0012 invariants: 1. Malformed frame shapes are silently dropped; socket survives. 2. ops over maxOpsPerMutation → LIMIT_EXCEEDED, nothing applied. 3. Subscription cap: third sub on 2-cap DO gets reset; first two subs continue receiving deltas. 4. Oversized frame (>1 MiB) dropped before decode; socket survives (workerd passed the frame to webSocketMessage — the DO's own maxFrameBytes guard fired at 1,048,666 bytes). 5. Execute error → generic "mutation failed"/EXECUTE_FAILED; SQLite constraint detail does not leak; authorize error passes through. Also fixes the wellFormed shape guard to treat null == absent for optional fields (the client transport sends null, not omit, for absent fields in MessagePack serialisation — previous commit missed this and broke all existing sub-bearing tests on the full suite). LimitsTestDO (maxOpsPerMutation=2, maxSubsPerSocket=2) added to tests/test-worker.ts, wired in vitest.config.ts and tests/env.d.ts, following the MaintTestDO/SlowTickDO pattern exactly. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent 33200a2 commit 71629c2

5 files changed

Lines changed: 297 additions & 6 deletions

File tree

src/server/sync-do.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,23 +185,28 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
185185

186186
/** Shape-guard: returns true iff `v` is a structurally valid ClientFrame.
187187
* (ADR-0012) Runs after decode, before any SQL binding — ensures no
188-
* arbitrary decoded value reaches lookupTx or sql.exec. */
188+
* arbitrary decoded value reaches lookupTx or sql.exec.
189+
*
190+
* Optional fields treat null == absent (the client transport serialises
191+
* absent fields as null in MessagePack rather than omitting them). */
189192
private wellFormed(v: unknown): v is ClientFrame {
190193
if (v === null || typeof v !== "object") return false
191194
const f = v as Record<string, unknown>
192195
const t = f["t"]
193196
if (typeof t !== "string") return false
194197

195198
const isNonEmptyString = (x: unknown): x is string => typeof x === "string" && x.length > 0
199+
/** null is treated as absent for optional fields */
200+
const absent = (x: unknown): boolean => x === undefined || x === null
196201

197202
switch (t) {
198203
case "sub":
199204
return (
200205
isNonEmptyString(f["subId"]) &&
201206
isNonEmptyString(f["collection"]) &&
202-
(f["since"] === undefined || typeof f["since"] === "string") &&
203-
(f["limit"] === undefined || typeof f["limit"] === "number") &&
204-
(f["offset"] === undefined || typeof f["offset"] === "number")
207+
(absent(f["since"]) || typeof f["since"] === "string") &&
208+
(absent(f["limit"]) || typeof f["limit"] === "number") &&
209+
(absent(f["offset"]) || typeof f["offset"] === "number")
205210
)
206211
case "unsub":
207212
return typeof f["subId"] === "string"
@@ -214,7 +219,7 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
214219
const o = op as Record<string, unknown>
215220
if (!validOpTypes.has(o["type"] as string)) return false
216221
if (typeof o["key"] !== "string") return false
217-
if (o["cols"] !== undefined && (o["cols"] === null || typeof o["cols"] !== "object" || Array.isArray(o["cols"]))) return false
222+
if (!absent(o["cols"]) && (typeof o["cols"] !== "object" || Array.isArray(o["cols"]))) return false
218223
}
219224
return true
220225
}
@@ -224,7 +229,7 @@ export abstract class SyncDurableObject<Env = unknown, TUser = unknown> extends
224229
return (
225230
isNonEmptyString(f["fetchId"]) &&
226231
isNonEmptyString(f["collection"]) &&
227-
(f["cursor"] === undefined || (f["cursor"] !== null && typeof f["cursor"] === "object"))
232+
(absent(f["cursor"]) || typeof f["cursor"] === "object")
228233
)
229234
default:
230235
return false

tests/env.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ declare module "cloudflare:test" {
77
UNREG_DO: DurableObjectNamespace
88
MAINT_DO: DurableObjectNamespace
99
SLOW_DO: DurableObjectNamespace
10+
LIMITS_DO: DurableObjectNamespace
1011
}
1112
}

tests/test-worker.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,20 @@ export class SlowTickDO extends SyncTestDO {
125125
protected override readonly tickMs = 30_000
126126
}
127127

128+
/** Same collections as SyncTestDO with tiny inbound limits — lets wire-hardening
129+
* tests (ADR-0012) exercise the limit paths without sending 128+ ops or opening
130+
* 256+ subscriptions. */
131+
export class LimitsTestDO extends SyncTestDO {
132+
protected override readonly maxOpsPerMutation = 2
133+
protected override readonly maxSubsPerSocket = 2
134+
}
135+
128136
interface Env {
129137
TEST_DO: DurableObjectNamespace
130138
SYNC_DO: DurableObjectNamespace
131139
MAINT_DO: DurableObjectNamespace
132140
SLOW_DO: DurableObjectNamespace
141+
LIMITS_DO: DurableObjectNamespace
133142
}
134143

135144
export default {
@@ -147,6 +156,10 @@ export default {
147156
const name = url.pathname.slice("/slow/".length) || "default"
148157
return env.SLOW_DO.get(env.SLOW_DO.idFromName(name)).fetch(req)
149158
}
159+
if (url.pathname.startsWith("/limits/")) {
160+
const name = url.pathname.slice("/limits/".length) || "default"
161+
return env.LIMITS_DO.get(env.LIMITS_DO.idFromName(name)).fetch(req)
162+
}
150163
return new Response("test-worker")
151164
},
152165
}

tests/wire-hardening.test.ts

Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
// WHY: the wire boundary rejects or drops hostile input loudly without crashing
2+
// the socket, and internal error detail never reaches a client. These tests
3+
// guard the ADR-0012 invariants:
4+
// 1. Malformed frame shapes are dropped silently (no reply) — the socket
5+
// survives and processes subsequent well-formed frames normally.
6+
// 2. A mutation with too many ops is rejected with LIMIT_EXCEEDED before any
7+
// write is attempted.
8+
// 3. A socket hitting the subscription cap gets reset for the excess subId;
9+
// earlier subs continue to receive deltas.
10+
// 4. Oversized frames are dropped before decode; the socket survives.
11+
// 5. Execute errors send a generic message to the client — SQLite detail never
12+
// leaks; authorize errors remain user-facing.
13+
14+
import { env, runInDurableObject, SELF } from "cloudflare:test"
15+
import { describe, expect, it } from "vitest"
16+
import { createFrameCodec, type WireOut } from "../src/wire/frame-codec.ts"
17+
import type { ClientFrame, ServerFrame } from "../src/wire/frames.ts"
18+
19+
const codec = createFrameCodec()
20+
21+
async function openWs(path: string): Promise<WebSocket> {
22+
const res = await SELF.fetch(`https://example.com${path}`, { headers: { Upgrade: "websocket" } })
23+
expect(res.status).toBe(101)
24+
const ws = res.webSocket
25+
if (!ws) throw new Error("no webSocket on 101 response")
26+
ws.accept()
27+
return ws
28+
}
29+
30+
function send(ws: WebSocket, frame: ClientFrame): void {
31+
ws.send(codec.encode(frame))
32+
}
33+
34+
/** Send a raw, already-encoded value (for malformed-frame and oversize tests). */
35+
function sendRaw(ws: WebSocket, data: WireOut): void {
36+
ws.send(data)
37+
}
38+
39+
function collectUntil(ws: WebSocket, done: (f: ServerFrame) => boolean, timeoutMs = 2000): Promise<Array<ServerFrame>> {
40+
return new Promise((resolve, reject) => {
41+
const out: Array<ServerFrame> = []
42+
const timer = setTimeout(() => reject(new Error(`timeout; got [${out.map((f) => f.t).join(",")}]`)), timeoutMs)
43+
const onMsg = (e: MessageEvent): void => {
44+
out.push(codec.decode(e.data as ArrayBuffer) as ServerFrame)
45+
if (done(out[out.length - 1]!)) {
46+
clearTimeout(timer)
47+
ws.removeEventListener("message", onMsg)
48+
resolve(out)
49+
}
50+
}
51+
ws.addEventListener("message", onMsg)
52+
})
53+
}
54+
55+
/** Wait up to timeoutMs; resolve with whatever frames arrived (may be empty). */
56+
function collectFor(ws: WebSocket, timeoutMs: number): Promise<Array<ServerFrame>> {
57+
return new Promise((resolve) => {
58+
const out: Array<ServerFrame> = []
59+
const onMsg = (e: MessageEvent): void => {
60+
out.push(codec.decode(e.data as ArrayBuffer) as ServerFrame)
61+
}
62+
ws.addEventListener("message", onMsg)
63+
setTimeout(() => {
64+
ws.removeEventListener("message", onMsg)
65+
resolve(out)
66+
}, timeoutMs)
67+
})
68+
}
69+
70+
/** Subscribe and wait for the initial snap-end so later frames are post-sub. */
71+
async function subscribe(ws: WebSocket, subId: string, path = "/sync"): Promise<void> {
72+
send(ws, { t: "sub", subId, collection: "messages" })
73+
await collectUntil(ws, (f) => f.t === "snap-end")
74+
}
75+
76+
describe("wire-hardening (ADR-0012)", () => {
77+
it("malformed frame shapes are dropped; socket survives", async () => {
78+
const ws = await openWs("/sync/wh-malformed")
79+
await subscribe(ws, "s1")
80+
81+
// (a) mut with no txId or ops — fails wellFormed; dropped
82+
const malformed1 = codec.encode({ t: "mut" } as unknown as ClientFrame)
83+
sendRaw(ws, malformed1)
84+
85+
// (b) mut with object txId and string ops — fails wellFormed; dropped
86+
const malformed2 = codec.encode({ t: "mut", txId: { evil: 1 }, collection: "messages", ops: "x" } as unknown as ClientFrame)
87+
sendRaw(ws, malformed2)
88+
89+
// (c) unknown frame type — fails wellFormed; dropped
90+
const malformed3 = codec.encode({ t: "unknown-type" } as unknown as ClientFrame)
91+
sendRaw(ws, malformed3)
92+
93+
// After the malformed frames, a VALID mut must succeed — proves socket survived.
94+
send(ws, {
95+
t: "mut",
96+
txId: "wh-t1",
97+
collection: "messages",
98+
ops: [{ type: "insert", key: "wh-a", cols: { id: "wh-a", body: "good" } }],
99+
})
100+
const frames = await collectUntil(ws, (f) => f.t === "committed" || f.t === "rejected")
101+
102+
// No rejected for the malformed ones (dropped, not answered).
103+
const rejected = frames.filter((f) => f.t === "rejected")
104+
expect(rejected.length).toBe(0)
105+
// The valid mut committed.
106+
const committed = frames.find((f) => f.t === "committed")
107+
expect(committed).toBeDefined()
108+
109+
ws.close()
110+
})
111+
112+
it("ops over the per-mutation limit → LIMIT_EXCEEDED, nothing applied", async () => {
113+
// LimitsTestDO has maxOpsPerMutation = 2
114+
const room = "wh-ops-limit"
115+
const ws = await openWs(`/limits/${room}`)
116+
117+
// Three ops — one over the limit of 2.
118+
send(ws, {
119+
t: "mut",
120+
txId: "wh-lim1",
121+
collection: "messages",
122+
ops: [
123+
{ type: "insert", key: "r1", cols: { id: "r1", body: "x" } },
124+
{ type: "insert", key: "r2", cols: { id: "r2", body: "y" } },
125+
{ type: "insert", key: "r3", cols: { id: "r3", body: "z" } },
126+
],
127+
})
128+
const frames = await collectUntil(ws, (f) => f.t === "rejected" || f.t === "committed")
129+
const last = frames[frames.length - 1]! as Extract<ServerFrame, { t: "rejected" }>
130+
expect(last.t).toBe("rejected")
131+
expect(last.error.code).toBe("LIMIT_EXCEEDED")
132+
expect(last.error.message).toMatch(/maxOpsPerMutation/)
133+
134+
// Verify nothing was applied.
135+
const stub = env.LIMITS_DO.get(env.LIMITS_DO.idFromName(room))
136+
const rows = await runInDurableObject(stub, (_i, s) => {
137+
return Array.from(s.storage.sql.exec("SELECT COUNT(*) as cnt FROM messages"))
138+
})
139+
expect((rows[0] as { cnt: number }).cnt).toBe(0)
140+
141+
ws.close()
142+
})
143+
144+
it("subscription cap: third sub on 2-cap DO → reset; first two subs still receive deltas", async () => {
145+
// LimitsTestDO has maxSubsPerSocket = 2
146+
const room = "wh-sub-cap"
147+
const ws = await openWs(`/limits/${room}`)
148+
149+
// First sub — ok.
150+
send(ws, { t: "sub", subId: "cap-s1", collection: "messages" })
151+
await collectUntil(ws, (f) => f.t === "snap-end")
152+
153+
// Second sub — ok.
154+
send(ws, { t: "sub", subId: "cap-s2", collection: "messages" })
155+
await collectUntil(ws, (f) => f.t === "snap-end")
156+
157+
// Third sub — over the cap; should get reset.
158+
send(ws, { t: "sub", subId: "cap-s3", collection: "messages" })
159+
const capFrames = await collectUntil(ws, (f) => f.t === "reset" || f.t === "snap-end", 2000)
160+
const reset = capFrames.find((f) => f.t === "reset") as Extract<ServerFrame, { t: "reset" }> | undefined
161+
expect(reset).toBeDefined()
162+
expect(reset!.sub).toBe("cap-s3")
163+
164+
// Now insert a row — the first two subs should receive deltas.
165+
send(ws, {
166+
t: "mut",
167+
txId: "wh-cap-mut1",
168+
collection: "messages",
169+
ops: [{ type: "insert", key: "cap-row1", cols: { id: "cap-row1", body: "hello" } }],
170+
})
171+
const afterMut = await collectUntil(ws, (f) => f.t === "committed", 2000)
172+
// s1 and s2 both get a delta frame for the inserted row.
173+
const deltas = afterMut.filter((f) => f.t === "d") as Array<Extract<ServerFrame, { t: "d" }>>
174+
const subIds = deltas.map((d) => d.sub)
175+
expect(subIds).toContain("cap-s1")
176+
expect(subIds).toContain("cap-s2")
177+
178+
ws.close()
179+
})
180+
181+
it("oversized frame is dropped; socket survives and answers the next valid frame", async () => {
182+
const ws = await openWs("/sync/wh-oversize")
183+
184+
// Build a >1 MiB payload by encoding a mut with a large cols.body.
185+
// The frame will exceed maxFrameBytes (1_048_576) when encoded.
186+
const bigBody = "x".repeat(1_048_577)
187+
const bigFrame = codec.encode({
188+
t: "mut",
189+
txId: "wh-big1",
190+
collection: "messages",
191+
ops: [{ type: "insert", key: "big", cols: { id: "big", body: bigBody } }],
192+
})
193+
194+
// Attempt to send the oversized frame.
195+
let wsClosedByWorkerd = false
196+
ws.addEventListener("close", () => {
197+
wsClosedByWorkerd = true
198+
})
199+
200+
try {
201+
sendRaw(ws, bigFrame)
202+
} catch {
203+
// workerd may throw synchronously on an oversized send — treat as closed.
204+
wsClosedByWorkerd = true
205+
}
206+
207+
if (wsClosedByWorkerd) {
208+
// STOP-condition path: workerd closed the socket before webSocketMessage
209+
// ran. The DO never saw the frame. Assert the socket is closed and note.
210+
// The maxFrameBytes guard still provides an explicit, testable bound for
211+
// cases where workerd passes the frame through (future versions, etc.).
212+
// Per plan: assert THAT and note it — either way the DO must not crash.
213+
expect(wsClosedByWorkerd).toBe(true)
214+
return
215+
}
216+
217+
// Collect for a short window — no reply expected (frame was dropped).
218+
const droppedFrames = await collectFor(ws, 300)
219+
const anyReply = droppedFrames.filter((f) => {
220+
const r = f as Extract<ServerFrame, { t: "rejected" }>
221+
return f.t === "rejected" && (r as typeof r).txId === "wh-big1"
222+
})
223+
expect(anyReply).toHaveLength(0)
224+
225+
// Socket survives: a normal mut succeeds.
226+
send(ws, {
227+
t: "mut",
228+
txId: "wh-small1",
229+
collection: "messages",
230+
ops: [{ type: "insert", key: "small", cols: { id: "small", body: "normal" } }],
231+
})
232+
const followUp = await collectUntil(ws, (f) => f.t === "committed" || f.t === "rejected")
233+
const last = followUp[followUp.length - 1]!
234+
expect(last.t).toBe("committed")
235+
236+
ws.close()
237+
})
238+
239+
it("execute error → generic 'mutation failed', SQLite detail not leaked; authorize error passes through", async () => {
240+
const ws = await openWs("/sync/wh-sanitize")
241+
242+
// Insert key "a" first.
243+
send(ws, { t: "mut", txId: "wh-san1", collection: "messages", ops: [{ type: "insert", key: "a", cols: { id: "a", body: "v1" } }] })
244+
await collectUntil(ws, (f) => f.t === "committed")
245+
246+
// Duplicate-pk insert triggers SQLite UNIQUE constraint.
247+
send(ws, { t: "mut", txId: "wh-san2", collection: "messages", ops: [{ type: "insert", key: "a", cols: { id: "a", body: "v2" } }] })
248+
const frames = await collectUntil(ws, (f) => f.t === "rejected" || f.t === "committed")
249+
const last = frames[frames.length - 1]! as Extract<ServerFrame, { t: "rejected" }>
250+
expect(last.t).toBe("rejected")
251+
// Generic message — no SQLite detail.
252+
expect(last.error.message).toBe("mutation failed")
253+
expect(last.error.code).toBe("EXECUTE_FAILED")
254+
// The SQLite constraint detail must NOT appear in the client-visible message.
255+
expect(last.error.message).not.toMatch(/UNIQUE|constraint/i)
256+
257+
// Authorize denial still passes through verbatim (README: "throw to deny").
258+
send(ws, {
259+
t: "mut",
260+
txId: "wh-san3",
261+
collection: "messages",
262+
ops: [{ type: "insert", key: "forbidden-key", cols: { id: "forbidden-key", body: "FORBIDDEN" } }],
263+
})
264+
const authFrames = await collectUntil(ws, (f) => f.t === "rejected" || f.t === "committed")
265+
const authLast = authFrames[authFrames.length - 1]! as Extract<ServerFrame, { t: "rejected" }>
266+
expect(authLast.t).toBe("rejected")
267+
expect(authLast.error.message).toMatch(/forbidden/)
268+
269+
ws.close()
270+
})
271+
})

vitest.config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export default defineWorkersProject({
2424
UNREG_DO: { className: "UnregisteredDO", useSQLite: true },
2525
MAINT_DO: { className: "MaintTestDO", useSQLite: true },
2626
SLOW_DO: { className: "SlowTickDO", useSQLite: true },
27+
LIMITS_DO: { className: "LimitsTestDO", useSQLite: true },
2728
},
2829
},
2930
},

0 commit comments

Comments
 (0)