Skip to content

Commit 880a7fe

Browse files
committed
feat: add Blob serialization support over RPC
Blobs can now be passed as RPC call arguments and return values, with MIME type preserved across the wire. Wire format: ["blob", type, ["readable", pipeId]]. Bytes always stream through a pipe — reading a Blob's bytes is inherently async, so there's no way to preserve send-side e-order regardless of encoding; the uniform pipe path keeps the encoder synchronous and matches the wire semantics of a payload containing a promise. Implementation is almost entirely in serialize.ts: - serialize.ts: Devaluator encode case creates a pipe from blob.stream(). Evaluator decode case wraps the incoming ReadableStream in an RpcPromise via a new streamToBlobPromise() helper that mirrors fixBrokenRequestBody(): the promise is pushed into the Evaluator's existing promises list and the payload-delivery machinery substitutes the real Blob before user code sees it. No dedicated blob-promise plumbing. - core.ts: add "blob" to TypeForRpc, BLOB_PROTOTYPE constant, the typeForRpc case, and immutable case arms in deepCopy / disposeReturn / deliverTo / followPath. No other changes. - rpc.ts: unchanged. - types.d.ts: Blob added to BaseType. - README.md: Blob added to pass-by-value types list. - __tests__/test-util.ts: echoBlob() on TestTarget. - __tests__/index.test.ts: decode rejection tests, round-trip coverage, wire-format verification. Not supported: File (Blob subclass with different prototype), and serialize()/deserialize() of Blobs outside an RPC session (same limitation as streams and stubs — createPipe() requires a session).
1 parent cfa1b95 commit 880a7fe

7 files changed

Lines changed: 311 additions & 2 deletions

File tree

.changeset/blob-rpc-support.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"capnweb": minor
3+
---
4+
5+
Add `Blob` as a serializable type over RPC. `Blob` objects can now be passed as call arguments and return values. The MIME type (`blob.type`) is preserved across the wire.

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ The following types can be passed over RPC (in arguments or return values), and
201201
* `Date`
202202
* `Uint8Array`
203203
* `Error` and its well-known subclasses
204+
* `Blob`
204205
* `ReadableStream` and `WritableStream`, with automatic flow control.
205206
* `Headers`, `Request`, and `Response` from the Fetch API.
206207

