Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cbor-transport-mongodb.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow-worlds/mongodb": patch
---

Add `CborTransport` and `DualTransport` helpers (`src/cbor-transport.ts`) and `cbor-x` dependency for use with Workflow SDK spec version 3 (CBOR queue transport). Not yet wired into the default queue — see the upstream PR description for the migration checklist.
5 changes: 5 additions & 0 deletions .changeset/cbor-transport-redis.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow-worlds/redis": patch
---

Add `CborTransport`/`DualTransport` and BullMQ wrapping helpers (`src/cbor-transport.ts`) and the `@vercel/queue` dependency for use with Workflow SDK spec version 3 (CBOR queue transport). Not yet wired into the default queue — see the upstream PR description for the migration checklist.
5 changes: 5 additions & 0 deletions .changeset/cbor-transport-turso.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@workflow-worlds/turso": patch
---

Add `CborTransport` and `DualTransport` helpers (`src/cbor-transport.ts`) for use with Workflow SDK spec version 3 (CBOR queue transport). Not yet wired into the default queue — see the upstream PR description for the migration checklist.
1 change: 1 addition & 0 deletions packages/mongodb/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"@workflow/errors": "4.1.0-beta.14",
"@workflow/world": "4.1.0-beta.2",
"@vercel/queue": "^0.0.0-alpha.29",
"cbor-x": "1.6.0",
"mongodb": "^6.0.0",
"ulid": "^2.3.0",
"zod": "^4.1.11"
Expand Down
72 changes: 72 additions & 0 deletions packages/mongodb/src/cbor-transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* CBOR-based queue transport utilities.
*
* Workflow SDK runs at spec version 3+ carry a `runInput` on the first queue
* delivery whose `input` field is a `Uint8Array`. JSON serialization does not
* round-trip `Uint8Array` values, so any world that wants to opt in to
* `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT` must serialize queue payloads
* with CBOR instead.
*
* These classes mirror the pattern used in `@workflow/world-vercel`:
* - `CborTransport` always encodes/decodes CBOR (for new outgoing messages).
* - `DualTransport` decodes CBOR first and falls back to JSON, so handlers keep
* accepting any JSON-encoded messages that were in-flight when the transport
* was switched over.
*
* Wire them into `queue.ts` by replacing `new JsonTransport()` with
* `new CborTransport()` for the encode path and `new DualTransport()` for the
* HTTP handler decode path. When `opts.specVersion < 3` is passed to `queue()`,
* fall back to JSON so older runs continue to use the format they were
* created with.
*/

import type { Transport } from '@vercel/queue';
import { decode, encode } from 'cbor-x';

export class CborTransport implements Transport<unknown> {
readonly contentType = 'application/cbor';

serialize(value: unknown): Buffer {
return Buffer.from(encode(value));
}

async deserialize(stream: ReadableStream<Uint8Array>): Promise<unknown> {
const chunks: Uint8Array[] = [];
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
return decode(Buffer.concat(chunks));
}
}

