Skip to content

Commit a08d1a8

Browse files
committed
feat: add Blob serialization support over RPC
Encode Blob as ["blob", contentType, ["readable", pipeId]] on the wire, mirroring the existing Firefox Request body technique so devaluateImpl() stays synchronous and e-order is preserved. On the receive side, bytes are collected from the pipe stream via a new LocatedBlobPromise mechanism in RpcPayload.deliverTo(), substituting the real Blob before user code runs — analogous to how LocatedPromise handles RpcPromise substitution. ## Core changes - src/core.ts: add "blob" TypeForRpc, BLOB_PROTOTYPE detection, LocatedBlobPromise type, RpcPayload blobPromises field, forEvaluate(), deliverTo(), dispose(), deepCopy(), disposeImpl(), ignoreUnhandledRejectionsImpl(), and followPath() coverage - src/serialize.ts: streamToBlob() helper, Devaluator encode case, Evaluator decode case with sync fast-path for bytes/string content and async LocatedBlobPromise path for ReadableStream content - src/types.d.ts: add Blob to BaseType - .changeset/blob-rpc-support.md: minor bump changeset ## Tests - blob serialization: decode-only unit tests via deserialize() covering bytes content, string content, empty blob, malformed wire values, and unsupported content expression types - Blob over RPC: full round-trip tests including binary data, MIME type preservation, empty blob, compound return values, multiple blobs in one call, blob array returns, no MIME type, all 256 byte values, 1 MB large blob, loopback (local) stub, blob alongside ReadableStream, dispose without reading, and an e-order regression test verifying that blob call messages are dispatched synchronously
1 parent cfa1b95 commit a08d1a8

6 files changed

Lines changed: 401 additions & 6 deletions

File tree

.changeset/blob-rpc-support.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"capnweb": minor
3+
---
4+
5+
Add `Blob` as a serializable type over RPC. `Blob` objects can now be passed as call arguments and return values. The MIME type (`blob.type`) is preserved across the wire. `File` is not yet supported.

