From 9a6156e82231d00050cf523aa4e4f855d66f90a6 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 16 Apr 2026 15:13:27 -0700 Subject: [PATCH] Add CBOR queue transport helpers for spec v3 compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Workflow SDK runs at spec version 3 (see vercel/workflow#1627) carry a `runInput` on the first queue delivery whose `input` field is a `Uint8Array`. JSON serialization does not round-trip `Uint8Array`, so worlds that want to opt in to `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT` need to serialize queue payloads with CBOR instead. This adds `CborTransport` + `DualTransport` helpers to each world package (turso, mongodb, redis) so the queue transport can be switched over without changing the public API. The Redis helper additionally provides a small `encodeForBullMq`/`decodeFromBullMq` wrapper because BullMQ stores `job.data` as a JSON value and needs the CBOR payload base64-wrapped to survive its internal serialization. Nothing is wired in yet — the follow-up work is described in the PR description and includes bumping `@workflow/world`, migrating to the restructured streams interface, wiring these transports into the queue and HTTP handler, and declaring `specVersion: 3` on each world. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/cbor-transport-mongodb.md | 5 ++ .changeset/cbor-transport-redis.md | 5 ++ .changeset/cbor-transport-turso.md | 5 ++ packages/mongodb/package.json | 1 + packages/mongodb/src/cbor-transport.ts | 72 ++++++++++++++++ packages/redis/package.json | 1 + packages/redis/src/cbor-transport.ts | 109 +++++++++++++++++++++++++ packages/turso/src/cbor-transport.ts | 72 ++++++++++++++++ pnpm-lock.yaml | 6 ++ 9 files changed, 276 insertions(+) create mode 100644 .changeset/cbor-transport-mongodb.md create mode 100644 .changeset/cbor-transport-redis.md create mode 100644 .changeset/cbor-transport-turso.md create mode 100644 packages/mongodb/src/cbor-transport.ts create mode 100644 packages/redis/src/cbor-transport.ts create mode 100644 packages/turso/src/cbor-transport.ts diff --git a/.changeset/cbor-transport-mongodb.md b/.changeset/cbor-transport-mongodb.md new file mode 100644 index 0000000..2866110 --- /dev/null +++ b/.changeset/cbor-transport-mongodb.md @@ -0,0 +1,5 @@ +--- +"@workflow-worlds/mongodb": patch +--- + +Add `CborTransport` and `DualTransport` helpers (`src/cbor-transport.ts`) and `cbor-x` dependency for use with Workflow SDK spec version 3 (CBOR queue transport). Not yet wired into the default queue — see the upstream PR description for the migration checklist. diff --git a/.changeset/cbor-transport-redis.md b/.changeset/cbor-transport-redis.md new file mode 100644 index 0000000..4c4cdb8 --- /dev/null +++ b/.changeset/cbor-transport-redis.md @@ -0,0 +1,5 @@ +--- +"@workflow-worlds/redis": patch +--- + +Add `CborTransport`/`DualTransport` and BullMQ wrapping helpers (`src/cbor-transport.ts`) and the `@vercel/queue` dependency for use with Workflow SDK spec version 3 (CBOR queue transport). Not yet wired into the default queue — see the upstream PR description for the migration checklist. diff --git a/.changeset/cbor-transport-turso.md b/.changeset/cbor-transport-turso.md new file mode 100644 index 0000000..8cf622f --- /dev/null +++ b/.changeset/cbor-transport-turso.md @@ -0,0 +1,5 @@ +--- +"@workflow-worlds/turso": patch +--- + +Add `CborTransport` and `DualTransport` helpers (`src/cbor-transport.ts`) for use with Workflow SDK spec version 3 (CBOR queue transport). Not yet wired into the default queue — see the upstream PR description for the migration checklist. diff --git a/packages/mongodb/package.json b/packages/mongodb/package.json index d9e131f..b9534cc 100644 --- a/packages/mongodb/package.json +++ b/packages/mongodb/package.json @@ -32,6 +32,7 @@ "@workflow/errors": "4.1.0-beta.14", "@workflow/world": "4.1.0-beta.2", "@vercel/queue": "^0.0.0-alpha.29", + "cbor-x": "1.6.0", "mongodb": "^6.0.0", "ulid": "^2.3.0", "zod": "^4.1.11" diff --git a/packages/mongodb/src/cbor-transport.ts b/packages/mongodb/src/cbor-transport.ts new file mode 100644 index 0000000..c77022d --- /dev/null +++ b/packages/mongodb/src/cbor-transport.ts @@ -0,0 +1,72 @@ +/** + * CBOR-based queue transport utilities. + * + * Workflow SDK runs at spec version 3+ carry a `runInput` on the first queue + * delivery whose `input` field is a `Uint8Array`. JSON serialization does not + * round-trip `Uint8Array` values, so any world that wants to opt in to + * `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT` must serialize queue payloads + * with CBOR instead. + * + * These classes mirror the pattern used in `@workflow/world-vercel`: + * - `CborTransport` always encodes/decodes CBOR (for new outgoing messages). + * - `DualTransport` decodes CBOR first and falls back to JSON, so handlers keep + * accepting any JSON-encoded messages that were in-flight when the transport + * was switched over. + * + * Wire them into `queue.ts` by replacing `new JsonTransport()` with + * `new CborTransport()` for the encode path and `new DualTransport()` for the + * HTTP handler decode path. When `opts.specVersion < 3` is passed to `queue()`, + * fall back to JSON so older runs continue to use the format they were + * created with. + */ + +import type { Transport } from '@vercel/queue'; +import { decode, encode } from 'cbor-x'; + +export class CborTransport implements Transport { + readonly contentType = 'application/cbor'; + + serialize(value: unknown): Buffer { + return Buffer.from(encode(value)); + } + + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + return decode(Buffer.concat(chunks)); + } +} + +/** + * Dual-mode decoder: try CBOR first, fall back to JSON if that fails. + * Use this on the receive side while the sender is still transitioning, or to + * keep compatibility with JSON-encoded messages that were already queued. + */ +export class DualTransport implements Transport { + readonly contentType = 'application/cbor'; + + serialize(value: unknown): Buffer { + return Buffer.from(encode(value)); + } + + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + const buffer = Buffer.concat(chunks); + try { + return decode(buffer); + } catch { + return JSON.parse(buffer.toString('utf8')); + } + } +} diff --git a/packages/redis/package.json b/packages/redis/package.json index 46650a1..704856e 100644 --- a/packages/redis/package.json +++ b/packages/redis/package.json @@ -34,6 +34,7 @@ "dependencies": { "@workflow/errors": "4.1.0-beta.14", "@workflow/world": "4.1.0-beta.2", + "@vercel/queue": "^0.0.0-alpha.29", "bullmq": "^5.34.0", "cbor-x": "1.6.0", "ioredis": "^5.4.2", diff --git a/packages/redis/src/cbor-transport.ts b/packages/redis/src/cbor-transport.ts new file mode 100644 index 0000000..9c5b2d3 --- /dev/null +++ b/packages/redis/src/cbor-transport.ts @@ -0,0 +1,109 @@ +/** + * CBOR-based queue transport utilities. + * + * Workflow SDK runs at spec version 3+ carry a `runInput` on the first queue + * delivery whose `input` field is a `Uint8Array`. JSON serialization does not + * round-trip `Uint8Array` values, so any world that wants to opt in to + * `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT` must serialize queue payloads + * with CBOR instead. BullMQ stores `job.data` as a JSON value, so we CBOR-encode + * the payload to a base64 string before queueing and decode it in the worker + * processor before POSTing to the workflow HTTP endpoint. + * + * Wire into `queue.ts`: + * - On the send path, wrap `message` with `encodeForBullMq()` before passing it + * to `queue.add()`, and update the worker processor to run `decodeFromBullMq()` + * on `job.data` before forwarding it. + * - On the HTTP send, serialize with `CborTransport` and set + * `content-type: application/cbor`. + * - On `createQueueHandler`, use `DualTransport` to decode the request body + * (CBOR-first, JSON fallback during rollout). + * - When `opts.specVersion < 3` is passed, keep using the JSON path so older + * runs continue to use the format they were created with. + */ + +import type { Transport } from '@vercel/queue'; +import { decode, encode } from 'cbor-x'; + +/** Prefix on base64 strings stored in BullMQ jobs to mark them as CBOR payloads. */ +export const CBOR_BULLMQ_PREFIX = 'cbor:'; + +/** + * Encode a payload for storage inside BullMQ `job.data`. Returns a discriminated + * union so the worker can recognize CBOR payloads and decode them back to a + * structured object before forwarding. + */ +export function encodeForBullMq(value: unknown): { __cbor: string } { + const buffer = encode(value); + return { + __cbor: CBOR_BULLMQ_PREFIX + Buffer.from(buffer).toString('base64'), + }; +} + +/** + * Decode a payload written by {@link encodeForBullMq}. If the data is not a + * CBOR-wrapped object (e.g., an older JSON payload from before the transition), + * the value is returned unchanged. + */ +export function decodeFromBullMq(data: unknown): T { + if ( + data && + typeof data === 'object' && + '__cbor' in data && + typeof (data as { __cbor: unknown }).__cbor === 'string' + ) { + const encoded = (data as { __cbor: string }).__cbor; + const body = encoded.startsWith(CBOR_BULLMQ_PREFIX) + ? encoded.slice(CBOR_BULLMQ_PREFIX.length) + : encoded; + return decode(Buffer.from(body, 'base64')) as T; + } + return data as T; +} + +export class CborTransport implements Transport { + readonly contentType = 'application/cbor'; + + serialize(value: unknown): Buffer { + return Buffer.from(encode(value)); + } + + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + return decode(Buffer.concat(chunks)); + } +} + +/** + * Dual-mode decoder: try CBOR first, fall back to JSON if that fails. + * Use this on the receive side while the sender is still transitioning, or to + * keep compatibility with JSON-encoded messages that were already queued. + */ +export class DualTransport implements Transport { + readonly contentType = 'application/cbor'; + + serialize(value: unknown): Buffer { + return Buffer.from(encode(value)); + } + + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + const buffer = Buffer.concat(chunks); + try { + return decode(buffer); + } catch { + return JSON.parse(buffer.toString('utf8')); + } + } +} diff --git a/packages/turso/src/cbor-transport.ts b/packages/turso/src/cbor-transport.ts new file mode 100644 index 0000000..c77022d --- /dev/null +++ b/packages/turso/src/cbor-transport.ts @@ -0,0 +1,72 @@ +/** + * CBOR-based queue transport utilities. + * + * Workflow SDK runs at spec version 3+ carry a `runInput` on the first queue + * delivery whose `input` field is a `Uint8Array`. JSON serialization does not + * round-trip `Uint8Array` values, so any world that wants to opt in to + * `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT` must serialize queue payloads + * with CBOR instead. + * + * These classes mirror the pattern used in `@workflow/world-vercel`: + * - `CborTransport` always encodes/decodes CBOR (for new outgoing messages). + * - `DualTransport` decodes CBOR first and falls back to JSON, so handlers keep + * accepting any JSON-encoded messages that were in-flight when the transport + * was switched over. + * + * Wire them into `queue.ts` by replacing `new JsonTransport()` with + * `new CborTransport()` for the encode path and `new DualTransport()` for the + * HTTP handler decode path. When `opts.specVersion < 3` is passed to `queue()`, + * fall back to JSON so older runs continue to use the format they were + * created with. + */ + +import type { Transport } from '@vercel/queue'; +import { decode, encode } from 'cbor-x'; + +export class CborTransport implements Transport { + readonly contentType = 'application/cbor'; + + serialize(value: unknown): Buffer { + return Buffer.from(encode(value)); + } + + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + return decode(Buffer.concat(chunks)); + } +} + +/** + * Dual-mode decoder: try CBOR first, fall back to JSON if that fails. + * Use this on the receive side while the sender is still transitioning, or to + * keep compatibility with JSON-encoded messages that were already queued. + */ +export class DualTransport implements Transport { + readonly contentType = 'application/cbor'; + + serialize(value: unknown): Buffer { + return Buffer.from(encode(value)); + } + + async deserialize(stream: ReadableStream): Promise { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value) chunks.push(value); + } + const buffer = Buffer.concat(chunks); + try { + return decode(buffer); + } catch { + return JSON.parse(buffer.toString('utf8')); + } + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 11267e1..419dedb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -29,6 +29,9 @@ importers: '@workflow/world': specifier: 4.1.0-beta.2 version: 4.1.0-beta.2(zod@4.1.13) + cbor-x: + specifier: 1.6.0 + version: 1.6.0 mongodb: specifier: ^6.0.0 version: 6.21.0 @@ -60,6 +63,9 @@ importers: packages/redis: dependencies: + '@vercel/queue': + specifier: ^0.0.0-alpha.29 + version: 0.0.0-alpha.36 '@workflow/errors': specifier: 4.1.0-beta.14 version: 4.1.0-beta.14