Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions packages/durabletask-js/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { OrchestrationStatus, toProtobuf, fromProtobuf } from "../orchestration/
import { TimeoutError } from "../exception/timeout-error";
import { PurgeResult } from "../orchestration/orchestration-purge-result";
import { PurgeInstanceCriteria } from "../orchestration/orchestration-purge-criteria";
import { PurgeInstanceOptions } from "../orchestration/orchestration-purge-options";
import { TerminateInstanceOptions } from "../orchestration/orchestration-terminate-options";
import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util";
import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "../orchestration/orchestration-query";
import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page";
Expand Down Expand Up @@ -47,12 +49,19 @@ export interface TaskHubGrpcClientOptions {
metadataGenerator?: MetadataGenerator;
/** Optional logger instance. Defaults to ConsoleLogger. */
logger?: Logger;
/**
* The default version to use when starting new orchestrations without an explicit version.
* If specified, this will be used as the version for orchestrations that don't provide
* their own version in StartOrchestrationOptions.
*/
defaultVersion?: string;
}

export class TaskHubGrpcClient {
private _stub: stubs.TaskHubSidecarServiceClient;
private _metadataGenerator?: MetadataGenerator;
private _logger: Logger;
private _defaultVersion?: string;

/**
* Creates a new TaskHubGrpcClient instance.
Expand Down Expand Up @@ -95,6 +104,7 @@ export class TaskHubGrpcClient {
let resolvedCredentials: grpc.ChannelCredentials | undefined;
let resolvedMetadataGenerator: MetadataGenerator | undefined;
let resolvedLogger: Logger | undefined;
let resolvedDefaultVersion: string | undefined;

if (typeof hostAddressOrOptions === "object" && hostAddressOrOptions !== null) {
// Options object constructor
Expand All @@ -104,6 +114,7 @@ export class TaskHubGrpcClient {
resolvedCredentials = hostAddressOrOptions.credentials;
resolvedMetadataGenerator = hostAddressOrOptions.metadataGenerator;
resolvedLogger = hostAddressOrOptions.logger;
resolvedDefaultVersion = hostAddressOrOptions.defaultVersion;
} else {
// Deprecated positional parameters constructor
resolvedHostAddress = hostAddressOrOptions;
Expand All @@ -117,6 +128,7 @@ export class TaskHubGrpcClient {
this._stub = new GrpcClient(resolvedHostAddress, resolvedOptions, resolvedUseTLS, resolvedCredentials).stub;
this._metadataGenerator = resolvedMetadataGenerator;
this._logger = resolvedLogger ?? new ConsoleLogger();
this._defaultVersion = resolvedDefaultVersion;
}

async stop(): Promise<void> {
Expand Down Expand Up @@ -177,6 +189,13 @@ export class TaskHubGrpcClient {
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
? undefined
: instanceIdOrOptions.tags;
const version =
typeof instanceIdOrOptions === "string" || instanceIdOrOptions === undefined
? undefined
: instanceIdOrOptions.version;

// Use provided version, or fall back to client's default version
const effectiveVersion = version ?? this._defaultVersion;

const req = new pb.CreateInstanceRequest();
req.setName(name);
Expand All @@ -191,9 +210,15 @@ export class TaskHubGrpcClient {
req.setInput(i);
req.setScheduledstarttimestamp(ts);

if (effectiveVersion) {
const v = new StringValue();
v.setValue(effectiveVersion);
req.setVersion(v);
}

populateTagsMap(req.getTagsMap(), tags);

this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}`);
this._logger.info(`Starting new ${name} instance with ID = ${req.getInstanceid()}${effectiveVersion ? ` (version: ${effectiveVersion})` : ''}`);
Comment thread
YunchuWang marked this conversation as resolved.
Outdated

const res = await callWithMetadata<pb.CreateInstanceRequest, pb.CreateInstanceResponse>(
this._stub.startInstance.bind(this._stub),
Expand Down Expand Up @@ -364,18 +389,38 @@ export class TaskHubGrpcClient {
* Terminates the orchestrator associated with the provided instance id.
*
* @param {string} instanceId - orchestrator instance id to terminate.
* @param {any} output - The optional output to set for the terminated orchestrator instance.
* @param {any | TerminateInstanceOptions} outputOrOptions - The optional output to set for the terminated orchestrator instance,
* or an options object that can include both output and recursive termination settings.
*/
async terminateOrchestration(instanceId: string, output: any = null): Promise<void> {
async terminateOrchestration(
instanceId: string,
outputOrOptions: any | TerminateInstanceOptions = null,
): Promise<void> {
const req = new pb.TerminateRequest();
req.setInstanceid(instanceId);

let output: any = null;
let recursive = false;

// Check if outputOrOptions is a TerminateInstanceOptions object
if (
outputOrOptions !== null &&
typeof outputOrOptions === 'object' &&
('recursive' in outputOrOptions || 'output' in outputOrOptions)
) {
output = outputOrOptions.output ?? null;
recursive = outputOrOptions.recursive ?? false;
} else {
output = outputOrOptions;
}
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
Comment thread
YunchuWang marked this conversation as resolved.
Outdated

const i = new StringValue();
i.setValue(JSON.stringify(output));

req.setOutput(i);
req.setRecursive(recursive);

this._logger.info(`Terminating '${instanceId}'`);
this._logger.info(`Terminating '${instanceId}'${recursive ? ' (recursive)' : ''}`);

await callWithMetadata<pb.TerminateRequest, pb.TerminateResponse>(
this._stub.terminateInstance.bind(this._stub),
Expand Down Expand Up @@ -537,16 +582,21 @@ export class TaskHubGrpcClient {
*
* @param value - The unique ID of the orchestration instance to purge or orchestration instance filter criteria used
* to determine which instances to purge.
* @param options - Optional options to control the purge behavior, such as recursive purging of sub-orchestrations.
* @returns A Promise that resolves to a {@link PurgeResult} or `undefined` if the purge operation was not successful.
*/
async purgeOrchestration(value: string | PurgeInstanceCriteria): Promise<PurgeResult | undefined> {
async purgeOrchestration(
value: string | PurgeInstanceCriteria,
options?: PurgeInstanceOptions,
): Promise<PurgeResult | undefined> {
let res;
if (typeof value === `string`) {
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
const instanceId = value;
const req = new pb.PurgeInstancesRequest();
req.setInstanceid(instanceId);
req.setRecursive(options?.recursive ?? false);

this._logger.info(`Purging Instance '${instanceId}'`);
this._logger.info(`Purging Instance '${instanceId}'${options?.recursive ? ' (recursive)' : ''}`);

res = await callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
this._stub.purgeInstances.bind(this._stub),
Expand Down Expand Up @@ -574,9 +624,10 @@ export class TaskHubGrpcClient {
filter.addRuntimestatus(toProtobuf(status));
}
req.setPurgeinstancefilter(filter);
req.setRecursive(options?.recursive ?? false);
const timeout = purgeInstanceCriteria.getTimeout();

this._logger.info("Purging Instance using purging criteria");
this._logger.info(`Purging Instances using purging criteria${options?.recursive ? ' (recursive)' : ''}`);
Comment thread
YunchuWang marked this conversation as resolved.
Outdated

const callPromise = callWithMetadata<pb.PurgeInstancesRequest, pb.PurgeInstancesResponse>(
this._stub.purgeInstances.bind(this._stub),
Expand Down
7 changes: 7 additions & 0 deletions packages/durabletask-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
// Client and Worker
export { TaskHubGrpcClient, TaskHubGrpcClientOptions, MetadataGenerator } from "./client/client";
export { TaskHubGrpcWorker, TaskHubGrpcWorkerOptions } from "./worker/task-hub-grpc-worker";
export { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from "./worker/versioning-options";

// Contexts
export { OrchestrationContext } from "./task/context/orchestration-context";
export { ActivityContext } from "./task/context/activity-context";

// Orchestration types and utilities
export { PurgeInstanceCriteria } from "./orchestration/orchestration-purge-criteria";
export { PurgeInstanceOptions } from "./orchestration/orchestration-purge-options";
export { TerminateInstanceOptions } from "./orchestration/orchestration-terminate-options";
export { OrchestrationStatus } from "./orchestration/enum/orchestration-status.enum";
export { OrchestrationState } from "./orchestration/orchestration-state";

Expand Down Expand Up @@ -81,3 +84,7 @@ export { TOutput } from "./types/output.type";

// Logger
export { Logger, ConsoleLogger, NoOpLogger } from "./types/logger.type";
export { ReplaySafeLogger, ReplayContext } from "./types/replay-safe-logger";

// Versioning utilities
export { compareVersions } from "./utils/versioning.util";
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* Options for purging orchestration instances.
*/
export interface PurgeInstanceOptions {
/**
* Whether to recursively purge sub-orchestrations as well.
* When true, all child orchestrations spawned by the target orchestration
* will also be purged.
*
* Note: Recursive purging may not be supported by all backend implementations.
*
* @default false
*/
recursive?: boolean;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

/**
* Options for terminating orchestration instances.
*/
export interface TerminateInstanceOptions {
/**
* Whether to recursively terminate sub-orchestrations as well.
* When true, all child orchestrations spawned by the target orchestration
* will also be terminated.
*
* @default false
*/
recursive?: boolean;

/**
* The optional output to set for the terminated orchestrator instance.
*/
output?: any;
}
67 changes: 67 additions & 0 deletions packages/durabletask-js/src/task/context/orchestration-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

import { TActivity } from "../../types/activity.type";
import { TOrchestrator } from "../../types/orchestrator.type";
import { Logger } from "../../types/logger.type";
import { ReplaySafeLogger } from "../../types/replay-safe-logger";
import { TaskOptions, SubOrchestrationOptions } from "../options";
import { Task } from "../task";
import { compareVersions } from "../../utils/versioning.util";

export abstract class OrchestrationContext {
/**
Expand Down Expand Up @@ -38,6 +41,46 @@ export abstract class OrchestrationContext {
*/
abstract get isReplaying(): boolean;

/**
* Gets the version of the current orchestration instance.
*
* The version is set when the orchestration instance is created via the client's
* scheduleNewOrchestration method using StartOrchestrationOptions.version.
* If no version was specified, this returns an empty string.
*
* @returns {string} The version of the current orchestration instance.
*/
abstract get version(): string;

/**
* Compares the current orchestration version to the specified version.
*
* This method uses semantic versioning comparison when both versions are valid
* semantic versions, and falls back to lexicographic comparison otherwise.
*
* @remarks
* - If both versions are empty, this returns 0 (equal).
* - An empty context version is considered less than a defined version.
* - An empty parameter version is considered less than a defined context version.
*
* @param {string} version The version to compare against.
* @returns {number} A negative number if context version < parameter version,
* zero if equal, positive if context version > parameter version.
*
* @example
* ```typescript
* const orchestrator: TOrchestrator = async function* (ctx, input) {
* if (ctx.compareVersionTo("2.0.0") >= 0) {
* // This orchestration is version 2.0.0 or newer
* yield ctx.callActivity(newFeature, input);
* }
* };
* ```
*/
compareVersionTo(version: string): number {
return compareVersions(this.version, version);
}

/**
* Create a timer task that will fire at a specified time.
*
Expand Down Expand Up @@ -128,4 +171,28 @@ export abstract class OrchestrationContext {
* @returns {string} A new deterministic UUID string.
*/
abstract newGuid(): string;

/**
* Creates a replay-safe logger that only writes logs when the orchestrator is not replaying.
*
* During orchestration replay, history events are re-processed to rebuild state.
* This can cause duplicate log entries if not handled properly. The returned logger
* wraps the provided logger and automatically suppresses log output during replay,
* ensuring that logs are only written once when the orchestration is making forward progress.
*
* @param {Logger} logger The underlying logger to wrap.
* @returns {Logger} A replay-safe logger instance.
*
* @example
* ```typescript
* const orchestrator: TOrchestrator = async function* (ctx, input) {
* const logger = ctx.createReplaySafeLogger(myLogger);
* logger.info("This will only be logged once, not during replay");
* yield ctx.callActivity(myActivity, input);
* };
* ```
*/
createReplaySafeLogger(logger: Logger): Logger {
return new ReplaySafeLogger(this, logger);
}
}
11 changes: 11 additions & 0 deletions packages/durabletask-js/src/task/options/task-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ export interface TaskOptions {
* The tags to associate with the task.
*/
tags?: Record<string, string>;
/**
* The version of the task (activity) to execute.
* When specified, only workers that handle this version will process the task.
*/
version?: string;
}

/**
Expand Down Expand Up @@ -48,6 +53,12 @@ export interface StartOrchestrationOptions {
* The tags to associate with the orchestration instance.
*/
tags?: Record<string, string>;
/**
* The version of the orchestration to execute.
* This version is stored with the orchestration instance and can be accessed
* via the OrchestrationContext.version property.
*/
version?: string;
}

/**
Expand Down
Loading
Loading