__tests__/index.test.ts

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,56 @@ describe("simple serialization", () => {
173173

174174
// =======================================================================================
175175

176+
describe("blob serialization", () => {
177+
it("can deserialize blob with bytes content", async () => {
178+
// "hello!" base64-encoded without padding
179+
let blob = deserialize('["blob","text/plain",["bytes","aGVsbG8h"]]') as Blob;
180+
expect(blob).toBeInstanceOf(Blob);
181+
expect(blob.type).toBe("text/plain");
182+
expect(await blob.text()).toBe("hello!");
183+
});
184+
185+
it("can deserialize blob with string content", async () => {
186+
let blob = deserialize('["blob","text/html","<b>hi</b>"]') as Blob;
187+
expect(blob).toBeInstanceOf(Blob);
188+
expect(blob.type).toBe("text/html");
189+
expect(await blob.text()).toBe("<b>hi</b>");
190+
});
191+
192+
it("can deserialize empty blob", async () => {
193+
let blob = deserialize('["blob","application/octet-stream",["bytes",""]]') as Blob;
194+
expect(blob).toBeInstanceOf(Blob);
195+
expect(blob.size).toBe(0);
196+
expect(blob.type).toBe("application/octet-stream");
197+
});
198+
199+
it("rejects malformed blob wire values", () => {
200+
expect(() => deserialize('["blob"]')).toThrowError();
201+
expect(() => deserialize('["blob",123,["bytes",""]]')).toThrowError();
202+
// Missing content argument (length < 3)
203+
expect(() => deserialize('["blob","text/plain"]')).toThrowError();
204+
});
205+
206+
it("rejects blob with unsupported content expression type", () => {
207+
// Content expression evaluates to a Date — not string, Uint8Array, or ReadableStream.
208+
expect(() => deserialize('["blob","text/plain",["date",12345]]')).toThrowError(
209+
/unknown special value/
210+
);
211+
// Content expression evaluates to a number.
212+
expect(() => deserialize('["blob","text/plain",42]')).toThrowError(
213+
/unknown special value/
214+
);
215+
});
216+
217+
it("throws when serializing Blob without an RPC session", () => {
218+
if (typeof Blob === "undefined") return; // skip if Blob not available
219+
let blob = new Blob(["hello"], {type: "text/plain"});
220+
expect(() => serialize(blob)).toThrowError("Cannot create pipes without an RPC session");
221+
});
222+
});
223+
224+
// =======================================================================================
225+
176226
class TestTransport implements RpcTransport {
177227
constructor(public name: string, private partner?: TestTransport) {
178228
if (partner) {
@@ -2444,3 +2494,238 @@ describe("Fetch API types over RPC", () => {
24442494
expect(result.hasBody).toBe(false);
24452495
});
24462496
});
2497+
2498+
// =======================================================================================
2499+
2500+
describe("Blob over RPC", () => {
2501+
it("can send and receive a binary Blob", async () => {
2502+
await using harness = new TestHarness(new TestTarget());
2503+
let bytes = new TextEncoder().encode("hello from blob");
2504+
let blob = new Blob([bytes], {type: "application/octet-stream"});
2505+
using result = await harness.stub.echoBlob(blob);
2506+
expect(result).toBeInstanceOf(Blob);
2507+
expect(result.type).toBe("application/octet-stream");
2508+
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
2509+
});
2510+
2511+
it("preserves Blob MIME type", async () => {
2512+
await using harness = new TestHarness(new TestTarget());
2513+
let blob = new Blob(["<h1>hello</h1>"], {type: "text/html; charset=utf-8"});
2514+
using result = await harness.stub.echoBlob(blob);
2515+
expect(result.type).toBe("text/html; charset=utf-8");
2516+
expect(await result.text()).toBe("<h1>hello</h1>");
2517+
});
2518+
2519+
it("can send an empty Blob", async () => {
2520+
await using harness = new TestHarness(new TestTarget());
2521+
let blob = new Blob([], {type: "application/octet-stream"});
2522+
using result = await harness.stub.echoBlob(blob);
2523+
expect(result).toBeInstanceOf(Blob);
2524+
expect(result.size).toBe(0);
2525+
expect(result.type).toBe("application/octet-stream");
2526+
});
2527+
2528+
it("can send a Blob as part of a compound return value", async () => {
2529+
class BlobServer extends RpcTarget {
2530+
makePayload() {
2531+
return {
2532+
name: "test.txt",
2533+
blob: new Blob(["file content"], {type: "text/plain"}),
2534+
size: 12,
2535+
};
2536+
}
2537+
}
2538+
2539+
await using harness = new TestHarness(new BlobServer());
2540+
let stub = harness.stub as any;
2541+
let result = await stub.makePayload();
2542+
expect(result.name).toBe("test.txt");
2543+
expect(result.blob).toBeInstanceOf(Blob);
2544+
expect(result.blob.type).toBe("text/plain");
2545+
expect(await result.blob.text()).toBe("file content");
2546+
expect(result.size).toBe(12);
2547+
});
2548+
2549+
it("can send multiple Blobs in the same call", async () => {
2550+
// Exercises the LocatedBlobPromise array with more than one entry — the deliverTo()
2551+
// loop must resolve all pending blob assemblies before dispatching to user code.
2552+
class BlobCombiner extends RpcTarget {
2553+
async concatenate(a: Blob, b: Blob) {
2554+
let [textA, textB] = await Promise.all([a.text(), b.text()]);
2555+
return `${textA}|${textB}`;
2556+
}
2557+
}
2558+
2559+
await using harness = new TestHarness(new BlobCombiner());
2560+
let stub = harness.stub as any;
2561+
let result = await stub.concatenate(
2562+
new Blob(["hello"], {type: "text/plain"}),
2563+
new Blob(["world"], {type: "text/plain"}),
2564+
);
2565+
expect(result).toBe("hello|world");
2566+
});
2567+
2568+
it("can receive an array of Blobs in one return value", async () => {
2569+
// Multiple LocatedBlobPromise entries produced from a single return value.
2570+
class BlobFactory extends RpcTarget {
2571+
makeBlobs() {
2572+
return [
2573+
new Blob(["first"], {type: "text/plain"}),
2574+
new Blob(["second"], {type: "text/plain"}),
2575+
new Blob(["third"], {type: "text/plain"}),
2576+
];
2577+
}
2578+
}
2579+
2580+
await using harness = new TestHarness(new BlobFactory());
2581+
let stub = harness.stub as any;
2582+
let [b1, b2, b3] = await stub.makeBlobs();
2583+
expect(await b1.text()).toBe("first");
2584+
expect(await b2.text()).toBe("second");
2585+
expect(await b3.text()).toBe("third");
2586+
});
2587+
2588+
it("round-trips a Blob with no MIME type", async () => {
2589+
// new Blob([bytes]) leaves .type as "" — the empty string must survive the round-trip
2590+
// and not become undefined or null.
2591+
await using harness = new TestHarness(new TestTarget());
2592+
let bytes = new TextEncoder().encode("untyped content");
2593+
let blob = new Blob([bytes]);
2594+
expect(blob.type).toBe("");
2595+
using result = await harness.stub.echoBlob(blob);
2596+
expect(result.type).toBe("");
2597+
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
2598+
});
2599+
2600+
it("preserves every possible byte value through the pipe", async () => {
2601+
// All 256 possible byte values in a single Blob — verifies the pipe mechanism
2602+
// neither corrupts nor truncates any byte.
2603+
await using harness = new TestHarness(new TestTarget());
2604+
let bytes = new Uint8Array(256);
2605+
for (let i = 0; i < 256; i++) bytes[i] = i;
2606+
let blob = new Blob([bytes], {type: "application/octet-stream"});
2607+
using result = await harness.stub.echoBlob(blob);
2608+
expect(result.size).toBe(256);
2609+
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
2610+
});
2611+
2612+
it("can send a large Blob over RPC", async () => {
2613+
// 1 MB blob — exercises multi-chunk stream collection in streamToBlob().
2614+
// Timeout is raised because CI machines can be slow to pump 1 MB through the
2615+
// fake in-process transport (default 5 s is too tight on some runners).
2616+
// Skipped in workerd: the isolate drops its connection when a large in-process
2617+
// stream is pumped through it (infrastructure limit, not a code bug).
2618+
if (navigator.userAgent === "Cloudflare-Workers") return;
2619+
await using harness = new TestHarness(new TestTarget());
2620+
let size = 1024 * 1024;
2621+
let bytes = new Uint8Array(size);
2622+
for (let i = 0; i < size; i++) bytes[i] = i & 0xff;
2623+
let blob = new Blob([bytes], {type: "application/octet-stream"});
2624+
using result = await harness.stub.echoBlob(blob);
2625+
expect(result.size).toBe(size);
2626+
expect(new Uint8Array(await result.arrayBuffer())).toStrictEqual(bytes);
2627+
}, 30_000);
2628+
2629+
it("can pass a Blob through a local (loopback) stub", async () => {
2630+
// No network — payload goes through deepCopy() rather than the Evaluator.
2631+
// Blobs are immutable so deepCopy() returns them as-is; no blobPromises involved.
2632+
using stub = new RpcStub(new TestTarget());
2633+
let bytes = new TextEncoder().encode("loopback content");
2634+
let blob = new Blob([bytes], {type: "text/plain"});
2635+
let result = await stub.echoBlob(blob);
2636+
expect(result).toBeInstanceOf(Blob);
2637+
expect(result.type).toBe("text/plain");
2638+
expect(await result.text()).toBe("loopback content");
2639+
result[Symbol.dispose]();
2640+
});
2641+
2642+
it("can send a Blob alongside a ReadableStream in the same call", async () => {
2643+
// Both types use the pipe mechanism. Tests that blobPromises processing in
2644+
// deliverTo() does not interfere with ReadableStream hooks.
2645+
class Server extends RpcTarget {
2646+
async receiveAll(blob: Blob, stream: ReadableStream<string>) {
2647+
let reader = stream.getReader();
2648+
let chunks: string[] = [];
2649+
for (;;) {
2650+
let {done, value} = await reader.read();
2651+
if (done) break;
2652+
chunks.push(value);
2653+
}
2654+
return `${await blob.text()}+${chunks.join(",")}`;
2655+
}
2656+
}
2657+
2658+
let stream = new ReadableStream<string>({
2659+
start(controller) {
2660+
controller.enqueue("a");
2661+
controller.enqueue("b");
2662+
controller.close();
2663+
}
2664+
});
2665+
2666+
await using harness = new TestHarness(new Server());
2667+
let stub = harness.stub as any;
2668+
let result = await stub.receiveAll(new Blob(["hello"], {type: "text/plain"}), stream);
2669+
expect(result).toBe("hello+a,b");
2670+
});
2671+
2672+
it("disposing a result containing a Blob does not throw", async () => {
2673+
// Blobs have no owned resources; disposeImpl() must be a silent no-op.
2674+
class BlobServer extends RpcTarget {
2675+
makeBlob() { return new Blob(["hello"], {type: "text/plain"}); }
2676+
}
2677+
2678+
await using harness = new TestHarness(new BlobServer());
2679+
let stub = harness.stub as any;
2680+
let result = await stub.makeBlob();
2681+
expect(result).toBeInstanceOf(Blob);
2682+
// Dispose without reading — should never throw.
2683+
expect(() => result[Symbol.dispose]()).not.toThrow();
2684+
});
2685+
2686+
it("blob call messages are dispatched synchronously (e-order)", async () => {
2687+
// Core regression test for the Approach-B design decision: the ["push"] wire message
2688+
// for a blob-containing call must be dispatched in the same microtask turn as surrounding
2689+
// calls, not deferred until blob.arrayBuffer() resolves.
2690+
//
2691+
// We fence the server transport so messages accumulate without being processed, then
2692+
// verify all three call messages are already in the queue before any await.
2693+
class OrderServer extends RpcTarget {
2694+
mark(n: number) { return n; }
2695+
acceptBlob(n: number, _blob: Blob) { return n; }
2696+
}
2697+
2698+
let clientTransport = new TestTransport("client");
2699+
let serverTransport = new TestTransport("server", clientTransport);
2700+
2701+
let client = new RpcSession<OrderServer>(clientTransport);
2702+
let server = new RpcSession(serverTransport, new OrderServer());
2703+
2704+
// Prevent the server from processing any messages until we release the fence.
2705+
serverTransport.fence();
2706+
2707+
let stub = client.getRemoteMain();
2708+
let blob = new Blob([new Uint8Array(64)], {type: "application/octet-stream"});
2709+
2710+
// Dispatch all three calls synchronously in the same microtask turn.
2711+
let p1 = stub.mark(1);
2712+
let p2 = stub.acceptBlob(2, blob);
2713+
let p3 = stub.mark(3);
2714+
2715+
// TestTransport.send() has no await so queue.push() is synchronous — no microtask
2716+
// yield needed. But we yield once to be safe.
2717+
await Promise.resolve();
2718+
2719+
// Queue must contain at least 4 messages: ["push" mark 1], ["pipe"], ["push" blob call],
2720+
// ["push" mark 3]. If blob dispatch were deferred (Approach A without sync plumbing),
2721+
// the blob's ["push"] would be missing here.
2722+
expect(serverTransport.pendingCount).toBeGreaterThanOrEqual(4);
2723+
2724+
serverTransport.releaseFence();
2725+
await Promise.all([p1, p2, p3]);
2726+
2727+
// Dispose the main stub to release the import table entry.
2728+
stub[Symbol.dispose]();
2729+
await pumpMicrotasks();
2730+
});
2731+
});

__tests__/test-util.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,9 @@ export class TestTarget extends RpcTarget {
6666
returnNull() { return null; }
6767
returnUndefined() { return undefined; }
6868
returnNumber(i: number) { return i; }
69+
70+
async echoBlob(blob: Blob): Promise<Blob> {
71+
let bytes = await blob.arrayBuffer();
72+
return new Blob([bytes], {type: blob.type});
73+
}
6974
}

0 commit comments

Comments
 (0)