Skip to content

Commit 834d7f1

Browse files
committed
feat: Add versioning support and replay-safe logging recursive terminating/purging
1 parent 0376f94 commit 834d7f1

15 files changed

Lines changed: 992 additions & 9 deletions

packages/durabletask-js/src/client/client.ts

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import { OrchestrationStatus, toProtobuf, fromProtobuf } from "../orchestration/
1717
import { TimeoutError } from "../exception/timeout-error";
1818
import { PurgeResult } from "../orchestration/orchestration-purge-result";
1919
import { PurgeInstanceCriteria } from "../orchestration/orchestration-purge-criteria";
20+
import { PurgeInstanceOptions } from "../orchestration/orchestration-purge-options";
21+
import { TerminateInstanceOptions } from "../orchestration/orchestration-terminate-options";
2022
import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util";
2123
import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "../orchestration/orchestration-query";
2224
import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page";
@@ -177,6 +179,10 @@ export class TaskHubGrpcClient {
177179
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
178180
? undefined
179181
: instanceIdOrOptions.tags;
182+
const version =
183+
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
184+
? undefined
185+
: instanceIdOrOptions.version;
180186

181187
const req = new pb.CreateInstanceRequest();
182188
req.setName(name);
@@ -191,9 +197,15 @@ export class TaskHubGrpcClient {
191197
req.setInput(i);
192198
req.setScheduledstarttimestamp(ts);
193199

200+
if (version) {
201+
const v = new StringValue();
202+
v.setValue(version);
203+
req.setVersion(v);
204+
}
205+
194206
populateTagsMap(req.getTagsMap(), tags);
195207

196-
this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`);
208+
this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}${version ? ` (version: ${version})` : ''}`);
197209

198210
const res = await callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
199211
this._stub.startInstance.bind(this._stub),
@@ -364,18 +376,38 @@ export class TaskHubGrpcClient {
364376
* Terminates the orchestrator associated with the provided instance id.
365377
*
366378
* @param {string} instanceId - orchestrator instance id to terminate.
367-
* @param {any} output - The optional output to set for the terminated orchestrator instance.
379+
* @param {any | TerminateInstanceOptions} outputOrOptions - The optional output to set for the terminated orchestrator instance,
380+
* or an options object that can include both output and recursive termination settings.
368381
*/
369-
async terminateOrchestration(instanceId: string, output: any = null): Promise<void> {
382+
async terminateOrchestration(
383+
instanceId: string,
384+
outputOrOptions: any | TerminateInstanceOptions = null,
385+
): Promise<void> {
370386
const req = new pb.TerminateRequest();
371387
req.setInstanceid(instanceId);
372388

389+
let output: any = null;
390+
let recursive = false;
391+
392+
// Check if outputOrOptions is a TerminateInstanceOptions object
393+
if (
394+
outputOrOptions !== null &&
395+
typeof outputOrOptions === 'object' &&
396+
('recursive' in outputOrOptions || 'output' in outputOrOptions)
397+
) {
398+
output = outputOrOptions.output ?? null;
399+
recursive = outputOrOptions.recursive ?? false;
400+
} else {
401+
output = outputOrOptions;
402+
}
403+
373404
const i = new StringValue();
374405
i.setValue(JSON.stringify(output));
375406

376407
req.setOutput(i);
408+
req.setRecursive(recursive);
377409

378-
this._logger.info(`Terminating '${instanceId}'`);
410+
this._logger.info(`Terminating '${instanceId}'${recursive ? ' (recursive)' : ''}`);
379411

380412
await callWithMetadata<pb.TerminateRequest, pb.TerminateResponse>(
381413
this._stub.terminateInstance.bind(this._stub),
@@ -537,16 +569,21 @@ export class TaskHubGrpcClient {
537569
*
538570
* @param value - The unique ID of the orchestration instance to purge or orchestration instance filter criteria used
539571
* to determine which instances to purge.
572+
* @param options - Optional options to control the purge behavior, such as recursive purging of sub-orchestrations.
540573
* @returns A Promise that resolves to a {@link PurgeResult} or `undefined` if the purge operation was not successful.
541574
*/
542-
async purgeOrchestration(value: string | PurgeInstanceCriteria): Promise<PurgeResult | undefined> {
575+
async purgeOrchestration(
576+
value: string | PurgeInstanceCriteria,
577+
options?: PurgeInstanceOptions,
578+
): Promise<PurgeResult | undefined> {
543579
let res;
544580
if (typeof value === `string`) {
545581
const instanceId = value;
546582
const req = new pb.PurgeInstancesRequest();
547583
req.setInstanceid(instanceId);
584+
req.setRecursive(options?.recursive ?? false);
548585

549-
this._logger.info(`Purging Instance '${instanceId}'`);
586+
this._logger.info(`Purging Instance '${instanceId}'${options?.recursive ? ' (recursive)' : ''}`);
550587

551588
res = await callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
552589
this._stub.purgeInstances.bind(this._stub),
@@ -574,9 +611,10 @@ export class TaskHubGrpcClient {
574611
filter.addRuntimestatus(toProtobuf(status));
575612
}
576613
req.setPurgeinstancefilter(filter);
614+
req.setRecursive(options?.recursive ?? false);
577615
const timeout = purgeInstanceCriteria.getTimeout();
578616

579-
this._logger.info("Purging Instance using purging criteria");
617+
this._logger.info(`Purging Instances using purging criteria${options?.recursive ? ' (recursive)' : ''}`);
580618

581619
const callPromise = callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
582620
this._stub.purgeInstances.bind(this._stub),

packages/durabletask-js/src/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ export { ActivityContext } from "./task/context/activity-context";
1111

1212
// Orchestration types and utilities
1313
export { PurgeInstanceCriteria } from "./orchestration/orchestration-purge-criteria";
14+
export { PurgeInstanceOptions } from "./orchestration/orchestration-purge-options";
15+
export { TerminateInstanceOptions } from "./orchestration/orchestration-terminate-options";
1416
export { OrchestrationStatus } from "./orchestration/enum/orchestration-status.enum";
1517
export { OrchestrationState } from "./orchestration/orchestration-state";
1618

@@ -81,3 +83,7 @@ export { TOutput } from "./types/output.type";
8183

8284
// Logger
8385
export { Logger, ConsoleLogger, NoOpLogger } from "./types/logger.type";
86+
export { ReplaySafeLogger, ReplayContext } from "./types/replay-safe-logger";
87+
88+
// Versioning utilities
89+
export { compareVersions } from "./utils/versioning.util";
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* Options for purging orchestration instances.
6+
*/
7+
export interface PurgeInstanceOptions {
8+
/**
9+
* Whether to recursively purge sub-orchestrations as well.
10+
* When true, all child orchestrations spawned by the target orchestration
11+
* will also be purged.
12+
*
13+
* Note: Recursive purging may not be supported by all backend implementations.
14+
*
15+
* @default false
16+
*/
17+
recursive?: boolean;
18+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* Options for terminating orchestration instances.
6+
*/
7+
export interface TerminateInstanceOptions {
8+
/**
9+
* Whether to recursively terminate sub-orchestrations as well.
10+
* When true, all child orchestrations spawned by the target orchestration
11+
* will also be terminated.
12+
*
13+
* @default false
14+
*/
15+
recursive?: boolean;
16+
17+
/**
18+
* The optional output to set for the terminated orchestrator instance.
19+
*/
20+
output?: any;
21+
}

packages/durabletask-js/src/task/context/orchestration-context.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
import { TActivity } from "../../types/activity.type";
55
import { TOrchestrator } from "../../types/orchestrator.type";
6+
import { Logger } from "../../types/logger.type";
7+
import { ReplaySafeLogger } from "../../types/replay-safe-logger";
68
import { TaskOptions, SubOrchestrationOptions } from "../options";
79
import { Task } from "../task";
810

@@ -38,6 +40,17 @@ export abstract class OrchestrationContext {
3840
*/
3941
abstract get isReplaying(): boolean;
4042

43+
/**
44+
* Gets the version of the current orchestration instance.
45+
*
46+
* The version is set when the orchestration instance is created via the client's
47+
* scheduleNewOrchestration method using StartOrchestrationOptions.version.
48+
* If no version was specified, this returns an empty string.
49+
*
50+
* @returns {string} The version of the current orchestration instance.
51+
*/
52+
abstract get version(): string;
53+
4154
/**
4255
* Create a timer task that will fire at a specified time.
4356
*
@@ -128,4 +141,28 @@ export abstract class OrchestrationContext {
128141
* @returns {string} A new deterministic UUID string.
129142
*/
130143
abstract newGuid(): string;
144+
145+
/**
146+
* Creates a replay-safe logger that only writes logs when the orchestrator is not replaying.
147+
*
148+
* During orchestration replay, history events are re-processed to rebuild state.
149+
* This can cause duplicate log entries if not handled properly. The returned logger
150+
* wraps the provided logger and automatically suppresses log output during replay,
151+
* ensuring that logs are only written once when the orchestration is making forward progress.
152+
*
153+
* @param {Logger} logger The underlying logger to wrap.
154+
* @returns {Logger} A replay-safe logger instance.
155+
*
156+
* @example
157+
* ```typescript
158+
* const orchestrator: TOrchestrator = async function* (ctx, input) {
159+
* const logger = ctx.createReplaySafeLogger(myLogger);
160+
* logger.info("This will only be logged once, not during replay");
161+
* yield ctx.callActivity(myActivity, input);
162+
* };
163+
* ```
164+
*/
165+
createReplaySafeLogger(logger: Logger): Logger {
166+
return new ReplaySafeLogger(this, logger);
167+
}
131168
}

packages/durabletask-js/src/task/options/task-options.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ export interface TaskOptions {
1616
* The tags to associate with the task.
1717
*/
1818
tags?: Record<string, string>;
19+
/**
20+
* The version of the task (activity) to execute.
21+
* When specified, only workers that handle this version will process the task.
22+
*/
23+
version?: string;
1924
}
2025

2126
/**
@@ -48,6 +53,12 @@ export interface StartOrchestrationOptions {
4853
* The tags to associate with the orchestration instance.
4954
*/
5055
tags?: Record<string, string>;
56+
/**
57+
* The version of the orchestration to execute.
58+
* This version is stored with the orchestration instance and can be accessed
59+
* via the OrchestrationContext.version property.
60+
*/
61+
version?: string;
5162
}
5263

5364
/**
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
import { Logger } from "./logger.type";
5+
6+
/**
7+
* Interface representing a context that can determine if replay is occurring.
8+
* This is used by ReplaySafeLogger to check if logging should be suppressed.
9+
*/
10+
export interface ReplayContext {
11+
/**
12+
* Whether the orchestrator is currently replaying from history.
13+
*/
14+
readonly isReplaying: boolean;
15+
}
16+
17+
/**
18+
* A logger wrapper that only logs when the orchestration is not replaying.
19+
*
20+
* During orchestration replay, history events are re-processed to rebuild state.
21+
* This can cause duplicate log entries if not handled properly. The ReplaySafeLogger
22+
* wraps an existing logger and automatically suppresses log output during replay,
23+
* ensuring that logs are only written once when the orchestration is making forward progress.
24+
*
25+
* @example
26+
* ```typescript
27+
* // Inside an orchestrator function:
28+
* const logger = ctx.createReplaySafeLogger(myLogger);
29+
* logger.info("This will only be logged once, not during replay");
30+
* ```
31+
*/
32+
export class ReplaySafeLogger implements Logger {
33+
private readonly context: ReplayContext;
34+
private readonly innerLogger: Logger;
35+
36+
/**
37+
* Creates a new ReplaySafeLogger.
38+
*
39+
* @param context - The replay context used to determine if replay is occurring.
40+
* @param logger - The underlying logger to delegate to when not replaying.
41+
*/
42+
constructor(context: ReplayContext, logger: Logger) {
43+
if (!context) {
44+
throw new Error("context is required");
45+
}
46+
if (!logger) {
47+
throw new Error("logger is required");
48+
}
49+
this.context = context;
50+
this.innerLogger = logger;
51+
}
52+
53+
/**
54+
* Logs an error message if not replaying.
55+
* @param message - The error message to log.
56+
* @param args - Additional arguments to include in the log.
57+
*/
58+
error(message: string, ...args: unknown[]): void {
59+
if (!this.context.isReplaying) {
60+
this.innerLogger.error(message, ...args);
61+
}
62+
}
63+
64+
/**
65+
* Logs a warning message if not replaying.
66+
* @param message - The warning message to log.
67+
* @param args - Additional arguments to include in the log.
68+
*/
69+
warn(message: string, ...args: unknown[]): void {
70+
if (!this.context.isReplaying) {
71+
this.innerLogger.warn(message, ...args);
72+
}
73+
}
74+
75+
/**
76+
* Logs an informational message if not replaying.
77+
* @param message - The informational message to log.
78+
* @param args - Additional arguments to include in the log.
79+
*/
80+
info(message: string, ...args: unknown[]): void {
81+
if (!this.context.isReplaying) {
82+
this.innerLogger.info(message, ...args);
83+
}
84+
}
85+
86+
/**
87+
* Logs a debug message if not replaying.
88+
* @param message - The debug message to log.
89+
* @param args - Additional arguments to include in the log.
90+
*/
91+
debug(message: string, ...args: unknown[]): void {
92+
if (!this.context.isReplaying) {
93+
this.innerLogger.debug(message, ...args);
94+
}
95+
}
96+
}

packages/durabletask-js/src/utils/pb-helper.util.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,11 +306,15 @@ export function newScheduleTaskAction(
306306
name: string,
307307
encodedInput?: string,
308308
tags?: Record<string, string>,
309+
version?: string,
309310
): pb.OrchestratorAction {
310311
const scheduleTaskAction = new pb.ScheduleTaskAction();
311312
scheduleTaskAction.setName(name);
312313
scheduleTaskAction.setInput(getStringValue(encodedInput));
313314
populateTagsMap(scheduleTaskAction.getTagsMap(), tags);
315+
if (version) {
316+
scheduleTaskAction.setVersion(getStringValue(version));
317+
}
314318

315319
const action = new pb.OrchestratorAction();
316320
action.setId(id);
@@ -331,12 +335,16 @@ export function newCreateSubOrchestrationAction(
331335
instanceId?: string | null,
332336
encodedInput?: string,
333337
tags?: Record<string, string>,
338+
version?: string,
334339
): pb.OrchestratorAction {
335340
const createSubOrchestrationAction = new pb.CreateSubOrchestrationAction();
336341
createSubOrchestrationAction.setName(name);
337342
createSubOrchestrationAction.setInstanceid(instanceId || "");
338343
createSubOrchestrationAction.setInput(getStringValue(encodedInput));
339344
populateTagsMap(createSubOrchestrationAction.getTagsMap(), tags);
345+
if (version) {
346+
createSubOrchestrationAction.setVersion(getStringValue(version));
347+
}
340348

341349
const action = new pb.OrchestratorAction();
342350
action.setId(id);

0 commit comments

Comments
 (0)