Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/cli-specversion-probe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@workflow/core": patch
"@workflow/cli": patch
---

CLI `start` command probes deployment specVersion via health check before choosing queue transport. Health check always uses JSON transport for compatibility with old deployments.
23 changes: 22 additions & 1 deletion packages/cli/src/lib/inspect/run.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { start } from '@workflow/core/runtime';
import { healthCheck } from '@workflow/core/runtime/helpers';
import type { WorkflowRun, World } from '@workflow/world';
import { logger } from '../config/log.js';

Expand Down Expand Up @@ -57,7 +58,27 @@ export const startRun = async (
const deploymentId = run.deploymentId;
const workflowId = await getWorkflowName(world, workflowNameOrRunId);

const newRun = await start({ workflowId }, jsonArgs, { deploymentId });
// Probe the deployment's specVersion via health check so we use the
// correct queue transport (JSON for old deployments, CBOR for new).
// Falls back to the run's specVersion if the health check fails
// (e.g. old deployment without health check support).
let specVersion = run.specVersion;
try {
const hc = await healthCheck(world, 'workflow', {
deploymentId,
timeout: 10_000,
});
if (hc.healthy && hc.specVersion != null) {
specVersion = hc.specVersion;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: The 10s health check timeout adds latency to every CLI start invocation. When the deployment is warm, this is negligible (~1-2s). But on cold-start (or if the deployment is down), the user waits 10s with no feedback before the fallback kicks in.

Consider:

  1. A shorter timeout (e.g. 5s) — the health check response is tiny, so if it's not back in 5s something is wrong
  2. A log message before the probe: logger.info('Probing deployment version...') so the user understands the delay
  3. Both

}
} catch {
// Health check failed — use run's specVersion as fallback
}

const newRun = await start({ workflowId }, jsonArgs, {
deploymentId,
specVersion,
});

if (opts.json) {
process.stdout.write(JSON.stringify(newRun, null, 2));
Expand Down
2 changes: 2 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,7 @@ describe('e2e', () => {
healthy: true,
endpoint: '/.well-known/workflow/v1/flow',
specVersion: SPEC_VERSION_CURRENT,
workflowCoreVersion: expect.any(String),
});

// Test the step endpoint health check
Expand All @@ -1568,6 +1569,7 @@ describe('e2e', () => {
healthy: true,
endpoint: '/.well-known/workflow/v1/step',
specVersion: SPEC_VERSION_CURRENT,
workflowCoreVersion: expect.any(String),
});
}
);
Expand Down
31 changes: 26 additions & 5 deletions packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import type {
ValidQueueName,
World,
} from '@workflow/world';
import { HealthCheckPayloadSchema, SPEC_VERSION_CURRENT } from '@workflow/world';
import {
HealthCheckPayloadSchema,
SPEC_VERSION_CURRENT,
SPEC_VERSION_LEGACY,
} from '@workflow/world';
import { monotonicFactory } from 'ulid';

import { runtimeLogger } from '../logger.js';
import * as Attribute from '../telemetry/semantic-conventions.js';
import { getSpanKind, trace } from '../telemetry.js';
import { version as workflowCoreVersion } from '../version.js';
import { getWorld } from './world.js';

/** Default timeout for health checks in milliseconds */
Expand Down Expand Up @@ -99,6 +104,7 @@ export async function handleHealthCheckMessage(
endpoint,
correlationId: healthCheck.correlationId,
specVersion: SPEC_VERSION_CURRENT,
workflowCoreVersion,
timestamp: Date.now(),
});
// Use a fake runId that passes validation.
Expand All @@ -114,6 +120,8 @@ export type HealthCheckEndpoint = 'workflow' | 'step';
export interface HealthCheckOptions {
/** Timeout in milliseconds to wait for health check response. Default: 30000 (30s) */
timeout?: number;
/** Deployment ID to send the health check to. Falls back to process.env.VERCEL_DEPLOYMENT_ID. */
deploymentId?: string;
}

/**
Expand Down Expand Up @@ -186,6 +194,12 @@ function parseHealthCheckResponse(
try {
response = JSON.parse(responseText);
} catch {
// Old deployments (specVersion < 3) return plain text like
// 'Workflow SDK "..." endpoint is healthy'. Treat any non-empty
// text response as a healthy deployment with unknown specVersion.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: Good catch handling the old text/plain format. One edge case: if the deployment returns an error response body (e.g. HTML from a Vercel error page like <html>...), this would treat it as { healthy: true } since the text is non-empty. Consider checking for a known substring like "healthy" or "endpoint" in the text before returning healthy:

if (responseText.includes('healthy') || responseText.includes('endpoint')) {
  return { healthy: true };
}
return null;

This way, random error pages don't get misidentified as healthy deployments.

if (responseText.length > 0) {
return { healthy: true };
}
return null;
}

Expand Down Expand Up @@ -225,10 +239,16 @@ export async function healthCheck(
const startTime = Date.now();

try {
await world.queue(queueName, {
__healthCheck: true,
correlationId,
});
await world.queue(
queueName,
{ __healthCheck: true, correlationId },
{
// Use JSON transport so the health check works against both
// old (JSON-only) and new (dual) deployments.
specVersion: SPEC_VERSION_LEGACY,
deploymentId: options?.deploymentId,
}
);

while (Date.now() - startTime < timeout) {
try {
Expand Down Expand Up @@ -375,6 +395,7 @@ export function withHealthCheck(
healthy: true,
endpoint: url.pathname,
specVersion: SPEC_VERSION_CURRENT,
workflowCoreVersion,
}),
{
status: 200,
Expand Down
Loading