/**
* Dual-mode decoder: try CBOR first, fall back to JSON if that fails.
* Use this on the receive side while the sender is still transitioning, or to
* keep compatibility with JSON-encoded messages that were already queued.
*/
export class DualTransport implements Transport<unknown> {
readonly contentType = 'application/cbor';

serialize(value: unknown): Buffer {
return Buffer.from(encode(value));
}

async deserialize(stream: ReadableStream<Uint8Array>): Promise<unknown> {
const chunks: Uint8Array[] = [];
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
const buffer = Buffer.concat(chunks);
try {
return decode(buffer);
} catch {
return JSON.parse(buffer.toString('utf8'));
}
}
}
1 change: 1 addition & 0 deletions packages/redis/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"dependencies": {
"@workflow/errors": "4.1.0-beta.14",
"@workflow/world": "4.1.0-beta.2",
"@vercel/queue": "^0.0.0-alpha.29",
"bullmq": "^5.34.0",
"cbor-x": "1.6.0",
"ioredis": "^5.4.2",
Expand Down
109 changes: 109 additions & 0 deletions packages/redis/src/cbor-transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* CBOR-based queue transport utilities.
*
* Workflow SDK runs at spec version 3+ carry a `runInput` on the first queue
* delivery whose `input` field is a `Uint8Array`. JSON serialization does not
* round-trip `Uint8Array` values, so any world that wants to opt in to
* `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT` must serialize queue payloads
* with CBOR instead. BullMQ stores `job.data` as a JSON value, so we CBOR-encode
* the payload to a base64 string before queueing and decode it in the worker
* processor before POSTing to the workflow HTTP endpoint.
*
* Wire into `queue.ts`:
* - On the send path, wrap `message` with `encodeForBullMq()` before passing it
* to `queue.add()`, and update the worker processor to run `decodeFromBullMq()`
* on `job.data` before forwarding it.
* - On the HTTP send, serialize with `CborTransport` and set
* `content-type: application/cbor`.
* - On `createQueueHandler`, use `DualTransport` to decode the request body
* (CBOR-first, JSON fallback during rollout).
* - When `opts.specVersion < 3` is passed, keep using the JSON path so older
* runs continue to use the format they were created with.
*/

import type { Transport } from '@vercel/queue';
import { decode, encode } from 'cbor-x';

/** Prefix on base64 strings stored in BullMQ jobs to mark them as CBOR payloads. */
export const CBOR_BULLMQ_PREFIX = 'cbor:';

/**
* Encode a payload for storage inside BullMQ `job.data`. Returns a discriminated
* union so the worker can recognize CBOR payloads and decode them back to a
* structured object before forwarding.
*/
export function encodeForBullMq(value: unknown): { __cbor: string } {
const buffer = encode(value);
return {
__cbor: CBOR_BULLMQ_PREFIX + Buffer.from(buffer).toString('base64'),
};
}

/**
* Decode a payload written by {@link encodeForBullMq}. If the data is not a
* CBOR-wrapped object (e.g., an older JSON payload from before the transition),
* the value is returned unchanged.
*/
export function decodeFromBullMq<T = unknown>(data: unknown): T {
if (
data &&
typeof data === 'object' &&
'__cbor' in data &&
typeof (data as { __cbor: unknown }).__cbor === 'string'
) {
const encoded = (data as { __cbor: string }).__cbor;
const body = encoded.startsWith(CBOR_BULLMQ_PREFIX)
? encoded.slice(CBOR_BULLMQ_PREFIX.length)
: encoded;
return decode(Buffer.from(body, 'base64')) as T;
}
return data as T;
}

export class CborTransport implements Transport<unknown> {
readonly contentType = 'application/cbor';

serialize(value: unknown): Buffer {
return Buffer.from(encode(value));
}

async deserialize(stream: ReadableStream<Uint8Array>): Promise<unknown> {
const chunks: Uint8Array[] = [];
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
return decode(Buffer.concat(chunks));
}
}

/**
* Dual-mode decoder: try CBOR first, fall back to JSON if that fails.
* Use this on the receive side while the sender is still transitioning, or to
* keep compatibility with JSON-encoded messages that were already queued.
*/
export class DualTransport implements Transport<unknown> {
readonly contentType = 'application/cbor';

serialize(value: unknown): Buffer {
return Buffer.from(encode(value));
}

async deserialize(stream: ReadableStream<Uint8Array>): Promise<unknown> {
const chunks: Uint8Array[] = [];
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
const buffer = Buffer.concat(chunks);
try {
return decode(buffer);
} catch {
return JSON.parse(buffer.toString('utf8'));
}
}
}
72 changes: 72 additions & 0 deletions packages/turso/src/cbor-transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* CBOR-based queue transport utilities.
*
* Workflow SDK runs at spec version 3+ carry a `runInput` on the first queue
* delivery whose `input` field is a `Uint8Array`. JSON serialization does not
* round-trip `Uint8Array` values, so any world that wants to opt in to
* `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT` must serialize queue payloads
* with CBOR instead.
*
* These classes mirror the pattern used in `@workflow/world-vercel`:
* - `CborTransport` always encodes/decodes CBOR (for new outgoing messages).
* - `DualTransport` decodes CBOR first and falls back to JSON, so handlers keep
* accepting any JSON-encoded messages that were in-flight when the transport
* was switched over.
*
* Wire them into `queue.ts` by replacing `new JsonTransport()` with
* `new CborTransport()` for the encode path and `new DualTransport()` for the
* HTTP handler decode path. When `opts.specVersion < 3` is passed to `queue()`,
* fall back to JSON so older runs continue to use the format they were
* created with.
*/

import type { Transport } from '@vercel/queue';
import { decode, encode } from 'cbor-x';

export class CborTransport implements Transport<unknown> {
readonly contentType = 'application/cbor';

serialize(value: unknown): Buffer {
return Buffer.from(encode(value));
}

async deserialize(stream: ReadableStream<Uint8Array>): Promise<unknown> {
const chunks: Uint8Array[] = [];
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
return decode(Buffer.concat(chunks));
}
}

/**
* Dual-mode decoder: try CBOR first, fall back to JSON if that fails.
* Use this on the receive side while the sender is still transitioning, or to
* keep compatibility with JSON-encoded messages that were already queued.
*/
export class DualTransport implements Transport<unknown> {
readonly contentType = 'application/cbor';

serialize(value: unknown): Buffer {
return Buffer.from(encode(value));
}

async deserialize(stream: ReadableStream<Uint8Array>): Promise<unknown> {
const chunks: Uint8Array[] = [];
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value) chunks.push(value);
}
const buffer = Buffer.concat(chunks);
try {
return decode(buffer);
} catch {
return JSON.parse(buffer.toString('utf8'));
}
}
}
6 changes: 6 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.