Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/fix-community-world-specversion.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@workflow/core': patch
'@workflow/world': patch
'@workflow/world-vercel': patch
'@workflow/world-local': patch
'@workflow/world-postgres': patch
---

Fix community world E2E tests by adding `specVersion` to the World interface so `start()` uses the safe baseline (v2) for worlds that don't declare their supported version
8 changes: 6 additions & 2 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1549,9 +1549,12 @@ describe('e2e', () => {
expect(flowBody).toEqual({
healthy: true,
endpoint: '/.well-known/workflow/v1/flow',
specVersion: SPEC_VERSION_CURRENT,
// specVersion comes from the World's declared specVersion (e.g. 3
// for world-vercel) or falls back to SPEC_VERSION_CURRENT (2).
specVersion: expect.any(Number),
workflowCoreVersion: expect.any(String),
});
expect(flowBody.specVersion).toBeGreaterThanOrEqual(SPEC_VERSION_CURRENT);

// Test the step endpoint health check
const stepHealthUrl = new URL(
Expand All @@ -1568,9 +1571,10 @@ describe('e2e', () => {
expect(stepBody).toEqual({
healthy: true,
endpoint: '/.well-known/workflow/v1/step',
specVersion: SPEC_VERSION_CURRENT,
specVersion: expect.any(Number),
workflowCoreVersion: expect.any(String),
});
expect(stepBody.specVersion).toBeGreaterThanOrEqual(SPEC_VERSION_CURRENT);
}
);

Expand Down
12 changes: 9 additions & 3 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ export {
export function workflowEntrypoint(
workflowCode: string
): (req: Request) => Promise<Response> {
const handler = getWorldHandlers().createQueueHandler(
const { createQueueHandler, specVersion: worldSpecVersion } =
getWorldHandlers();
const handler = createQueueHandler(
'__wkf_workflow_',
async (message_, metadata) => {
// Check if this is a health check message
Expand All @@ -106,7 +108,11 @@ export function workflowEntrypoint(
// The stream name includes a unique correlationId that must be known by the caller.
const healthCheck = parseHealthCheckPayload(message_);
if (healthCheck) {
await handleHealthCheckMessage(healthCheck, 'workflow');
await handleHealthCheckMessage(
healthCheck,
'workflow',
worldSpecVersion
);
return;
}

Expand Down Expand Up @@ -615,7 +621,7 @@ export function workflowEntrypoint(
}
);

return withHealthCheck(handler);
return withHealthCheck(handler, worldSpecVersion);
}

// this is a no-op placeholder as the client is
Expand Down
10 changes: 6 additions & 4 deletions packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,16 @@ function generateHealthCheckRunId(): string {
*/
export async function handleHealthCheckMessage(
healthCheck: HealthCheckPayload,
endpoint: 'workflow' | 'step'
endpoint: 'workflow' | 'step',
worldSpecVersion?: number
): Promise<void> {
const world = getWorld();
const streamName = getHealthCheckStreamName(healthCheck.correlationId);
const response = JSON.stringify({
healthy: true,
endpoint,
correlationId: healthCheck.correlationId,
specVersion: SPEC_VERSION_CURRENT,
specVersion: worldSpecVersion ?? SPEC_VERSION_CURRENT,
workflowCoreVersion,
timestamp: Date.now(),
});
Expand Down Expand Up @@ -377,7 +378,8 @@ const HEALTH_CHECK_CORS_HEADERS = {
* based on the presence of a `__health` query parameter.
*/
export function withHealthCheck(
handler: (req: Request) => Promise<Response>
handler: (req: Request) => Promise<Response>,
worldSpecVersion?: number
): (req: Request) => Promise<Response> {
return async (req: Request) => {
const url = new URL(req.url);
Expand All @@ -394,7 +396,7 @@ export function withHealthCheck(
JSON.stringify({
healthy: true,
endpoint: url.pathname,
specVersion: SPEC_VERSION_CURRENT,
specVersion: worldSpecVersion ?? SPEC_VERSION_CURRENT,
workflowCoreVersion,
}),
{
Expand Down
41 changes: 37 additions & 4 deletions packages/core/src/runtime/start.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { WorkflowRuntimeError, WorkflowWorldError } from '@workflow/errors';
import { SPEC_VERSION_CURRENT, SPEC_VERSION_LEGACY } from '@workflow/world';
import {
SPEC_VERSION_CURRENT,
SPEC_VERSION_LEGACY,
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
SPEC_VERSION_SUPPORTS_EVENT_SOURCING,
} from '@workflow/world';
import {
afterEach,
beforeEach,
Expand All @@ -10,8 +15,8 @@ import {
vi,
} from 'vitest';
import type { Run } from './run.js';
import { start } from './start.js';
import type { WorkflowFunction } from './start.js';
import { start } from './start.js';
import { getWorld } from './world.js';

// Mock @vercel/functions
Expand Down Expand Up @@ -109,11 +114,35 @@ describe('start', () => {
vi.clearAllMocks();
});

it('should use SPEC_VERSION_CURRENT when specVersion is not provided', async () => {
it('should use world.specVersion when available, falling back to SPEC_VERSION_SUPPORTS_EVENT_SOURCING', async () => {
const validWorkflow = Object.assign(() => Promise.resolve('result'), {
workflowId: 'test-workflow',
});

// Mock world without specVersion → falls back to safe baseline (v2)
await start(validWorkflow, []);

expect(mockEventsCreate).toHaveBeenCalledWith(
expect.stringMatching(/^wrun_/),
expect.objectContaining({
eventType: 'run_created',
specVersion: SPEC_VERSION_SUPPORTS_EVENT_SOURCING,
}),
expect.objectContaining({
v1Compat: false,
})
);

vi.clearAllMocks();

// Mock world with specVersion 3 → uses it
vi.mocked(getWorld).mockReturnValue({
specVersion: SPEC_VERSION_CURRENT,
getDeploymentId: vi.fn().mockResolvedValue('deploy_123'),
events: { create: mockEventsCreate },
queue: mockQueue,
} as any);

await start(validWorkflow, []);

expect(mockEventsCreate).toHaveBeenCalledWith(
Expand Down Expand Up @@ -408,6 +437,8 @@ describe('start', () => {
const mockEventsCreate = vi.fn().mockRejectedValue(serverError);

vi.mocked(getWorld).mockReturnValue({
// World declares specVersion 3 to enable CBOR queue transport + runInput
specVersion: SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
getDeploymentId: vi.fn().mockResolvedValue('deploy_123'),
events: { create: mockEventsCreate },
queue: mockQueue,
Expand All @@ -423,7 +454,9 @@ describe('start', () => {
expect(queuePayload.runInput).toBeDefined();
expect(queuePayload.runInput.deploymentId).toBe('deploy_123');
expect(queuePayload.runInput.workflowName).toBe('test-workflow');
expect(queuePayload.runInput.specVersion).toBe(SPEC_VERSION_CURRENT);
expect(queuePayload.runInput.specVersion).toBe(
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT
);
});

it('should throw when queue fails even if events.create succeeds', async () => {
Expand Down
11 changes: 9 additions & 2 deletions packages/core/src/runtime/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import {
import type { WorkflowInvokePayload, World } from '@workflow/world';
import {
isLegacySpecVersion,
SPEC_VERSION_CURRENT,
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
SPEC_VERSION_SUPPORTS_EVENT_SOURCING,
} from '@workflow/world';
import { monotonicFactory } from 'ulid';
import { importKey } from '../encryption.js';
Expand Down Expand Up @@ -172,7 +172,14 @@ export async function start<TArgs extends unknown[], TResult>(
// Serialize current trace context to propagate across queue boundary
const traceCarrier = await serializeTraceCarrier();

const specVersion = opts.specVersion ?? SPEC_VERSION_CURRENT;
// Use world-declared specVersion when available (our worlds set this),
// otherwise fall back to the safe baseline that community worlds handle.
// Community worlds built against older @workflow/world reject runs with
// specVersion > their SPEC_VERSION_CURRENT via requiresNewerWorld().
const specVersion =
opts.specVersion ??
world.specVersion ??
SPEC_VERSION_SUPPORTS_EVENT_SOURCING;
const v1Compat = isLegacySpecVersion(specVersion);

// Resolve encryption key for the new run. The runId has already been
Expand Down
10 changes: 6 additions & 4 deletions packages/core/src/runtime/step-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
getErrorStack,
normalizeUnknownError,
} from '../types.js';
import { MAX_QUEUE_DELIVERIES } from './constants.js';
import {
getQueueOverhead,
getWorkflowQueueName,
Expand All @@ -42,12 +43,13 @@ import {
queueMessage,
withHealthCheck,
} from './helpers.js';
import { MAX_QUEUE_DELIVERIES } from './constants.js';
import { getWorld, getWorldHandlers } from './world.js';

const DEFAULT_STEP_MAX_RETRIES = 3;

const stepHandler = getWorldHandlers().createQueueHandler(
const { createQueueHandler, specVersion: worldSpecVersion } =
getWorldHandlers();
const stepHandler = createQueueHandler(
'__wkf_step_',
async (message_, metadata) => {
// Check if this is a health check message
Expand All @@ -56,7 +58,7 @@ const stepHandler = getWorldHandlers().createQueueHandler(
// The stream name includes a unique correlationId that must be known by the caller.
const healthCheck = parseHealthCheckPayload(message_);
if (healthCheck) {
await handleHealthCheckMessage(healthCheck, 'step');
await handleHealthCheckMessage(healthCheck, 'step', worldSpecVersion);
return;
}

Expand Down Expand Up @@ -864,4 +866,4 @@ const stepHandler = getWorldHandlers().createQueueHandler(
* for each step, this is temporary.
*/
export const stepEntrypoint: (req: Request) => Promise<Response> =
/* @__PURE__ */ withHealthCheck(stepHandler);
/* @__PURE__ */ withHealthCheck(stepHandler, worldSpecVersion);
6 changes: 5 additions & 1 deletion packages/core/src/runtime/world.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,18 @@ export const createWorld = (): World => {
* Once we migrate to a file-based configuration (workflow.config.ts), we should
* be able to re-combine getWorld and getWorldHandlers into one singleton.
*/
export const getWorldHandlers = (): Pick<World, 'createQueueHandler'> => {
export const getWorldHandlers = (): Pick<
World,
'createQueueHandler' | 'specVersion'
> => {
if (globalSymbols[StubbedWorldCache]) {
return globalSymbols[StubbedWorldCache];
}
const _world = createWorld();
globalSymbols[StubbedWorldCache] = _world;
return {
createQueueHandler: _world.createQueueHandler,
specVersion: _world.specVersion,
};
};

Expand Down
5 changes: 3 additions & 2 deletions packages/world-local/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { promises as fs } from 'node:fs';
import { rm } from 'node:fs/promises';
import path from 'node:path';
import type { World } from '@workflow/world';
import { reenqueueActiveRuns } from '@workflow/world';
import { reenqueueActiveRuns, SPEC_VERSION_CURRENT } from '@workflow/world';
import type { Config } from './config.js';
import { config } from './config.js';
import {
Expand All @@ -29,7 +29,7 @@ export {
parseVersion,
} from './init.js';

export { type DirectHandler } from './queue.js';
export type { DirectHandler } from './queue.js';

export type LocalWorld = World & {
/** Register a direct in-process handler for a queue prefix, bypassing HTTP. */
Expand Down Expand Up @@ -64,6 +64,7 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
const queue = createQueue(mergedConfig);
const storage = createStorage(mergedConfig.dataDir, tag);
return {
specVersion: SPEC_VERSION_CURRENT,
...queue,
...createStorage(mergedConfig.dataDir, tag),
...instrumentObject('world.streams', {
Expand Down
3 changes: 2 additions & 1 deletion packages/world-postgres/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Storage, World } from '@workflow/world';
import { reenqueueActiveRuns } from '@workflow/world';
import { reenqueueActiveRuns, SPEC_VERSION_CURRENT } from '@workflow/world';
import { Pool } from 'pg';
import type { PostgresWorldConfig } from './config.js';
import { createClient, type Drizzle } from './drizzle/index.js';
Expand Down Expand Up @@ -57,6 +57,7 @@ export function createWorld(
const streamer = createStreamer(pool, drizzle);

return {
specVersion: SPEC_VERSION_CURRENT,
...storage,
...streamer,
...queue,
Expand Down
2 changes: 2 additions & 0 deletions packages/world-vercel/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { World } from '@workflow/world';
import { SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT } from '@workflow/world';
import { createGetEncryptionKeyForRun } from './encryption.js';
import { instrumentObject } from './instrumentObject.js';
import { createQueue } from './queue.js';
Expand All @@ -24,6 +25,7 @@ export function createVercelWorld(config?: APIConfig): World {
config?.projectConfig?.projectId || process.env.VERCEL_PROJECT_ID;

return {
specVersion: SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
...createQueue(config),
...createStorage(config),
...instrumentObject('world.streams', createStreamer(config)),
Expand Down
11 changes: 11 additions & 0 deletions packages/world/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,17 @@ export interface Storage {
* The "World" interface represents how Workflows are able to communicate with the outside world.
*/
export interface World extends Queue, Storage, Streamer {
/**
* The highest spec version this World supports.
*
* When set, `start()` creates runs at this version so world-specific
* features (e.g., CBOR queue transport) are enabled automatically.
* When omitted, runs default to `SPEC_VERSION_SUPPORTS_EVENT_SOURCING` (2),
* the safe baseline that all worlds — including community worlds on
* older @workflow/world versions — are expected to handle.
*/
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: The JSDoc says "When omitted, runs default to SPEC_VERSION_CURRENT (the lowest version all worlds are expected to support)" — but SPEC_VERSION_CURRENT is 3, not 2. The actual fallback in start.ts is SPEC_VERSION_SUPPORTS_EVENT_SOURCING (2). This comment will confuse anyone reading the interface.

Suggested:

/**
 * The highest spec version this World supports.
 *
 * When set, `start()` creates runs at this version so world-specific
 * features (e.g., CBOR queue transport) are enabled automatically.
 * When omitted, runs default to `SPEC_VERSION_SUPPORTS_EVENT_SOURCING` (2),
 * the safe baseline that all worlds — including community worlds on
 * older @workflow/world versions — are expected to handle.
 */

specVersion?: number;

/**
* A function that will be called to start any background tasks needed by the World implementation.
* For example, in the case of a queue backed World, this would start the queue processing.
Expand Down
Loading