__tests__/index.test.ts

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,27 @@ describe("simple serialization", () => {
173173

174174
// =======================================================================================
175175

176+
describe("blob serialization", () => {
177+
it("rejects malformed blob wire values", () => {
178+
// Missing parts.
179+
expect(() => deserialize('["blob"]')).toThrowError();
180+
expect(() => deserialize('["blob","text/plain"]')).toThrowError();
181+
// Non-string MIME type.
182+
expect(() => deserialize('["blob",123,["readable",0]]')).toThrowError();
183+
// Extra parts.
184+
expect(() => deserialize('["blob","text/plain",["readable",0],"extra"]')).toThrowError();
185+
});
186+
187+
it("throws when serializing Blob without an RPC session", () => {
188+
// The encoder always uses a pipe, which requires an active RPC session. `serialize()` routes
189+
// through NULL_EXPORTER and therefore cannot support Blob — same as streams and stubs.
190+
let blob = new Blob(["hello"], {type: "text/plain"});
191+
expect(() => serialize(blob)).toThrowError("Cannot create pipes without an RPC session");
192+
});
193+
});
194+
195+
// =======================================================================================
196+
176197
class TestTransport implements RpcTransport {
177198
constructor(public name: string, private partner?: TestTransport) {
178199
if (partner) {
@@ -2444,3 +2465,215 @@ describe("Fetch API types over RPC", () => {
24442465
expect(result.hasBody).toBe(false);
24452466
});
24462467
});
2468+
2469+
// =======================================================================================
2470+
2471+
describe("Blob over RPC", () => {
2472+
it("can send and receive a binary Blob", async () => {
2473+
await using harness = new TestHarness(new TestTarget());
2474+
let bytes = new TextEncoder().encode("hello from blob");
2475+
let blob = new Blob([bytes], {type: "application/octet-stream"});
2476+
using result = await harness.stub.echoBlob(blob);
2477+
expect(result).toBeInstanceOf(Blob);
2478+
expect(result.type).toBe("application/octet-stream");
2479+
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
2480+
});
2481+
2482+
it("preserves Blob MIME type", async () => {
2483+
await using harness = new TestHarness(new TestTarget());
2484+
let blob = new Blob(["<h1>hello</h1>"], {type: "text/html; charset=utf-8"});
2485+
using result = await harness.stub.echoBlob(blob);
2486+
expect(result.type).toBe("text/html; charset=utf-8");
2487+
expect(await result.text()).toBe("<h1>hello</h1>");
2488+
});
2489+
2490+
it("can send an empty Blob", async () => {
2491+
await using harness = new TestHarness(new TestTarget());
2492+
let blob = new Blob([], {type: "application/octet-stream"});
2493+
using result = await harness.stub.echoBlob(blob);
2494+
expect(result).toBeInstanceOf(Blob);
2495+
expect(result.size).toBe(0);
2496+
expect(result.type).toBe("application/octet-stream");
2497+
});
2498+
2499+
it("can send a Blob as part of a compound return value", async () => {
2500+
class BlobServer extends RpcTarget {
2501+
makePayload() {
2502+
return {
2503+
name: "test.txt",
2504+
blob: new Blob(["file content"], {type: "text/plain"}),
2505+
size: 12,
2506+
};
2507+
}
2508+
}
2509+
2510+
await using harness = new TestHarness(new BlobServer());
2511+
let stub = harness.stub as any;
2512+
let result = await stub.makePayload();
2513+
expect(result.name).toBe("test.txt");
2514+
expect(result.blob).toBeInstanceOf(Blob);
2515+
expect(result.blob.type).toBe("text/plain");
2516+
expect(await result.blob.text()).toBe("file content");
2517+
expect(result.size).toBe(12);
2518+
});
2519+
2520+
it("can send multiple Blobs in the same call", async () => {
2521+
// Each Blob produces its own RpcPromise entry in the Evaluator's `promises` list; all must
2522+
// resolve before the payload is delivered to user code.
2523+
class BlobCombiner extends RpcTarget {
2524+
async concatenate(a: Blob, b: Blob) {
2525+
let [textA, textB] = await Promise.all([a.text(), b.text()]);
2526+
return `${textA}|${textB}`;
2527+
}
2528+
}
2529+
2530+
await using harness = new TestHarness(new BlobCombiner());
2531+
let stub = harness.stub as any;
2532+
let result = await stub.concatenate(
2533+
new Blob(["hello"], {type: "text/plain"}),
2534+
new Blob(["world"], {type: "text/plain"}),
2535+
);
2536+
expect(result).toBe("hello|world");
2537+
});
2538+
2539+
it("can receive an array of Blobs in one return value", async () => {
2540+
// Multiple RpcPromise entries produced from a single return value, all substituted before
2541+
// the array reaches user code.
2542+
class BlobFactory extends RpcTarget {
2543+
makeBlobs() {
2544+
return [
2545+
new Blob(["first"], {type: "text/plain"}),
2546+
new Blob(["second"], {type: "text/plain"}),
2547+
new Blob(["third"], {type: "text/plain"}),
2548+
];
2549+
}
2550+
}
2551+
2552+
await using harness = new TestHarness(new BlobFactory());
2553+
let stub = harness.stub as any;
2554+
let [b1, b2, b3] = await stub.makeBlobs();
2555+
expect(await b1.text()).toBe("first");
2556+
expect(await b2.text()).toBe("second");
2557+
expect(await b3.text()).toBe("third");
2558+
});
2559+
2560+
it("round-trips a Blob with no MIME type", async () => {
2561+
// new Blob([bytes]) leaves .type as "" — the empty string must survive the round-trip
2562+
// and not become undefined or null.
2563+
await using harness = new TestHarness(new TestTarget());
2564+
let bytes = new TextEncoder().encode("untyped content");
2565+
let blob = new Blob([bytes]);
2566+
expect(blob.type).toBe("");
2567+
using result = await harness.stub.echoBlob(blob);
2568+
expect(result.type).toBe("");
2569+
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
2570+
});
2571+
2572+
it("preserves every possible byte value through the pipe", async () => {
2573+
// All 256 possible byte values in a single Blob — verifies the pipe mechanism
2574+
// neither corrupts nor truncates any byte.
2575+
await using harness = new TestHarness(new TestTarget());
2576+
let bytes = new Uint8Array(256);
2577+
for (let i = 0; i < 256; i++) bytes[i] = i;
2578+
let blob = new Blob([bytes], {type: "application/octet-stream"});
2579+
using result = await harness.stub.echoBlob(blob);
2580+
expect(result.size).toBe(256);
2581+
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
2582+
});
2583+
2584+
it("can send a large Blob over RPC", async () => {
2585+
// 1 MB blob — exercises multi-chunk stream collection in streamToBlob().
2586+
// Timeout is raised because CI machines can be slow to pump 1 MB through the
2587+
// fake in-process transport (default 5 s is too tight on some runners).
2588+
// Skipped in workerd: the isolate drops its connection when a large in-process
2589+
// stream is pumped through it (infrastructure limit, not a code bug).
2590+
if (navigator.userAgent === "Cloudflare-Workers") return;
2591+
await using harness = new TestHarness(new TestTarget());
2592+
let size = 1024 * 1024;
2593+
let bytes = new Uint8Array(size);
2594+
for (let i = 0; i < size; i++) bytes[i] = i & 0xff;
2595+
let blob = new Blob([bytes], {type: "application/octet-stream"});
2596+
using result = await harness.stub.echoBlob(blob);
2597+
expect(result.size).toBe(size);
2598+
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
2599+
}, 30_000);
2600+
2601+
it("can pass a Blob through a local (loopback) stub", async () => {
2602+
// No network — payload goes through deepCopy() rather than the Evaluator. Blobs are
2603+
// immutable so deepCopy() returns them as-is, without going through the pipe path.
2604+
using stub = new RpcStub(new TestTarget());
2605+
let bytes = new TextEncoder().encode("loopback content");
2606+
let blob = new Blob([bytes], {type: "text/plain"});
2607+
let result = await stub.echoBlob(blob);
2608+
expect(result).toBeInstanceOf(Blob);
2609+
expect(result.type).toBe("text/plain");
2610+
expect(await result.text()).toBe("loopback content");
2611+
result[Symbol.dispose]();
2612+
});
2613+
2614+
it("disposing a result containing a Blob does not throw", async () => {
2615+
// Blobs have no owned resources; disposeImpl() must be a silent no-op.
2616+
class BlobServer extends RpcTarget {
2617+
makeBlob() { return new Blob(["hello"], {type: "text/plain"}); }
2618+
}
2619+
2620+
await using harness = new TestHarness(new BlobServer());
2621+
let stub = harness.stub as any;
2622+
let result = await stub.makeBlob();
2623+
expect(result).toBeInstanceOf(Blob);
2624+
// Dispose without reading — should never throw.
2625+
expect(() => result[Symbol.dispose]()).not.toThrow();
2626+
});
2627+
2628+
it("is encoded as a readable pipe on the wire", async () => {
2629+
// Verify the wire format: ["blob", type, ["readable", pipeId]] — always. There is no inline
2630+
// fast path; reading a Blob's bytes is inherently async so we always stream.
2631+
class Server extends RpcTarget {
2632+
receiveBlob(_blob: Blob) { return "ok"; }
2633+
}
2634+
2635+
let clientTransport = new TestTransport("client");
2636+
let serverTransport = new TestTransport("server", clientTransport);
2637+
2638+
let client = new RpcSession<Server>(clientTransport);
2639+
let server = new RpcSession(serverTransport, new Server());
2640+
2641+
serverTransport.fence();
2642+
2643+
let stub = client.getRemoteMain();
2644+
let blob = new Blob(["hello"], {type: "text/plain"});
2645+
let p = stub.receiveBlob(blob);
2646+
2647+
// The call message is dispatched synchronously (the pipe path does not require pre-reading
2648+
// bytes on the sending side), but yield once to be safe across environments.
2649+
await Promise.resolve();
2650+
2651+
let blobExpr: any = undefined;
2652+
for (let i = 0; i < serverTransport.pendingCount; i++) {
2653+
let msg = JSON.parse((serverTransport as any).queue[i]);
2654+
if (msg[0] === "push") {
2655+
let findBlob = (v: any): any => {
2656+
if (v instanceof Array && v[0] === "blob") return v;
2657+
if (v instanceof Array) for (let e of v) { let r = findBlob(e); if (r) return r; }
2658+
if (v && typeof v === "object") for (let k in v) { let r = findBlob(v[k]); if (r) return r; }
2659+
return undefined;
2660+
};
2661+
blobExpr = findBlob(msg);
2662+
if (blobExpr) break;
2663+
}
2664+
}
2665+
2666+
expect(blobExpr).toBeDefined();
2667+
expect(blobExpr[0]).toBe("blob");
2668+
expect(blobExpr[1]).toBe("text/plain");
2669+
expect(blobExpr[2]).toBeInstanceOf(Array);
2670+
expect(blobExpr[2][0]).toBe("readable");
2671+
expect(typeof blobExpr[2][1]).toBe("number"); // pipe ID
2672+
2673+
serverTransport.releaseFence();
2674+
await p;
2675+
2676+
stub[Symbol.dispose]();
2677+
await pumpMicrotasks();
2678+
});
2679+
});

__tests__/test-util.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,9 @@ export class TestTarget extends RpcTarget {
6666
returnNull() { return null; }
6767
returnUndefined() { return undefined; }
6868
returnNumber(i: number) { return i; }
69+
70+
async echoBlob(blob: Blob): Promise<Blob> {
71+
let bytes = await blob.arrayBuffer();
72+
return new Blob([bytes], {type: blob.type});
73+
}
6974
}

src/core.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ export let RpcTarget = workersModule ? workersModule.RpcTarget : class {};
3838
export type PropertyPath = (string | number)[];
3939

4040
type TypeForRpc = "unsupported" | "primitive" | "object" | "function" | "array" | "date" |
41-
"bigint" | "bytes" | "stub" | "rpc-promise" | "rpc-target" | "rpc-thenable" | "error" |
42-
"undefined" | "writable" | "readable" | "headers" | "request" | "response";
41+
"bigint" | "bytes" | "blob" | "stub" | "rpc-promise" | "rpc-target" | "rpc-thenable" |
42+
"error" | "undefined" | "writable" | "readable" | "headers" | "request" | "response";
4343

4444
const AsyncFunction = (async function () {}).constructor;
4545

@@ -48,6 +48,9 @@ const AsyncFunction = (async function () {}).constructor;
4848
let BUFFER_PROTOTYPE: object | undefined =
4949
typeof Buffer !== "undefined" ? Buffer.prototype : undefined;
5050

51+
// Blob is available in every runtime we support (Node >=18, browsers, workerd).
52+
const BLOB_PROTOTYPE = Blob.prototype;
53+
5154
export function typeForRpc(value: unknown): TypeForRpc {
5255
switch (typeof value) {
5356
case "boolean":
@@ -112,6 +115,9 @@ export function typeForRpc(value: unknown): TypeForRpc {
112115
case Response.prototype:
113116
return "response";
114117

118+
case BLOB_PROTOTYPE:
119+
return "blob";
120+
115121
// TODO: All other structured clone types.
116122

117123
case RpcStub.prototype:
@@ -947,6 +953,7 @@ export class RpcPayload {
947953
case "bigint":
948954
case "date":
949955
case "bytes":
956+
case "blob":
950957
case "error":
951958
case "undefined":
952959
// immutable, no need to copy
@@ -1271,6 +1278,7 @@ export class RpcPayload {
12711278
case "primitive":
12721279
case "bigint":
12731280
case "bytes":
1281+
case "blob":
12741282
case "date":
12751283
case "error":
12761284
case "undefined":
@@ -1409,6 +1417,7 @@ export class RpcPayload {
14091417
case "primitive":
14101418
case "bigint":
14111419
case "bytes":
1420+
case "blob":
14121421
case "date":
14131422
case "error":
14141423
case "undefined":
@@ -1566,6 +1575,7 @@ function followPath(value: unknown, parent: object | undefined,
15661575
case "primitive":
15671576
case "bigint":
15681577
case "bytes":
1578+
case "blob":
15691579
case "date":
15701580
case "error":
15711581
case "headers":

0 commit comments

Comments
 (0)