Skip to content

Commit 2ece7ba

Browse files
[core] [world] Fix community world E2E tests broken by specVersion bump (#1658)
Signed-off-by: Peter Wielander <mittgfu@gmail.com>
1 parent 7aab36b commit 2ece7ba

12 files changed

Lines changed: 105 additions & 23 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@workflow/core': patch
3+
'@workflow/world': patch
4+
'@workflow/world-vercel': patch
5+
'@workflow/world-local': patch
6+
'@workflow/world-postgres': patch
7+
---
8+
9+
Fix community world E2E tests by adding `specVersion` to the World interface so `start()` uses the safe baseline (v2) for worlds that don't declare their supported version

packages/core/e2e/e2e.test.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,9 +1539,12 @@ describe('e2e', () => {
15391539
expect(flowBody).toEqual({
15401540
healthy: true,
15411541
endpoint: '/.well-known/workflow/v1/flow',
1542-
specVersion: SPEC_VERSION_CURRENT,
1542+
// specVersion comes from the World's declared specVersion (e.g. 3
1543+
// for world-vercel) or falls back to SPEC_VERSION_CURRENT (2).
1544+
specVersion: expect.any(Number),
15431545
workflowCoreVersion: expect.any(String),
15441546
});
1547+
expect(flowBody.specVersion).toBeGreaterThanOrEqual(SPEC_VERSION_CURRENT);
15451548

15461549
// Test the step endpoint health check
15471550
const stepHealthUrl = new URL(
@@ -1558,9 +1561,10 @@ describe('e2e', () => {
15581561
expect(stepBody).toEqual({
15591562
healthy: true,
15601563
endpoint: '/.well-known/workflow/v1/step',
1561-
specVersion: SPEC_VERSION_CURRENT,
1564+
specVersion: expect.any(Number),
15621565
workflowCoreVersion: expect.any(String),
15631566
});
1567+
expect(stepBody.specVersion).toBeGreaterThanOrEqual(SPEC_VERSION_CURRENT);
15641568
}
15651569
);
15661570

packages/core/src/runtime.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ export {
9898
export function workflowEntrypoint(
9999
workflowCode: string
100100
): (req: Request) => Promise<Response> {
101-
const handler = getWorldHandlers().createQueueHandler(
101+
const { createQueueHandler, specVersion: worldSpecVersion } =
102+
getWorldHandlers();
103+
const handler = createQueueHandler(
102104
'__wkf_workflow_',
103105
async (message_, metadata) => {
104106
// Check if this is a health check message
@@ -107,7 +109,11 @@ export function workflowEntrypoint(
107109
// The stream name includes a unique correlationId that must be known by the caller.
108110
const healthCheck = parseHealthCheckPayload(message_);
109111
if (healthCheck) {
110-
await handleHealthCheckMessage(healthCheck, 'workflow');
112+
await handleHealthCheckMessage(
113+
healthCheck,
114+
'workflow',
115+
worldSpecVersion
116+
);
111117
return;
112118
}
113119

@@ -625,7 +631,7 @@ export function workflowEntrypoint(
625631
}
626632
);
627633

628-
return withHealthCheck(handler);
634+
return withHealthCheck(handler, worldSpecVersion);
629635
}
630636

631637
// this is a no-op placeholder as the client is

packages/core/src/runtime/helpers.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,16 @@ function generateHealthCheckRunId(): string {
9595
*/
9696
export async function handleHealthCheckMessage(
9797
healthCheck: HealthCheckPayload,
98-
endpoint: 'workflow' | 'step'
98+
endpoint: 'workflow' | 'step',
99+
worldSpecVersion?: number
99100
): Promise<void> {
100101
const world = getWorld();
101102
const streamName = getHealthCheckStreamName(healthCheck.correlationId);
102103
const response = JSON.stringify({
103104
healthy: true,
104105
endpoint,
105106
correlationId: healthCheck.correlationId,
106-
specVersion: SPEC_VERSION_CURRENT,
107+
specVersion: worldSpecVersion ?? SPEC_VERSION_CURRENT,
107108
workflowCoreVersion,
108109
timestamp: Date.now(),
109110
});
@@ -377,7 +378,8 @@ const HEALTH_CHECK_CORS_HEADERS = {
377378
* based on the presence of a `__health` query parameter.
378379
*/
379380
export function withHealthCheck(
380-
handler: (req: Request) => Promise<Response>
381+
handler: (req: Request) => Promise<Response>,
382+
worldSpecVersion?: number
381383
): (req: Request) => Promise<Response> {
382384
return async (req: Request) => {
383385
const url = new URL(req.url);
@@ -394,7 +396,7 @@ export function withHealthCheck(
394396
JSON.stringify({
395397
healthy: true,
396398
endpoint: url.pathname,
397-
specVersion: SPEC_VERSION_CURRENT,
399+
specVersion: worldSpecVersion ?? SPEC_VERSION_CURRENT,
398400
workflowCoreVersion,
399401
}),
400402
{

packages/core/src/runtime/start.test.ts

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import { WorkflowRuntimeError, WorkflowWorldError } from '@workflow/errors';
2-
import { SPEC_VERSION_CURRENT, SPEC_VERSION_LEGACY } from '@workflow/world';
2+
import {
3+
SPEC_VERSION_CURRENT,
4+
SPEC_VERSION_LEGACY,
5+
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
6+
SPEC_VERSION_SUPPORTS_EVENT_SOURCING,
7+
} from '@workflow/world';
38
import {
49
afterEach,
510
beforeEach,
@@ -10,8 +15,8 @@ import {
1015
vi,
1116
} from 'vitest';
1217
import type { Run } from './run.js';
13-
import { start } from './start.js';
1418
import type { WorkflowFunction } from './start.js';
19+
import { start } from './start.js';
1520
import { getWorld } from './world.js';
1621

1722
// Mock @vercel/functions
@@ -109,11 +114,35 @@ describe('start', () => {
109114
vi.clearAllMocks();
110115
});
111116

112-
it('should use SPEC_VERSION_CURRENT when specVersion is not provided', async () => {
117+
it('should use world.specVersion when available, falling back to SPEC_VERSION_SUPPORTS_EVENT_SOURCING', async () => {
113118
const validWorkflow = Object.assign(() => Promise.resolve('result'), {
114119
workflowId: 'test-workflow',
115120
});
116121

122+
// Mock world without specVersion → falls back to safe baseline (v2)
123+
await start(validWorkflow, []);
124+
125+
expect(mockEventsCreate).toHaveBeenCalledWith(
126+
expect.stringMatching(/^wrun_/),
127+
expect.objectContaining({
128+
eventType: 'run_created',
129+
specVersion: SPEC_VERSION_SUPPORTS_EVENT_SOURCING,
130+
}),
131+
expect.objectContaining({
132+
v1Compat: false,
133+
})
134+
);
135+
136+
vi.clearAllMocks();
137+
138+
// Mock world with specVersion 3 → uses it
139+
vi.mocked(getWorld).mockReturnValue({
140+
specVersion: SPEC_VERSION_CURRENT,
141+
getDeploymentId: vi.fn().mockResolvedValue('deploy_123'),
142+
events: { create: mockEventsCreate },
143+
queue: mockQueue,
144+
} as any);
145+
117146
await start(validWorkflow, []);
118147

119148
expect(mockEventsCreate).toHaveBeenCalledWith(
@@ -408,6 +437,8 @@ describe('start', () => {
408437
const mockEventsCreate = vi.fn().mockRejectedValue(serverError);
409438

410439
vi.mocked(getWorld).mockReturnValue({
440+
// World declares specVersion 3 to enable CBOR queue transport + runInput
441+
specVersion: SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
411442
getDeploymentId: vi.fn().mockResolvedValue('deploy_123'),
412443
events: { create: mockEventsCreate },
413444
queue: mockQueue,
@@ -423,7 +454,9 @@ describe('start', () => {
423454
expect(queuePayload.runInput).toBeDefined();
424455
expect(queuePayload.runInput.deploymentId).toBe('deploy_123');
425456
expect(queuePayload.runInput.workflowName).toBe('test-workflow');
426-
expect(queuePayload.runInput.specVersion).toBe(SPEC_VERSION_CURRENT);
457+
expect(queuePayload.runInput.specVersion).toBe(
458+
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT
459+
);
427460
});
428461

429462
it('should throw when queue fails even if events.create succeeds', async () => {

packages/core/src/runtime/start.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import {
88
import type { WorkflowInvokePayload, World } from '@workflow/world';
99
import {
1010
isLegacySpecVersion,
11-
SPEC_VERSION_CURRENT,
1211
SPEC_VERSION_SUPPORTS_CBOR_QUEUE_TRANSPORT,
12+
SPEC_VERSION_SUPPORTS_EVENT_SOURCING,
1313
} from '@workflow/world';
1414
import { monotonicFactory } from 'ulid';
1515
import { importKey } from '../encryption.js';
@@ -172,7 +172,14 @@ export async function start<TArgs extends unknown[], TResult>(
172172
// Serialize current trace context to propagate across queue boundary
173173
const traceCarrier = await serializeTraceCarrier();
174174

175-
const specVersion = opts.specVersion ?? SPEC_VERSION_CURRENT;
175+
// Use world-declared specVersion when available (our worlds set this),
176+
// otherwise fall back to the safe baseline that community worlds handle.
177+
// Community worlds built against older @workflow/world reject runs with
178+
// specVersion > their SPEC_VERSION_CURRENT via requiresNewerWorld().
179+
const specVersion =
180+
opts.specVersion ??
181+
world.specVersion ??
182+
SPEC_VERSION_SUPPORTS_EVENT_SOURCING;
176183
const v1Compat = isLegacySpecVersion(specVersion);
177184

178185
// Resolve encryption key for the new run. The runId has already been

packages/core/src/runtime/step-handler.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import {
3434
getErrorStack,
3535
normalizeUnknownError,
3636
} from '../types.js';
37+
import { MAX_QUEUE_DELIVERIES } from './constants.js';
3738
import {
3839
getQueueOverhead,
3940
getWorkflowQueueName,
@@ -42,12 +43,13 @@ import {
4243
queueMessage,
4344
withHealthCheck,
4445
} from './helpers.js';
45-
import { MAX_QUEUE_DELIVERIES } from './constants.js';
4646
import { getWorld, getWorldHandlers } from './world.js';
4747

4848
const DEFAULT_STEP_MAX_RETRIES = 3;
4949

50-
const stepHandler = getWorldHandlers().createQueueHandler(
50+
const { createQueueHandler, specVersion: worldSpecVersion } =
51+
getWorldHandlers();
52+
const stepHandler = createQueueHandler(
5153
'__wkf_step_',
5254
async (message_, metadata) => {
5355
// Check if this is a health check message
@@ -56,7 +58,7 @@ const stepHandler = getWorldHandlers().createQueueHandler(
5658
// The stream name includes a unique correlationId that must be known by the caller.
5759
const healthCheck = parseHealthCheckPayload(message_);
5860
if (healthCheck) {
59-
await handleHealthCheckMessage(healthCheck, 'step');
61+
await handleHealthCheckMessage(healthCheck, 'step', worldSpecVersion);
6062
return;
6163
}
6264

@@ -864,4 +866,4 @@ const stepHandler = getWorldHandlers().createQueueHandler(
864866
* for each step, this is temporary.
865867
*/
866868
export const stepEntrypoint: (req: Request) => Promise<Response> =
867-
/* @__PURE__ */ withHealthCheck(stepHandler);
869+
/* @__PURE__ */ withHealthCheck(stepHandler, worldSpecVersion);

packages/core/src/runtime/world.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,18 @@ export const createWorld = (): World => {
8585
* Once we migrate to a file-based configuration (workflow.config.ts), we should
8686
* be able to re-combine getWorld and getWorldHandlers into one singleton.
8787
*/
88-
export const getWorldHandlers = (): Pick<World, 'createQueueHandler'> => {
88+
export const getWorldHandlers = (): Pick<
89+
World,
90+
'createQueueHandler' | 'specVersion'
91+
> => {
8992
if (globalSymbols[StubbedWorldCache]) {
9093
return globalSymbols[StubbedWorldCache];
9194
}
9295
const _world = createWorld();
9396
globalSymbols[StubbedWorldCache] = _world;
9497
return {
9598
createQueueHandler: _world.createQueueHandler,
99+
specVersion: _world.specVersion,
96100
};
97101
};
98102

packages/world-local/src/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { promises as fs } from 'node:fs';
22
import { rm } from 'node:fs/promises';
33
import path from 'node:path';
44
import type { World } from '@workflow/world';
5-
import { reenqueueActiveRuns } from '@workflow/world';
5+
import { reenqueueActiveRuns, SPEC_VERSION_CURRENT } from '@workflow/world';
66
import type { Config } from './config.js';
77
import { config } from './config.js';
88
import {
@@ -29,7 +29,7 @@ export {
2929
parseVersion,
3030
} from './init.js';
3131

32-
export { type DirectHandler } from './queue.js';
32+
export type { DirectHandler } from './queue.js';
3333

3434
export type LocalWorld = World & {
3535
/** Register a direct in-process handler for a queue prefix, bypassing HTTP. */
@@ -64,6 +64,7 @@ export function createLocalWorld(args?: Partial<Config>): LocalWorld {
6464
const queue = createQueue(mergedConfig);
6565
const storage = createStorage(mergedConfig.dataDir, tag);
6666
return {
67+
specVersion: SPEC_VERSION_CURRENT,
6768
...queue,
6869
...createStorage(mergedConfig.dataDir, tag),
6970
...instrumentObject('world.streams', {

packages/world-postgres/src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { Storage, World } from '@workflow/world';
2-
import { reenqueueActiveRuns } from '@workflow/world';
2+
import { reenqueueActiveRuns, SPEC_VERSION_CURRENT } from '@workflow/world';
33
import { Pool } from 'pg';
44
import type { PostgresWorldConfig } from './config.js';
55
import { createClient, type Drizzle } from './drizzle/index.js';
@@ -57,6 +57,7 @@ export function createWorld(
5757
const streamer = createStreamer(pool, drizzle);
5858

5959
return {
60+
specVersion: SPEC_VERSION_CURRENT,
6061
...storage,
6162
...streamer,
6263
...queue,

0 commit comments

Comments
 (0)