Skip to content

Add CBOR queue transport helpers for Workflow SDK spec v3#40

Draft
VaguelySerious wants to merge 1 commit intomizzle-dev:mainfrom
VaguelySerious:peter/cbor-queue-transport
Draft

Add CBOR queue transport helpers for Workflow SDK spec v3#40
VaguelySerious wants to merge 1 commit intomizzle-dev:mainfrom
VaguelySerious:peter/cbor-queue-transport

Conversation

@VaguelySerious
Copy link
Copy Markdown

Summary

Workflow SDK recently moved the `@workflow/world-vercel` queue transport from JSON to CBOR (vercel/workflow#1627). The move is gated behind `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT` (spec version 3) so worlds that don't declare support stay on JSON.

This PR adds the CBOR transport building blocks to each world in this repo so they can eventually opt in. It intentionally does not flip any of the queues over — the full migration is larger than this and deserves careful review on a per-world basis.

What's in the PR

  • `packages/turso/src/cbor-transport.ts` — `CborTransport` + `DualTransport` (CBOR-first, JSON fallback) mirroring `@workflow/world-vercel`'s implementation.
  • `packages/mongodb/src/cbor-transport.ts` — same pair. Adds `cbor-x` as a dependency (mongodb didn't have it yet).
  • `packages/redis/src/cbor-transport.ts` — same pair plus `encodeForBullMq`/`decodeFromBullMq` helpers. BullMQ stores `job.data` as a JSON value, so the CBOR payload is wrapped as a base64 string on the way in and unwrapped in the processor before the HTTP POST. Adds `@vercel/queue` as a dependency to match the other packages.
  • Three changesets (`patch`) so these show up in the next release train.

Background from upstream

In short: nothing is breaking today because these worlds haven't declared `specVersion`. But they also can't opt in to resilient start (or any future CBOR-only feature) until a transport like this is wired up.

Why not wire it in here?

These packages currently pin `@workflow/world: 4.1.0-beta.2`, which is a pre-v5 shape (flat `writeToStream`/`readFromStream` on the `World`). `@workflow/world@5.x` restructures streams into a nested `streams.{ write, close, get, list, getChunks, getInfo }` interface and requires `runId` on reads. Migrating to v5 is a prerequisite for spec v3 and I didn't want to bundle that migration into this PR — it's significant enough to deserve its own review pass (and will break the current consumer story on 4.1.x betas).

Suggested follow-up checklist

For each world (turso, mongodb, redis):

  1. Bump `@workflow/world` to `^5.0.0-beta.1` (the first 5.x beta to export `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT`) and corresponding `@workflow/errors` / `@workflow/world-testing` versions.
  2. Migrate the world to the new `streams.*` shape (`writeToStream(name, runId, chunk)` → `streams.write(runId, name, chunk)`, etc.). This is the bulk of the work.
  3. Swap `new JsonTransport()` for `new CborTransport()` on the send path and `new DualTransport()` on the receive path in `queue.ts`. For Redis, wrap `job.data` with `encodeForBullMq` / `decodeFromBullMq` at the queue/worker boundary.
  4. For Turso, change the `queue_messages.payload` column from `TEXT` to `BLOB` (or base64-wrap the CBOR bytes) — CBOR is binary and will not round-trip as UTF-8 text.
  5. Honor `opts?.specVersion` in `queue()`: fall back to `JsonTransport` when `specVersion < 3` so existing runs continue to use the format they were created with.
  6. Declare `specVersion: SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT` on the exported world so the SDK creates new runs at v3.
  7. Extend the e2e tests to exercise `runInput` (start a workflow with a binary input and assert it round-trips).

Happy to split out per-world follow-up PRs if that's easier to review.

Test plan

  • `pnpm typecheck` passes for `@workflow-worlds/{turso,mongodb,redis}`.
  • Once wired in: existing e2e suite passes + new test exercising `Uint8Array` `runInput`.

🤖 Generated with Claude Code

Workflow SDK runs at spec version 3 (see vercel/workflow#1627) carry a
`runInput` on the first queue delivery whose `input` field is a
`Uint8Array`. JSON serialization does not round-trip `Uint8Array`, so
worlds that want to opt in to `SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT`
need to serialize queue payloads with CBOR instead.

This adds `CborTransport` + `DualTransport` helpers to each world
package (turso, mongodb, redis) so the queue transport can be switched
over without changing the public API. The Redis helper additionally
provides a small `encodeForBullMq`/`decodeFromBullMq` wrapper because
BullMQ stores `job.data` as a JSON value and needs the CBOR payload
base64-wrapped to survive its internal serialization.

Nothing is wired in yet — the follow-up work is described in the PR
description and includes bumping `@workflow/world`, migrating to the
restructured streams interface, wiring these transports into the queue
and HTTP handler, and declaring `specVersion: 3` on each world.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant