diff --git a/.changeset/cli-specversion-probe.md b/.changeset/cli-specversion-probe.md new file mode 100644 index 0000000000..732534b3f3 --- /dev/null +++ b/.changeset/cli-specversion-probe.md @@ -0,0 +1,6 @@ +--- +"@workflow/core": patch +"@workflow/cli": patch +--- + +CLI `start` command probes deployment specVersion via health check before choosing queue transport. Health check always uses JSON transport for compatibility with old deployments. diff --git a/packages/cli/src/lib/inspect/run.ts b/packages/cli/src/lib/inspect/run.ts index a5f89366b3..335bb9544d 100644 --- a/packages/cli/src/lib/inspect/run.ts +++ b/packages/cli/src/lib/inspect/run.ts @@ -1,4 +1,5 @@ import { start } from '@workflow/core/runtime'; +import { healthCheck } from '@workflow/core/runtime/helpers'; import type { WorkflowRun, World } from '@workflow/world'; import { logger } from '../config/log.js'; @@ -57,7 +58,27 @@ export const startRun = async ( const deploymentId = run.deploymentId; const workflowId = await getWorkflowName(world, workflowNameOrRunId); - const newRun = await start({ workflowId }, jsonArgs, { deploymentId }); + // Probe the deployment's specVersion via health check so we use the + // correct queue transport (JSON for old deployments, CBOR for new). + // Falls back to the run's specVersion if the health check fails + // (e.g. old deployment without health check support). + let specVersion = run.specVersion; + try { + const hc = await healthCheck(world, 'workflow', { + deploymentId, + timeout: 10_000, + }); + if (hc.healthy && hc.specVersion != null) { + specVersion = hc.specVersion; + } + } catch { + // Health check failed — use run's specVersion as fallback + } + + const newRun = await start({ workflowId }, jsonArgs, { + deploymentId, + specVersion, + }); if (opts.json) { process.stdout.write(JSON.stringify(newRun, null, 2)); diff --git a/packages/core/e2e/e2e.test.ts b/packages/core/e2e/e2e.test.ts index fc1c486a88..f1091dc050 100644 --- a/packages/core/e2e/e2e.test.ts +++ b/packages/core/e2e/e2e.test.ts @@ -1550,6 +1550,7 @@ describe('e2e', () => { healthy: true, endpoint: '/.well-known/workflow/v1/flow', specVersion: SPEC_VERSION_CURRENT, + workflowCoreVersion: expect.any(String), }); // Test the step endpoint health check @@ -1568,6 +1569,7 @@ describe('e2e', () => { healthy: true, endpoint: '/.well-known/workflow/v1/step', specVersion: SPEC_VERSION_CURRENT, + workflowCoreVersion: expect.any(String), }); } ); diff --git a/packages/core/src/runtime/helpers.ts b/packages/core/src/runtime/helpers.ts index fbabe993eb..b4597c6d07 100644 --- a/packages/core/src/runtime/helpers.ts +++ b/packages/core/src/runtime/helpers.ts @@ -4,12 +4,17 @@ import type { ValidQueueName, World, } from '@workflow/world'; -import { HealthCheckPayloadSchema, SPEC_VERSION_CURRENT } from '@workflow/world'; +import { + HealthCheckPayloadSchema, + SPEC_VERSION_CURRENT, + SPEC_VERSION_LEGACY, +} from '@workflow/world'; import { monotonicFactory } from 'ulid'; import { runtimeLogger } from '../logger.js'; import * as Attribute from '../telemetry/semantic-conventions.js'; import { getSpanKind, trace } from '../telemetry.js'; +import { version as workflowCoreVersion } from '../version.js'; import { getWorld } from './world.js'; /** Default timeout for health checks in milliseconds */ @@ -99,6 +104,7 @@ export async function handleHealthCheckMessage( endpoint, correlationId: healthCheck.correlationId, specVersion: SPEC_VERSION_CURRENT, + workflowCoreVersion, timestamp: Date.now(), }); // Use a fake runId that passes validation. @@ -114,6 +120,8 @@ export type HealthCheckEndpoint = 'workflow' | 'step'; export interface HealthCheckOptions { /** Timeout in milliseconds to wait for health check response. Default: 30000 (30s) */ timeout?: number; + /** Deployment ID to send the health check to. Falls back to process.env.VERCEL_DEPLOYMENT_ID. */ + deploymentId?: string; } /** @@ -186,6 +194,12 @@ function parseHealthCheckResponse( try { response = JSON.parse(responseText); } catch { + // Old deployments (specVersion < 3) return plain text like + // 'Workflow SDK "..." endpoint is healthy'. Treat any non-empty + // text response as a healthy deployment with unknown specVersion. + if (responseText.length > 0) { + return { healthy: true }; + } return null; } @@ -225,10 +239,16 @@ export async function healthCheck( const startTime = Date.now(); try { - await world.queue(queueName, { - __healthCheck: true, - correlationId, - }); + await world.queue( + queueName, + { __healthCheck: true, correlationId }, + { + // Use JSON transport so the health check works against both + // old (JSON-only) and new (dual) deployments. + specVersion: SPEC_VERSION_LEGACY, + deploymentId: options?.deploymentId, + } + ); while (Date.now() - startTime < timeout) { try { @@ -375,6 +395,7 @@ export function withHealthCheck( healthy: true, endpoint: url.pathname, specVersion: SPEC_VERSION_CURRENT, + workflowCoreVersion, }), { status: 200,