diff --git a/.changeset/fix-community-world-specversion.md b/.changeset/fix-community-world-specversion.md new file mode 100644 index 0000000000..b7abefca2a --- /dev/null +++ b/.changeset/fix-community-world-specversion.md @@ -0,0 +1,9 @@ +--- +'@workflow/core': patch +'@workflow/world': patch +'@workflow/world-vercel': patch +'@workflow/world-local': patch +'@workflow/world-postgres': patch +--- + +Fix community world E2E tests by adding `specVersion` to the World interface so `start()` uses the safe baseline (v2) for worlds that don't declare their supported version diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index 6260a5cfdb..b0c3e05754 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -1549,9 +1549,12 @@ describe('e2e', () => { expect(flowBody).toEqual({ healthy: true, endpoint: '/.well-known/workflow/v1/flow', - specVersion: SPEC_VERSION_CURRENT, + // specVersion comes from the World's declared specVersion (e.g. 3 + // for world-vercel) or falls back to SPEC_VERSION_CURRENT (2). + specVersion: expect.any(Number), workflowCoreVersion: expect.any(String), }); + expect(flowBody.specVersion).toBeGreaterThanOrEqual(SPEC_VERSION_CURRENT); // Test the step endpoint health check const stepHealthUrl = new URL( @@ -1568,9 +1571,10 @@ describe('e2e', () => { expect(stepBody).toEqual({ healthy: true, endpoint: '/.well-known/workflow/v1/step', - specVersion: SPEC_VERSION_CURRENT, + specVersion: expect.any(Number), workflowCoreVersion: expect.any(String), }); + expect(stepBody.specVersion).toBeGreaterThanOrEqual(SPEC_VERSION_CURRENT); } ); diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 250955b482..902936b74f 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -97,7 +97,9 @@ export { export function workflowEntrypoint( workflowCode: string ): (req: Request) => Promise { - const handler = getWorldHandlers().createQueueHandler( + const { createQueueHandler, specVersion: worldSpecVersion } = + getWorldHandlers(); + const handler = createQueueHandler( '__wkf_workflow_', async (message_, metadata) => { // Check if this is a health check message @@ -106,7 +108,11 @@ export function workflowEntrypoint( // The stream name includes a unique correlationId that must be known by the caller. const healthCheck = parseHealthCheckPayload(message_); if (healthCheck) { - await handleHealthCheckMessage(healthCheck, 'workflow'); + await handleHealthCheckMessage( + healthCheck, + 'workflow', + worldSpecVersion + ); return; } @@ -615,7 +621,7 @@ export function workflowEntrypoint( } ); - return withHealthCheck(handler); + return withHealthCheck(handler, worldSpecVersion); } // this is a no-op placeholder as the client is diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index b4597c6d07..7916997b33 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -95,7 +95,8 @@ function generateHealthCheckRunId(): string { */ export async function handleHealthCheckMessage( healthCheck: HealthCheckPayload, - endpoint: 'workflow' | 'step' + endpoint: 'workflow' | 'step', + worldSpecVersion?: number ): Promise { const world = getWorld(); const streamName = getHealthCheckStreamName(healthCheck.correlationId); @@ -103,7 +104,7 @@ export async function handleHealthCheckMessage( healthy: true, endpoint, correlationId: healthCheck.correlationId, - specVersion: SPEC_VERSION_CURRENT, + specVersion: worldSpecVersion ?? SPEC_VERSION_CURRENT, workflowCoreVersion, timestamp: Date.now(), }); @@ -377,7 +378,8 @@ const HEALTH_CHECK_CORS_HEADERS = { * based on the presence of a `__health` query parameter. */ export function withHealthCheck( - handler: (req: Request) => Promise + handler: (req: Request) => Promise, + worldSpecVersion?: number ): (req: Request) => Promise { return async (req: Request) => { const url = new URL(req.url); @@ -394,7 +396,7 @@ export function withHealthCheck( JSON.stringify({ healthy: true, endpoint: url.pathname, - specVersion: SPEC_VERSION_CURRENT, + specVersion: worldSpecVersion ?? SPEC_VERSION_CURRENT, workflowCoreVersion, }), { diff --git a/packages/core/src/runtime/start.test.ts b/packages/core/src/runtime/start.test.ts index f0423db343..f8785f2dd3 100644 --- a/packages/core/src/runtime/start.test.ts +++ b/packages/core/src/runtime/start.test.ts @@ -1,5 +1,10 @@ import { WorkflowRuntimeError, WorkflowWorldError } from '@workflow/errors'; -import { SPEC_VERSION_CURRENT, SPEC_VERSION_LEGACY } from '@workflow/world'; +import { + SPEC_VERSION_CURRENT, + SPEC_VERSION_LEGACY, + SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, + SPEC_VERSION_SUPPORTS_EVENT_SOURCING, +} from '@workflow/world'; import { afterEach, beforeEach, @@ -10,8 +15,8 @@ import { vi, } from 'vitest'; import type { Run } from './run.js'; -import { start } from './start.js'; import type { WorkflowFunction } from './start.js'; +import { start } from './start.js'; import { getWorld } from './world.js'; // Mock @vercel/functions @@ -109,11 +114,35 @@ describe('start', () => { vi.clearAllMocks(); }); - it('should use SPEC_VERSION_CURRENT when specVersion is not provided', async () => { + it('should use world.specVersion when available, falling back to SPEC_VERSION_SUPPORTS_EVENT_SOURCING', async () => { const validWorkflow = Object.assign(() => Promise.resolve('result'), { workflowId: 'test-workflow', }); + // Mock world without specVersion → falls back to safe baseline (v2) + await start(validWorkflow, []); + + expect(mockEventsCreate).toHaveBeenCalledWith( + expect.stringMatching(/^wrun_/), + expect.objectContaining({ + eventType: 'run_created', + specVersion: SPEC_VERSION_SUPPORTS_EVENT_SOURCING, + }), + expect.objectContaining({ + v1Compat: false, + }) + ); + + vi.clearAllMocks(); + + // Mock world with specVersion 3 → uses it + vi.mocked(getWorld).mockReturnValue({ + specVersion: SPEC_VERSION_CURRENT, + getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), + events: { create: mockEventsCreate }, + queue: mockQueue, + } as any); + await start(validWorkflow, []); expect(mockEventsCreate).toHaveBeenCalledWith( @@ -408,6 +437,8 @@ describe('start', () => { const mockEventsCreate = vi.fn().mockRejectedValue(serverError); vi.mocked(getWorld).mockReturnValue({ + // World declares specVersion 3 to enable CBOR queue transport + runInput + specVersion: SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, getDeploymentId: vi.fn().mockResolvedValue('deploy_123'), events: { create: mockEventsCreate }, queue: mockQueue, @@ -423,7 +454,9 @@ describe('start', () => { expect(queuePayload.runInput).toBeDefined(); expect(queuePayload.runInput.deploymentId).toBe('deploy_123'); expect(queuePayload.runInput.workflowName).toBe('test-workflow'); - expect(queuePayload.runInput.specVersion).toBe(SPEC_VERSION_CURRENT); + expect(queuePayload.runInput.specVersion).toBe( + SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT + ); }); it('should throw when queue fails even if events.create succeeds', async () => { diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index ffe7d49981..8d7af65d44 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -8,8 +8,8 @@ import { import type { WorkflowInvokePayload, World } from '@workflow/world'; import { isLegacySpecVersion, - SPEC_VERSION_CURRENT, SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, + SPEC_VERSION_SUPPORTS_EVENT_SOURCING, } from '@workflow/world'; import { monotonicFactory } from 'ulid'; import { importKey } from '../encryption.js'; @@ -172,7 +172,14 @@ export async function start( // Serialize current trace context to propagate across queue boundary const traceCarrier = await serializeTraceCarrier(); - const specVersion = opts.specVersion ?? SPEC_VERSION_CURRENT; + // Use world-declared specVersion when available (our worlds set this), + // otherwise fall back to the safe baseline that community worlds handle. + // Community worlds built against older @workflow/world reject runs with + // specVersion > their SPEC_VERSION_CURRENT via requiresNewerWorld(). + const specVersion = + opts.specVersion ?? + world.specVersion ?? + SPEC_VERSION_SUPPORTS_EVENT_SOURCING; const v1Compat = isLegacySpecVersion(specVersion); // Resolve encryption key for the new run. The runId has already been diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 822f632a92..6bf0057a60 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -34,6 +34,7 @@ import { getErrorStack, normalizeUnknownError, } from '../types.js'; +import { MAX_QUEUE_DELIVERIES } from './constants.js'; import { getQueueOverhead, getWorkflowQueueName, @@ -42,12 +43,13 @@ import { queueMessage, withHealthCheck, } from './helpers.js'; -import { MAX_QUEUE_DELIVERIES } from './constants.js'; import { getWorld, getWorldHandlers } from './world.js'; const DEFAULT_STEP_MAX_RETRIES = 3; -const stepHandler = getWorldHandlers().createQueueHandler( +const { createQueueHandler, specVersion: worldSpecVersion } = + getWorldHandlers(); +const stepHandler = createQueueHandler( '__wkf_step_', async (message_, metadata) => { // Check if this is a health check message @@ -56,7 +58,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( // The stream name includes a unique correlationId that must be known by the caller. const healthCheck = parseHealthCheckPayload(message_); if (healthCheck) { - await handleHealthCheckMessage(healthCheck, 'step'); + await handleHealthCheckMessage(healthCheck, 'step', worldSpecVersion); return; } @@ -864,4 +866,4 @@ const stepHandler = getWorldHandlers().createQueueHandler( * for each step, this is temporary. */ export const stepEntrypoint: (req: Request) => Promise = - /* @__PURE__ */ withHealthCheck(stepHandler); + /* @__PURE__ */ withHealthCheck(stepHandler, worldSpecVersion); diff --git a/packages/core/src/runtime/world.ts b/packages/core/src/runtime/world.ts index 4ca3339fee..524dc3cd7a 100644 --- a/packages/core/src/runtime/world.ts +++ b/packages/core/src/runtime/world.ts @@ -85,7 +85,10 @@ export const createWorld = (): World => { * Once we migrate to a file-based configuration (workflow.config.ts), we should * be able to re-combine getWorld and getWorldHandlers into one singleton. */ -export const getWorldHandlers = (): Pick => { +export const getWorldHandlers = (): Pick< + World, + 'createQueueHandler' | 'specVersion' +> => { if (globalSymbols[StubbedWorldCache]) { return globalSymbols[StubbedWorldCache]; } @@ -93,6 +96,7 @@ export const getWorldHandlers = (): Pick => { globalSymbols[StubbedWorldCache] = _world; return { createQueueHandler: _world.createQueueHandler, + specVersion: _world.specVersion, }; }; diff --git a/packages/world-local/src/index.ts b/packages/world-local/src/index.ts index ccc28f8188..3eb9cee622 100644 --- a/packages/world-local/src/index.ts +++ b/packages/world-local/src/index.ts @@ -2,7 +2,7 @@ import { promises as fs } from 'node:fs'; import { rm } from 'node:fs/promises'; import path from 'node:path'; import type { World } from '@workflow/world'; -import { reenqueueActiveRuns } from '@workflow/world'; +import { reenqueueActiveRuns, SPEC_VERSION_CURRENT } from '@workflow/world'; import type { Config } from './config.js'; import { config } from './config.js'; import { @@ -29,7 +29,7 @@ export { parseVersion, } from './init.js'; -export { type DirectHandler } from './queue.js'; +export type { DirectHandler } from './queue.js'; export type LocalWorld = World & { /** Register a direct in-process handler for a queue prefix, bypassing HTTP. */ @@ -64,6 +64,7 @@ export function createLocalWorld(args?: Partial): LocalWorld { const queue = createQueue(mergedConfig); const storage = createStorage(mergedConfig.dataDir, tag); return { + specVersion: SPEC_VERSION_CURRENT, ...queue, ...createStorage(mergedConfig.dataDir, tag), ...instrumentObject('world.streams', { diff --git a/packages/world-postgres/src/index.ts b/packages/world-postgres/src/index.ts index 62e1530138..31fb1d84ac 100644 --- a/packages/world-postgres/src/index.ts +++ b/packages/world-postgres/src/index.ts @@ -1,5 +1,5 @@ import type { Storage, World } from '@workflow/world'; -import { reenqueueActiveRuns } from '@workflow/world'; +import { reenqueueActiveRuns, SPEC_VERSION_CURRENT } from '@workflow/world'; import { Pool } from 'pg'; import type { PostgresWorldConfig } from './config.js'; import { createClient, type Drizzle } from './drizzle/index.js'; @@ -57,6 +57,7 @@ export function createWorld( const streamer = createStreamer(pool, drizzle); return { + specVersion: SPEC_VERSION_CURRENT, ...storage, ...streamer, ...queue, diff --git a/packages/world-vercel/src/index.ts b/packages/world-vercel/src/index.ts index 896fede377..f15c480869 100644 --- a/packages/world-vercel/src/index.ts +++ b/packages/world-vercel/src/index.ts @@ -1,4 +1,5 @@ import type { World } from '@workflow/world'; +import { SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT } from '@workflow/world'; import { createGetEncryptionKeyForRun } from './encryption.js'; import { instrumentObject } from './instrumentObject.js'; import { createQueue } from './queue.js'; @@ -24,6 +25,7 @@ export function createVercelWorld(config?: APIConfig): World { config?.projectConfig?.projectId || process.env.VERCEL_PROJECT_ID; return { + specVersion: SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT, ...createQueue(config), ...createStorage(config), ...instrumentObject('world.streams', createStreamer(config)), diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index c9628a9685..cd650c07a1 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -233,6 +233,17 @@ export interface Storage { * The "World" interface represents how Workflows are able to communicate with the outside world. */ export interface World extends Queue, Storage, Streamer { + /** + * The highest spec version this World supports. + * + * When set, `start()` creates runs at this version so world-specific + * features (e.g., CBOR queue transport) are enabled automatically. + * When omitted, runs default to `SPEC_VERSION_SUPPORTS_EVENT_SOURCING` (2), + * the safe baseline that all worlds — including community worlds on + * older @workflow/world versions — are expected to handle. + */ + specVersion?: number; + /** * A function that will be called to start any background tasks needed by the World implementation. * For example, in the case of a queue backed World, this would start the queue processing.