Skip to content

Commit 6e59303

Browse files
committed
update versioning
1 parent 834d7f1 commit 6e59303

File tree

5 files changed

+340
-0
lines changed

5 files changed

+340
-0
lines changed

packages/durabletask-js/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// Client and Worker
55
export { TaskHubGrpcClient, TaskHubGrpcClientOptions, MetadataGenerator } from "./client/client";
66
export { TaskHubGrpcWorker, TaskHubGrpcWorkerOptions } from "./worker/task-hub-grpc-worker";
7+
export { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "./worker/versioning-options";
78

89
// Contexts
910
export { OrchestrationContext } from "./task/context/orchestration-context";

packages/durabletask-js/src/worker/task-hub-grpc-worker.ts

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import { ActivityExecutor } from "./activity-executor";
1818
import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb";
1919
import { Logger, ConsoleLogger } from "../types/logger.type";
2020
import { ExponentialBackoff, sleep, withTimeout } from "../utils/backoff.util";
21+
import { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "./versioning-options";
22+
import { compareVersions } from "../utils/versioning.util";
2123

2224
/** Default timeout in milliseconds for graceful shutdown. */
2325
const DEFAULT_SHUTDOWN_TIMEOUT_MS = 30000;
@@ -40,6 +42,8 @@ export interface TaskHubGrpcWorkerOptions {
4042
logger?: Logger;
4143
/** Optional timeout in milliseconds for graceful shutdown. Defaults to 30000. */
4244
shutdownTimeoutMs?: number;
45+
/** Optional versioning options for filtering orchestrations by version. */
46+
versioning?: VersioningOptions;
4347
}
4448

4549
export class TaskHubGrpcWorker {
@@ -57,6 +61,7 @@ export class TaskHubGrpcWorker {
5761
private _pendingWorkItems: Set<Promise<void>>;
5862
private _shutdownTimeoutMs: number;
5963
private _backoff: ExponentialBackoff;
64+
private _versioning?: VersioningOptions;
6065

6166
/**
6267
* Creates a new TaskHubGrpcWorker instance.
@@ -103,6 +108,7 @@ export class TaskHubGrpcWorker {
103108
let resolvedMetadataGenerator: MetadataGenerator | undefined;
104109
let resolvedLogger: Logger | undefined;
105110
let resolvedShutdownTimeoutMs: number | undefined;
111+
let resolvedVersioning: VersioningOptions | undefined;
106112

107113
if (typeof hostAddressOrOptions === "object" && hostAddressOrOptions !== null) {
108114
// Options object constructor
@@ -113,6 +119,7 @@ export class TaskHubGrpcWorker {
113119
resolvedMetadataGenerator = hostAddressOrOptions.metadataGenerator;
114120
resolvedLogger = hostAddressOrOptions.logger;
115121
resolvedShutdownTimeoutMs = hostAddressOrOptions.shutdownTimeoutMs;
122+
resolvedVersioning = hostAddressOrOptions.versioning;
116123
} else {
117124
// Deprecated positional parameters constructor
118125
resolvedHostAddress = hostAddressOrOptions;
@@ -142,6 +149,7 @@ export class TaskHubGrpcWorker {
142149
maxDelayMs: 30000,
143150
multiplier: 2,
144151
});
152+
this._versioning = resolvedVersioning;
145153
}
146154

147155
/**
@@ -400,6 +408,74 @@ export class TaskHubGrpcWorker {
400408
await sleep(1000);
401409
}
402410

411+
/**
412+
* Result of version compatibility check.
413+
*/
414+
private _checkVersionCompatibility(req: pb.OrchestratorRequest): {
415+
compatible: boolean;
416+
shouldFail: boolean;
417+
orchestrationVersion?: string;
418+
} {
419+
// If no versioning options configured or match strategy is None, always compatible
420+
if (!this._versioning || this._versioning.matchStrategy === VersionMatchStrategy.None) {
421+
return { compatible: true, shouldFail: false };
422+
}
423+
424+
// Extract orchestration version from ExecutionStarted event
425+
const orchestrationVersion = this._getOrchestrationVersion(req);
426+
const workerVersion = this._versioning.version;
427+
428+
// If worker version is not set, process all
429+
if (!workerVersion) {
430+
return { compatible: true, shouldFail: false };
431+
}
432+
433+
let compatible = false;
434+
435+
switch (this._versioning.matchStrategy) {
436+
case VersionMatchStrategy.Strict:
437+
// Only process if versions match exactly
438+
compatible = orchestrationVersion === workerVersion;
439+
break;
440+
441+
case VersionMatchStrategy.CurrentOrOlder:
442+
// Process if orchestration version is current or older
443+
if (!orchestrationVersion) {
444+
// Empty orchestration version is considered older
445+
compatible = true;
446+
} else {
447+
compatible = compareVersions(orchestrationVersion, workerVersion) <= 0;
448+
}
449+
break;
450+
451+
default:
452+
compatible = true;
453+
}
454+
455+
if (!compatible) {
456+
const shouldFail = this._versioning.failureStrategy === VersionFailureStrategy.Fail;
457+
return { compatible: false, shouldFail, orchestrationVersion };
458+
}
459+
460+
return { compatible: true, shouldFail: false };
461+
}
462+
463+
/**
464+
* Extracts the orchestration version from the ExecutionStarted event in the request.
465+
*/
466+
private _getOrchestrationVersion(req: pb.OrchestratorRequest): string | undefined {
467+
// Look for ExecutionStarted event in both past and new events
468+
const allEvents = [...req.getPasteventsList(), ...req.getNeweventsList()];
469+
470+
for (const event of allEvents) {
471+
if (event.hasExecutionstarted()) {
472+
return event.getExecutionstarted()?.getVersion()?.getValue();
473+
}
474+
}
475+
476+
return undefined;
477+
}
478+
403479
/**
404480
* Executes an orchestrator request and tracks it as a pending work item.
405481
*/
@@ -429,6 +505,55 @@ export class TaskHubGrpcWorker {
429505
throw new Error(`Could not execute the orchestrator as the instanceId was not provided (${instanceId})`);
430506
}
431507

508+
// Check version compatibility if versioning is enabled
509+
const versionCheckResult = this._checkVersionCompatibility(req);
510+
if (!versionCheckResult.compatible) {
511+
if (versionCheckResult.shouldFail) {
512+
// Fail the orchestration with version mismatch error
513+
this._logger.warn(
514+
`Version mismatch for instance '${instanceId}': orchestration version '${versionCheckResult.orchestrationVersion}' does not match worker version '${this._versioning?.version}'. Failing orchestration.`,
515+
);
516+
517+
const failureDetails = pbh.newFailureDetails(
518+
new Error(`Version mismatch: orchestration version '${versionCheckResult.orchestrationVersion}' is not compatible with worker version '${this._versioning?.version}'`),
519+
);
520+
521+
const actions = [
522+
pbh.newCompleteOrchestrationAction(
523+
-1,
524+
pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED,
525+
failureDetails?.toString(),
526+
),
527+
];
528+
529+
const res = new pb.OrchestratorResponse();
530+
res.setInstanceid(instanceId);
531+
res.setCompletiontoken(completionToken);
532+
res.setActionsList(actions);
533+
534+
try {
535+
await callWithMetadata(stub.completeOrchestratorTask.bind(stub), res, this._metadataGenerator);
536+
} catch (e: any) {
537+
this._logger.error(`An error occurred while trying to complete instance '${instanceId}': ${e?.message}`);
538+
}
539+
return;
540+
} else {
541+
// Reject the work item - explicitly abandon it so it can be picked up by another worker
542+
this._logger.info(
543+
`Version mismatch for instance '${instanceId}': orchestration version '${versionCheckResult.orchestrationVersion}' does not match worker version '${this._versioning?.version}'. Abandoning work item.`,
544+
);
545+
546+
try {
547+
const abandonRequest = new pb.AbandonOrchestrationTaskRequest();
548+
abandonRequest.setCompletiontoken(completionToken);
549+
await callWithMetadata(stub.abandonTaskOrchestratorWorkItem.bind(stub), abandonRequest, this._metadataGenerator);
550+
} catch (e: any) {
551+
this._logger.error(`An error occurred while trying to abandon work item for instance '${instanceId}': ${e?.message}`);
552+
}
553+
return;
554+
}
555+
}
556+
432557
let res;
433558

434559
try {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* Strategy for matching orchestration versions when processing work items.
6+
*/
7+
export enum VersionMatchStrategy {
8+
/**
9+
* No version matching - process all orchestrations regardless of version.
10+
*/
11+
None = 0,
12+
13+
/**
14+
* Only process orchestrations that exactly match the worker's version.
15+
*/
16+
Strict = 1,
17+
18+
/**
19+
* Process orchestrations with the current version or older versions.
20+
* Uses semantic versioning comparison.
21+
*/
22+
CurrentOrOlder = 2,
23+
}
24+
25+
/**
26+
* Strategy for handling version mismatches when processing work items.
27+
*/
28+
export enum VersionFailureStrategy {
29+
/**
30+
* Reject the work item and let it be picked up by another worker.
31+
* The orchestration will be retried by a compatible worker.
32+
*/
33+
Reject = 0,
34+
35+
/**
36+
* Fail the orchestration with a version mismatch error.
37+
* This will mark the orchestration as failed.
38+
*/
39+
Fail = 1,
40+
}
41+
42+
/**
43+
* Options for configuring version-based filtering of orchestrations.
44+
*/
45+
export interface VersioningOptions {
46+
/**
47+
* The version of the worker. This is used for version matching when processing orchestrations.
48+
*/
49+
version?: string;
50+
51+
/**
52+
* The strategy for matching orchestration versions.
53+
* @default VersionMatchStrategy.None
54+
*/
55+
matchStrategy?: VersionMatchStrategy;
56+
57+
/**
58+
* The strategy for handling version mismatches.
59+
* @default VersionFailureStrategy.Reject
60+
*/
61+
failureStrategy?: VersionFailureStrategy;
62+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
import { VersionMatchStrategy, VersionFailureStrategy, VersioningOptions } from "../src/worker/versioning-options";
5+
6+
describe("VersioningOptions", () => {
7+
describe("VersionMatchStrategy enum", () => {
8+
it("should have None = 0", () => {
9+
expect(VersionMatchStrategy.None).toBe(0);
10+
});
11+
12+
it("should have Strict = 1", () => {
13+
expect(VersionMatchStrategy.Strict).toBe(1);
14+
});
15+
16+
it("should have CurrentOrOlder = 2", () => {
17+
expect(VersionMatchStrategy.CurrentOrOlder).toBe(2);
18+
});
19+
});
20+
21+
describe("VersionFailureStrategy enum", () => {
22+
it("should have Reject = 0", () => {
23+
expect(VersionFailureStrategy.Reject).toBe(0);
24+
});
25+
26+
it("should have Fail = 1", () => {
27+
expect(VersionFailureStrategy.Fail).toBe(1);
28+
});
29+
});
30+
31+
describe("VersioningOptions interface", () => {
32+
it("should allow creating options with all properties", () => {
33+
const options: VersioningOptions = {
34+
version: "1.0.0",
35+
matchStrategy: VersionMatchStrategy.Strict,
36+
failureStrategy: VersionFailureStrategy.Fail,
37+
};
38+
39+
expect(options.version).toBe("1.0.0");
40+
expect(options.matchStrategy).toBe(VersionMatchStrategy.Strict);
41+
expect(options.failureStrategy).toBe(VersionFailureStrategy.Fail);
42+
});
43+
44+
it("should allow creating options with only version", () => {
45+
const options: VersioningOptions = {
46+
version: "2.0.0",
47+
};
48+
49+
expect(options.version).toBe("2.0.0");
50+
expect(options.matchStrategy).toBeUndefined();
51+
expect(options.failureStrategy).toBeUndefined();
52+
});
53+
54+
it("should allow creating empty options", () => {
55+
const options: VersioningOptions = {};
56+
57+
expect(options.version).toBeUndefined();
58+
expect(options.matchStrategy).toBeUndefined();
59+
expect(options.failureStrategy).toBeUndefined();
60+
});
61+
62+
it("should allow CurrentOrOlder match strategy", () => {
63+
const options: VersioningOptions = {
64+
version: "3.0.0",
65+
matchStrategy: VersionMatchStrategy.CurrentOrOlder,
66+
failureStrategy: VersionFailureStrategy.Reject,
67+
};
68+
69+
expect(options.matchStrategy).toBe(VersionMatchStrategy.CurrentOrOlder);
70+
expect(options.failureStrategy).toBe(VersionFailureStrategy.Reject);
71+
});
72+
73+
it("should allow None match strategy (process all versions)", () => {
74+
const options: VersioningOptions = {
75+
version: "1.0.0",
76+
matchStrategy: VersionMatchStrategy.None,
77+
};
78+
79+
expect(options.matchStrategy).toBe(VersionMatchStrategy.None);
80+
});
81+
});
82+
});

0 commit comments

Comments
 (0)