From 21b72f28975bbc9ec7875521a3a2d7d255beb774 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 30 Jan 2026 11:21:52 -0800 Subject: [PATCH 1/5] Support retryoptions when calling act/suborch delete old task when scheduling retries --- packages/durabletask-js/src/index.ts | 4 + .../src/task/context/orchestration-context.ts | 21 +- .../durabletask-js/src/task/options/index.ts | 9 + .../src/task/options/task-options.ts | 71 +++++ .../src/task/retry-timer-task.ts | 31 ++ .../durabletask-js/src/task/retry/index.ts | 4 + .../src/task/retry/retry-policy.ts | 159 ++++++++++ .../durabletask-js/src/task/retryable-task.ts | 209 +++++++++++++ .../src/worker/orchestration-executor.ts | 110 +++++-- .../worker/runtime-orchestration-context.ts | 97 +++++- .../test/orchestration_executor.spec.ts | 176 +++++++++++ .../durabletask-js/test/retry-policy.spec.ts | 151 +++++++++ .../test/retryable-task.spec.ts | 289 ++++++++++++++++++ test/e2e-azuremanaged/orchestration.spec.ts | 210 +++++++++++++ 14 files changed, 1502 insertions(+), 39 deletions(-) create mode 100644 packages/durabletask-js/src/task/options/index.ts create mode 100644 packages/durabletask-js/src/task/options/task-options.ts create mode 100644 packages/durabletask-js/src/task/retry-timer-task.ts create mode 100644 packages/durabletask-js/src/task/retry/index.ts create mode 100644 packages/durabletask-js/src/task/retry/retry-policy.ts create mode 100644 packages/durabletask-js/src/task/retryable-task.ts create mode 100644 packages/durabletask-js/test/retry-policy.spec.ts create mode 100644 packages/durabletask-js/test/retryable-task.spec.ts diff --git a/packages/durabletask-js/src/index.ts b/packages/durabletask-js/src/index.ts index eb1b345..f63e990 100644 --- a/packages/durabletask-js/src/index.ts +++ b/packages/durabletask-js/src/index.ts @@ -20,6 +20,10 @@ export { OrchestrationStatus as ProtoOrchestrationStatus } from "./proto/orchest export { getName, whenAll, whenAny } from "./task"; export { Task } from "./task/task"; +// Retry policies and task options +export { RetryPolicy, RetryPolicyOptions } from "./task/retry"; +export { TaskOptions, SubOrchestrationOptions, taskOptionsFromRetryPolicy, subOrchestrationOptionsFromRetryPolicy } from "./task/options"; + // Types export { TOrchestrator } from "./types/orchestrator.type"; export { TActivity } from "./types/activity.type"; diff --git a/packages/durabletask-js/src/task/context/orchestration-context.ts b/packages/durabletask-js/src/task/context/orchestration-context.ts index cb49151..c27363f 100644 --- a/packages/durabletask-js/src/task/context/orchestration-context.ts +++ b/packages/durabletask-js/src/task/context/orchestration-context.ts @@ -3,6 +3,7 @@ import { TActivity } from "../../types/activity.type"; import { TOrchestrator } from "../../types/orchestrator.type"; +import { TaskOptions, SubOrchestrationOptions } from "../options"; import { Task } from "../task"; export abstract class OrchestrationContext { @@ -48,27 +49,31 @@ export abstract class OrchestrationContext { /** * Schedule an activity for execution. * - * @param {Orchestrator} orchestrator The sub-orchestrator function to call. - * @param {TInput} input The JSON-serializable input value for the sub-orchestrator function. - * @param {string} instanceId The ID to use for the sub-orchestration instance. If not provided, a new GUID will be used. + * @param {TActivity} activity The activity function to call. + * @param {TInput} input The JSON-serializable input value for the activity function. + * @param {TaskOptions} options Optional options to control the behavior of the activity execution, including retry policies. * - * @returns {Task} A Durable Task that completes when the sub-orchestrator function completes. + * @returns {Task} A Durable Task that completes when the activity function completes. */ - abstract callActivity(activity: TActivity | string, input?: TInput): Task; + abstract callActivity( + activity: TActivity | string, + input?: TInput, + options?: TaskOptions, + ): Task; /** * Schedule sub-orchestrator function for execution. * - * @param orchestrator A reference to the orchestrator function call + * @param orchestrator A reference to the orchestrator function to call. * @param input The JSON-serializable input value for the orchestrator function. - * @param instanceId A unique ID to use for the sub-orchestration instance. If not provided, a new GUID will be used. + * @param options Optional options to control the behavior of the sub-orchestration, including retry policies and instance ID. * * @returns {Task} A Durable Task that completes when the sub-orchestrator function completes. */ abstract callSubOrchestrator( orchestrator: TOrchestrator | string, input?: TInput, - instanceId?: string, + options?: SubOrchestrationOptions, ): Task; /** diff --git a/packages/durabletask-js/src/task/options/index.ts b/packages/durabletask-js/src/task/options/index.ts new file mode 100644 index 0000000..73e5f24 --- /dev/null +++ b/packages/durabletask-js/src/task/options/index.ts @@ -0,0 +1,9 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +export { + TaskOptions, + SubOrchestrationOptions, + taskOptionsFromRetryPolicy, + subOrchestrationOptionsFromRetryPolicy, +} from "./task-options"; diff --git a/packages/durabletask-js/src/task/options/task-options.ts b/packages/durabletask-js/src/task/options/task-options.ts new file mode 100644 index 0000000..37a3ed7 --- /dev/null +++ b/packages/durabletask-js/src/task/options/task-options.ts @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { RetryPolicy } from "../retry/retry-policy"; + +/** + * Options that can be used to control the behavior of orchestrator task execution. + */ +export interface TaskOptions { + /** + * The retry policy for the task. + * Controls how many times a task is retried and the delay between retries. + */ + retry?: RetryPolicy; +} + +/** + * Options that can be used to control the behavior of sub-orchestrator execution. + * Extends TaskOptions with additional options specific to sub-orchestrations. + */ +export interface SubOrchestrationOptions extends TaskOptions { + /** + * The unique ID to use for the sub-orchestration instance. + * If not specified, a deterministic ID will be generated based on the parent instance ID. + */ + instanceId?: string; +} + +/** + * Creates a TaskOptions instance from a RetryPolicy. + * + * @param policy - The retry policy to use + * @returns A TaskOptions instance configured with the retry policy + * + * @example + * ```typescript + * const retryPolicy = new RetryPolicy({ + * maxNumberOfAttempts: 3, + * firstRetryIntervalInMilliseconds: 1000 + * }); + * + * const options = taskOptionsFromRetryPolicy(retryPolicy); + * ``` + */ +export function taskOptionsFromRetryPolicy(policy: RetryPolicy): TaskOptions { + return { retry: policy }; +} + +/** + * Creates a SubOrchestrationOptions instance from a RetryPolicy and optional instance ID. + * + * @param policy - The retry policy to use + * @param instanceId - Optional instance ID for the sub-orchestration + * @returns A SubOrchestrationOptions instance configured with the retry policy + * + * @example + * ```typescript + * const retryPolicy = new RetryPolicy({ + * maxNumberOfAttempts: 3, + * firstRetryIntervalInMilliseconds: 1000 + * }); + * + * const options = subOrchestrationOptionsFromRetryPolicy(retryPolicy, "my-sub-orch-123"); + * ``` + */ +export function subOrchestrationOptionsFromRetryPolicy( + policy: RetryPolicy, + instanceId?: string, +): SubOrchestrationOptions { + return { retry: policy, instanceId }; +} diff --git a/packages/durabletask-js/src/task/retry-timer-task.ts b/packages/durabletask-js/src/task/retry-timer-task.ts new file mode 100644 index 0000000..b010afd --- /dev/null +++ b/packages/durabletask-js/src/task/retry-timer-task.ts @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { CompletableTask } from "./completable-task"; +import { RetryableTask } from "./retryable-task"; + +/** + * A timer task that is associated with a retryable task for retry purposes. + * + * When this timer fires, the retryable task should be rescheduled for another attempt. + */ +export class RetryTimerTask extends CompletableTask { + private readonly _retryableParent: RetryableTask; + + /** + * Creates a new RetryTimerTask. + * + * @param retryableParent - The retryable task that this timer is associated with + */ + constructor(retryableParent: RetryableTask) { + super(); + this._retryableParent = retryableParent; + } + + /** + * Gets the retryable task that this timer is associated with. + */ + get retryableParent(): RetryableTask { + return this._retryableParent; + } +} diff --git a/packages/durabletask-js/src/task/retry/index.ts b/packages/durabletask-js/src/task/retry/index.ts new file mode 100644 index 0000000..fc4250a --- /dev/null +++ b/packages/durabletask-js/src/task/retry/index.ts @@ -0,0 +1,4 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +export { RetryPolicy, RetryPolicyOptions } from "./retry-policy"; diff --git a/packages/durabletask-js/src/task/retry/retry-policy.ts b/packages/durabletask-js/src/task/retry/retry-policy.ts new file mode 100644 index 0000000..c057b66 --- /dev/null +++ b/packages/durabletask-js/src/task/retry/retry-policy.ts @@ -0,0 +1,159 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * A declarative retry policy that can be configured for activity or sub-orchestration calls. + * + * @remarks + * Retry policies control how many times a task is retried and the delay between retries. + * The delay between retries increases exponentially based on the backoffCoefficient. + * + * @example + * ```typescript + * const retryPolicy = new RetryPolicy({ + * maxNumberOfAttempts: 5, + * firstRetryIntervalInMilliseconds: 1000, + * backoffCoefficient: 2.0, + * maxRetryIntervalInMilliseconds: 30000, + * retryTimeoutInMilliseconds: 300000 + * }); + * ``` + */ +export class RetryPolicy { + private readonly _maxNumberOfAttempts: number; + private readonly _firstRetryIntervalInMilliseconds: number; + private readonly _backoffCoefficient: number; + private readonly _maxRetryIntervalInMilliseconds: number; + private readonly _retryTimeoutInMilliseconds: number; + + /** + * Creates a new RetryPolicy instance. + * + * @param options - The retry policy options + * @throws Error if any of the validation constraints are violated + */ + constructor(options: RetryPolicyOptions) { + const { + maxNumberOfAttempts, + firstRetryIntervalInMilliseconds, + backoffCoefficient = 1.0, + maxRetryIntervalInMilliseconds, + retryTimeoutInMilliseconds, + } = options; + + // Validation aligned with .NET SDK + if (maxNumberOfAttempts <= 0) { + throw new Error("maxNumberOfAttempts must be greater than zero"); + } + + if (firstRetryIntervalInMilliseconds <= 0) { + throw new Error("firstRetryIntervalInMilliseconds must be greater than zero"); + } + + if (backoffCoefficient < 1.0) { + throw new Error("backoffCoefficient must be greater than or equal to 1.0"); + } + + if ( + maxRetryIntervalInMilliseconds !== undefined && + maxRetryIntervalInMilliseconds !== -1 && + maxRetryIntervalInMilliseconds < firstRetryIntervalInMilliseconds + ) { + throw new Error("maxRetryIntervalInMilliseconds must be greater than or equal to firstRetryIntervalInMilliseconds"); + } + + if ( + retryTimeoutInMilliseconds !== undefined && + retryTimeoutInMilliseconds !== -1 && + retryTimeoutInMilliseconds < firstRetryIntervalInMilliseconds + ) { + throw new Error("retryTimeoutInMilliseconds must be greater than or equal to firstRetryIntervalInMilliseconds"); + } + + this._maxNumberOfAttempts = maxNumberOfAttempts; + this._firstRetryIntervalInMilliseconds = firstRetryIntervalInMilliseconds; + this._backoffCoefficient = backoffCoefficient; + // Default to 1 hour (3600000ms) if not specified, -1 means infinite + this._maxRetryIntervalInMilliseconds = maxRetryIntervalInMilliseconds ?? 3600000; + // Default to -1 (infinite) if not specified + this._retryTimeoutInMilliseconds = retryTimeoutInMilliseconds ?? -1; + } + + /** + * Gets the max number of attempts for executing a given task. + */ + get maxNumberOfAttempts(): number { + return this._maxNumberOfAttempts; + } + + /** + * Gets the amount of time in milliseconds to delay between the first and second attempt. + */ + get firstRetryIntervalInMilliseconds(): number { + return this._firstRetryIntervalInMilliseconds; + } + + /** + * Gets the exponential back-off coefficient used to determine the delay between subsequent retries. + * @remarks Defaults to 1.0 for no back-off. + */ + get backoffCoefficient(): number { + return this._backoffCoefficient; + } + + /** + * Gets the maximum time in milliseconds to delay between attempts. + * @remarks Defaults to 1 hour (3600000ms). Use -1 for infinite. + */ + get maxRetryIntervalInMilliseconds(): number { + return this._maxRetryIntervalInMilliseconds; + } + + /** + * Gets the overall timeout for retries in milliseconds. + * No further attempts will be made after this timeout expires. + * @remarks Defaults to -1 (infinite). + */ + get retryTimeoutInMilliseconds(): number { + return this._retryTimeoutInMilliseconds; + } +} + +/** + * Options for creating a RetryPolicy. + */ +export interface RetryPolicyOptions { + /** + * The maximum number of task invocation attempts. Must be 1 or greater. + */ + maxNumberOfAttempts: number; + + /** + * The amount of time in milliseconds to delay between the first and second attempt. + * Must be greater than 0. + */ + firstRetryIntervalInMilliseconds: number; + + /** + * The exponential back-off coefficient used to determine the delay between subsequent retries. + * Must be 1.0 or greater. + * @default 1.0 + */ + backoffCoefficient?: number; + + /** + * The maximum time in milliseconds to delay between attempts. + * Must be greater than or equal to firstRetryIntervalInMilliseconds. + * Use -1 for infinite (no maximum). + * @default 3600000 (1 hour) + */ + maxRetryIntervalInMilliseconds?: number; + + /** + * The overall timeout for retries in milliseconds. + * No further attempts will be made after this timeout expires. + * Use -1 for infinite (no timeout). + * @default -1 (infinite) + */ + retryTimeoutInMilliseconds?: number; +} diff --git a/packages/durabletask-js/src/task/retryable-task.ts b/packages/durabletask-js/src/task/retryable-task.ts new file mode 100644 index 0000000..19b7c37 --- /dev/null +++ b/packages/durabletask-js/src/task/retryable-task.ts @@ -0,0 +1,209 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import * as pb from "../proto/orchestrator_service_pb"; +import { TaskFailedError } from "./exception/task-failed-error"; +import { CompletableTask } from "./completable-task"; +import { RetryPolicy } from "./retry/retry-policy"; + +/** + * Represents the type of retryable task - activity or sub-orchestration. + */ +export type RetryableTaskType = "activity" | "subOrchestration"; + +/** + * A task that can be retried according to a retry policy. + * + * @remarks + * This class extends CompletableTask and adds retry tracking and delay computation. + * It tracks the number of attempts and computes the next retry delay based on + * exponential backoff with the configured retry policy. + */ +export class RetryableTask extends CompletableTask { + private readonly _retryPolicy: RetryPolicy; + private readonly _action: pb.OrchestratorAction; + private readonly _startTime: Date; + private readonly _taskType: RetryableTaskType; + private _attemptCount: number; + private _lastFailure: pb.TaskFailureDetails | undefined; + + /** + * Creates a new RetryableTask instance. + * + * @param retryPolicy - The retry policy to use for this task + * @param action - The orchestrator action associated with this task + * @param startTime - The time when the task was first scheduled + * @param taskType - The type of task (activity or sub-orchestration) + */ + constructor( + retryPolicy: RetryPolicy, + action: pb.OrchestratorAction, + startTime: Date, + taskType: RetryableTaskType, + ) { + super(); + this._retryPolicy = retryPolicy; + this._action = action; + this._startTime = startTime; + this._taskType = taskType; + this._attemptCount = 1; + } + + /** + * Gets the retry policy for this task. + */ + get retryPolicy(): RetryPolicy { + return this._retryPolicy; + } + + /** + * Gets the orchestrator action associated with this task. + */ + get action(): pb.OrchestratorAction { + return this._action; + } + + /** + * Gets the current attempt count. + */ + get attemptCount(): number { + return this._attemptCount; + } + + /** + * Gets the time when the task was first scheduled. + */ + get startTime(): Date { + return this._startTime; + } + + /** + * Gets the type of task (activity or sub-orchestration). + */ + get taskType(): RetryableTaskType { + return this._taskType; + } + + /** + * Gets the last failure details. + */ + get lastFailure(): pb.TaskFailureDetails | undefined { + return this._lastFailure; + } + + /** + * Increments the attempt count. + */ + incrementAttemptCount(): void { + this._attemptCount++; + } + + /** + * Records a failure for potential retry. + * + * @param message - The failure message + * @param details - The failure details from the protobuf + */ + recordFailure(message: string, details?: pb.TaskFailureDetails): void { + details = details ?? new pb.TaskFailureDetails(); + this._lastFailure = details; + + // Store the exception for later if we exhaust retries + this._exception = new TaskFailedError(message, details); + } + + /** + * Completes the task with the given result. + * Clears any previously recorded failure since the retry succeeded. + * + * @param result - The result of the task + */ + override complete(result: T): void { + // Clear any previously recorded failure since the retry succeeded + this._exception = undefined; + this._lastFailure = undefined; + + // Call the parent implementation + super.complete(result); + } + + /** + * Marks the task as failed after exhausting all retry attempts. + * + * @param message - The failure message + * @param details - Optional failure details + */ + override fail(message: string, details?: pb.TaskFailureDetails): void { + if (this._isComplete) { + throw new Error("Task is already completed"); + } + + details = details ?? new pb.TaskFailureDetails(); + + // Create error message that includes task type and task ID + const taskTypeLabel = this._taskType === "activity" ? "Activity" : "Sub-orchestration"; + const fullMessage = `${taskTypeLabel} task #${this._action.getId()} failed: ${message}`; + + this._exception = new TaskFailedError(fullMessage, details); + this._isComplete = true; + + if (this._parent) { + this._parent.onChildCompleted(this); + } + } + + /** + * Computes the next retry delay in milliseconds. + * + * @param currentTime - The current orchestration time (for deterministic replay) + * @returns The delay in milliseconds until the next retry, or undefined if no more retries should be attempted + * + * @remarks + * Returns undefined if: + * - The maximum number of attempts has been reached + * - The retry timeout has been exceeded + * + * The delay is calculated using exponential backoff: + * delay = firstRetryInterval * (backoffCoefficient ^ (attemptCount - 1)) + * + * The delay is capped at maxRetryInterval. + */ + computeNextDelayInMilliseconds(currentTime: Date): number | undefined { + // Check if we've exhausted max attempts + if (this._attemptCount >= this._retryPolicy.maxNumberOfAttempts) { + return undefined; + } + + // Check if we've exceeded the retry timeout + if (this._retryPolicy.retryTimeoutInMilliseconds !== -1) { + const elapsedTime = currentTime.getTime() - this._startTime.getTime(); + if (elapsedTime >= this._retryPolicy.retryTimeoutInMilliseconds) { + return undefined; + } + } + + // Calculate the next delay using exponential backoff + // delay = firstRetryInterval * (backoffCoefficient ^ (attemptCount - 1)) + const backoffCoefficient = this._retryPolicy.backoffCoefficient; + const exponent = this._attemptCount - 1; + let nextDelay = + this._retryPolicy.firstRetryIntervalInMilliseconds * Math.pow(backoffCoefficient, exponent); + + // Cap at maxRetryInterval if specified and not infinite + if (this._retryPolicy.maxRetryIntervalInMilliseconds !== -1) { + nextDelay = Math.min(nextDelay, this._retryPolicy.maxRetryIntervalInMilliseconds); + } + + // Check if the computed delay would exceed the retry timeout + if (this._retryPolicy.retryTimeoutInMilliseconds !== -1) { + const elapsedTime = currentTime.getTime() - this._startTime.getTime(); + const remainingTime = this._retryPolicy.retryTimeoutInMilliseconds - elapsedTime; + if (nextDelay > remainingTime) { + // No more retries - timeout would be exceeded + return undefined; + } + } + + return nextDelay; + } +} diff --git a/packages/durabletask-js/src/worker/orchestration-executor.ts b/packages/durabletask-js/src/worker/orchestration-executor.ts index b8fd7a0..502149f 100644 --- a/packages/durabletask-js/src/worker/orchestration-executor.ts +++ b/packages/durabletask-js/src/worker/orchestration-executor.ts @@ -11,6 +11,8 @@ import { import * as pb from "../proto/orchestrator_service_pb"; import { getName } from "../task"; import { OrchestrationStateError } from "../task/exception/orchestration-state-error"; +import { RetryableTask } from "../task/retryable-task"; +import { RetryTimerTask } from "../task/retry-timer-task"; import { TOrchestrator } from "../types/orchestrator.type"; import { enumValueToKey } from "../utils/enum.util"; import { getOrchestrationStatusStr, isEmpty } from "../utils/pb-helper.util"; @@ -161,19 +163,31 @@ export class OrchestrationExecutor { const timerFiredEvent = event.getTimerfired(); const timerId = timerFiredEvent ? timerFiredEvent.getTimerid() : undefined; - let timerTask; - - if (timerId) { - timerTask = ctx._pendingTasks[timerId]; - delete ctx._pendingTasks[timerId]; + if (timerId === undefined) { + if (!ctx._isReplaying) { + console.warn(`${ctx._instanceId}: Ignoring timerFired event with undefined ID`); + } + return; } + const timerTask = ctx._pendingTasks[timerId]; + delete ctx._pendingTasks[timerId]; + if (!timerTask) { // TODO: Should this be an error? When would it ever happen? if (!ctx._isReplaying) { console.warn(`${ctx._instanceId}: Ignoring unexpected timerFired event with ID = ${timerId}`); } + return; + } + // Check if this is a retry timer + if (timerTask instanceof RetryTimerTask) { + // Get the retryable parent task and reschedule it + const retryableTask = timerTask.retryableParent; + // Reschedule the original action - this will add it back to pendingActions + ctx.rescheduleRetryableTask(retryableTask); + // Don't resume the orchestrator - we're just rescheduling the task return; } @@ -243,28 +257,47 @@ export class OrchestrationExecutor { const taskFailedEvent = event.getTaskfailed(); const taskId = taskFailedEvent ? taskFailedEvent.getTaskscheduledid() : undefined; - let activityTask; - - if (taskId) { - activityTask = ctx._pendingTasks[taskId]; - delete ctx._pendingTasks[taskId]; + if (taskId === undefined) { + if (!ctx._isReplaying) { + console.warn(`${ctx._instanceId}: Ignoring taskFailed event with undefined ID`); + } + return; } + const failureDetails = event.getTaskfailed()?.getFailuredetails(); + const errorMessage = failureDetails?.getErrormessage() || "Unknown error"; + + // Get the task (don't delete yet - we might retry) + const activityTask = ctx._pendingTasks[taskId]; + if (!activityTask) { - // TODO: Should this be an error? When would it ever happen? if (!ctx._isReplaying) { console.warn(`${ctx._instanceId}: Ignoring unexpected taskFailed event with ID = ${taskId}`); } - return; } + // Check if this is a retryable task and if we should retry + if (activityTask instanceof RetryableTask) { + activityTask.recordFailure(errorMessage, failureDetails); + const nextDelayMs = activityTask.computeNextDelayInMilliseconds(ctx._currentUtcDatetime); + + if (nextDelayMs !== undefined) { + // Schedule a retry timer + activityTask.incrementAttemptCount(); + ctx.createRetryTimer(activityTask, nextDelayMs); + // Remove from pendingTasks - the task will be re-added with a new ID when rescheduled + delete ctx._pendingTasks[taskId]; + return; + } + } + + // No retry - fail the task + delete ctx._pendingTasks[taskId]; + activityTask.fail( - `${ctx._instanceId}: Activity task #${taskId} failed: ${event - .getTaskfailed() - ?.getFailuredetails() - ?.getErrormessage()}`, - event.getTaskfailed()?.getFailuredetails(), + `Activity task #${taskId} failed: ${errorMessage}`, + failureDetails, ); await ctx.resume(); @@ -330,30 +363,49 @@ export class OrchestrationExecutor { ? subOrchestrationInstanceFailedEvent.getTaskscheduledid() : undefined; - let subOrchTask; - - if (taskId) { - subOrchTask = ctx._pendingTasks[taskId]; - delete ctx._pendingTasks[taskId]; + if (taskId === undefined) { + if (!ctx._isReplaying) { + console.warn(`${ctx._instanceId}: Ignoring subOrchestrationInstanceFailed event with undefined ID`); + } + return; } + const failureDetails = event.getSuborchestrationinstancefailed()?.getFailuredetails(); + const errorMessage = failureDetails?.getErrormessage() || "Unknown error"; + + // Get the task (don't delete yet - we might retry) + const subOrchTask = ctx._pendingTasks[taskId]; + if (!subOrchTask) { - // TODO: Should this be an error? When would it ever happen? if (!ctx._isReplaying) { console.warn( `${ctx._instanceId}: Ignoring unexpected subOrchestrationInstanceFailed event with ID = ${taskId}`, ); } - return; } + // Check if this is a retryable task and if we should retry + if (subOrchTask instanceof RetryableTask) { + subOrchTask.recordFailure(errorMessage, failureDetails); + const nextDelayMs = subOrchTask.computeNextDelayInMilliseconds(ctx._currentUtcDatetime); + + if (nextDelayMs !== undefined) { + // Schedule a retry timer + subOrchTask.incrementAttemptCount(); + ctx.createRetryTimer(subOrchTask, nextDelayMs); + // Remove from pendingTasks - the task will be re-added with a new ID when rescheduled + delete ctx._pendingTasks[taskId]; + return; + } + } + + // No retry - fail the task + delete ctx._pendingTasks[taskId]; + subOrchTask.fail( - `${ctx._instanceId}: Sub-orchestration task #${taskId} failed: ${event - .getSuborchestrationinstancefailed() - ?.getFailuredetails() - ?.getErrormessage()}`, - event.getSuborchestrationinstancefailed()?.getFailuredetails(), + `Sub-orchestration task #${taskId} failed: ${errorMessage}`, + failureDetails, ); await ctx.resume(); diff --git a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts index ae42ad6..4b392f7 100644 --- a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts +++ b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts @@ -6,6 +6,9 @@ import { OrchestrationContext } from "../task/context/orchestration-context"; import * as pb from "../proto/orchestrator_service_pb"; import * as ph from "../utils/pb-helper.util"; import { CompletableTask } from "../task/completable-task"; +import { RetryableTask } from "../task/retryable-task"; +import { RetryTimerTask } from "../task/retry-timer-task"; +import { TaskOptions, SubOrchestrationOptions } from "../task/options"; import { TActivity } from "../types/activity.type"; import { TOrchestrator } from "../types/orchestrator.type"; import { Task } from "../task/task"; @@ -247,6 +250,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { callActivity( activity: TActivity | string, input?: TInput | undefined, + options?: TaskOptions, ): Task { const id = this.nextSequenceNumber(); const name = typeof activity === "string" ? activity : getName(activity); @@ -254,6 +258,18 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { const action = ph.newScheduleTaskAction(id, name, encodedInput); this._pendingActions[action.getId()] = action; + // If a retry policy is provided, create a RetryableTask + if (options?.retry) { + const retryableTask = new RetryableTask( + options.retry, + action, + this._currentUtcDatetime, + "activity", + ); + this._pendingTasks[id] = retryableTask; + return retryableTask; + } + const task = new CompletableTask(); this._pendingTasks[id] = task; return task; @@ -262,7 +278,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { callSubOrchestrator( orchestrator: TOrchestrator | string, input?: TInput | undefined, - instanceId?: string | undefined, + options?: SubOrchestrationOptions, ): Task { let name; if (typeof orchestrator === "string") { @@ -272,8 +288,11 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { } const id = this.nextSequenceNumber(); + // Get instance ID from options or generate a deterministic one + let instanceId = options?.instanceId; + // Create a deterministic instance ID based on the parent instance ID - // use the instanceId and apprent the id to it in hexadecimal with 4 digits (e.g. 0001) + // use the instanceId and append the id to it in hexadecimal with 4 digits (e.g. 0001) if (!instanceId) { const instanceIdSuffix = id.toString(16).padStart(4, "0"); instanceId = `${this._instanceId}:${instanceIdSuffix}`; @@ -283,6 +302,18 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { const action = ph.newCreateSubOrchestrationAction(id, name, instanceId, encodedInput); this._pendingActions[action.getId()] = action; + // If a retry policy is provided, create a RetryableTask + if (options?.retry) { + const retryableTask = new RetryableTask( + options.retry, + action, + this._currentUtcDatetime, + "subOrchestration", + ); + this._pendingTasks[id] = retryableTask; + return retryableTask; + } + const task = new CompletableTask(); this._pendingTasks[id] = task; return task; @@ -330,4 +361,66 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { this.setContinuedAsNew(newInput, saveEvents); } + + /** + * Creates a retry timer for a retryable task. + * The timer will be associated with the retryable task so that when it fires, + * the original task can be rescheduled. + * + * @param retryableTask - The retryable task to create a timer for + * @param delayMs - The delay in milliseconds before the timer fires + * @returns The timer task + */ + createRetryTimer(retryableTask: RetryableTask, delayMs: number): RetryTimerTask { + const timerId = this.nextSequenceNumber(); + const fireAt = new Date(this._currentUtcDatetime.getTime() + delayMs); + const timerAction = ph.newCreateTimerAction(timerId, fireAt); + this._pendingActions[timerAction.getId()] = timerAction; + + // Create a RetryTimerTask that holds a reference to the retryable task + const retryTimerTask = new RetryTimerTask(retryableTask); + this._pendingTasks[timerId] = retryTimerTask; + + return retryTimerTask; + } + + /** + * Reschedules a retryable task for retry by creating a new action with a new ID. + * This is called when a retry timer fires and the task needs to be retried. + * + * @param retryableTask - The retryable task to reschedule + */ + rescheduleRetryableTask(retryableTask: RetryableTask): void { + const originalAction = retryableTask.action; + const newId = this.nextSequenceNumber(); + + let newAction: pb.OrchestratorAction; + + if (retryableTask.taskType === "activity") { + // Reschedule an activity task + const scheduleTask = originalAction.getScheduletask(); + if (!scheduleTask) { + throw new Error("Expected ScheduleTaskAction on activity retryable task"); + } + const name = scheduleTask.getName(); + const input = scheduleTask.getInput()?.getValue(); + newAction = ph.newScheduleTaskAction(newId, name, input); + } else { + // Reschedule a sub-orchestration task + const subOrch = originalAction.getCreatesuborchestration(); + if (!subOrch) { + throw new Error("Expected CreateSubOrchestrationAction on sub-orchestration retryable task"); + } + const name = subOrch.getName(); + const instanceId = subOrch.getInstanceid(); + const input = subOrch.getInput()?.getValue(); + newAction = ph.newCreateSubOrchestrationAction(newId, name, instanceId, input); + } + + // Register the new action + this._pendingActions[newAction.getId()] = newAction; + + // Map the retryable task to the new action ID + this._pendingTasks[newId] = retryableTask; + } } diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index 1812a65..acb73e2 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -796,6 +796,182 @@ describe("Orchestration Executor", () => { expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); }); + + // ==================== Retry Policy Tests ==================== + describe("Retry Policy", () => { + it("should schedule retry timer when activity fails with retry policy", async () => { + // Arrange + const { RetryPolicy } = await import("../src/task/retry/retry-policy"); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + backoffCoefficient: 1.0, + }); + const result = yield ctx.callActivity("flakyActivity", input, { retry: retryPolicy }); + return result; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + + // Act - Step 1: Start orchestration + let executor = new OrchestrationExecutor(registry); + let newEvents = [ + newOrchestratorStartedEvent(), + newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), + ]; + let actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + + // Assert - Step 1: Should schedule activity + expect(actions.length).toBe(1); + expect(actions[0].hasScheduletask()).toBe(true); + expect(actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); + + // Act - Step 2: Activity scheduled, then fails + const oldEvents = [ + ...newEvents, + newTaskScheduledEvent(1, "flakyActivity"), + ]; + executor = new OrchestrationExecutor(registry); + newEvents = [ + newTaskFailedEvent(1, new Error("Transient failure on attempt 1")), + ]; + actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + + // Assert - Step 2: Should schedule a retry timer + expect(actions.length).toBe(1); + expect(actions[0].hasCreatetimer()).toBe(true); + }); + + it("should complete successfully after retry timer fires and activity succeeds", async () => { + // Arrange + const { RetryPolicy } = await import("../src/task/retry/retry-policy"); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + backoffCoefficient: 1.0, + }); + const result = yield ctx.callActivity("flakyActivity", input, { retry: retryPolicy }); + return result; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const startTime = new Date(); + + // Step 1: Start orchestration + let executor = new OrchestrationExecutor(registry); + let allEvents = [ + newOrchestratorStartedEvent(startTime), + newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), + ]; + let actions = await executor.execute(TEST_INSTANCE_ID, [], allEvents); + expect(actions.length).toBe(1); + expect(actions[0].hasScheduletask()).toBe(true); + + // Step 2: Activity scheduled, then fails + allEvents.push(newTaskScheduledEvent(1, "flakyActivity")); + executor = new OrchestrationExecutor(registry); + actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + newTaskFailedEvent(1, new Error("Transient failure on attempt 1")), + ]); + expect(actions.length).toBe(1); + expect(actions[0].hasCreatetimer()).toBe(true); + const timerFireAt = actions[0].getCreatetimer()?.getFireat()?.toDate(); + expect(timerFireAt).toBeDefined(); + + // Step 3: Timer created, then fires + allEvents.push(newTaskFailedEvent(1, new Error("Transient failure on attempt 1"))); + allEvents.push(newTimerCreatedEvent(2, timerFireAt!)); + executor = new OrchestrationExecutor(registry); + actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + newTimerFiredEvent(2, timerFireAt!), + ]); + // Should reschedule the activity with a new ID + expect(actions.length).toBe(1); + expect(actions[0].hasScheduletask()).toBe(true); + expect(actions[0].getScheduletask()?.getName()).toBe("flakyActivity"); + expect(actions[0].getId()).toBe(3); // New ID after timer + + // Step 4: Retried activity scheduled, then completes + allEvents.push(newTimerFiredEvent(2, timerFireAt!)); + allEvents.push(newTaskScheduledEvent(3, "flakyActivity")); + executor = new OrchestrationExecutor(registry); + actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + newTaskCompletedEvent(3, JSON.stringify(42)), + ]); + + // Assert: Orchestration should complete successfully + const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(completeAction?.getResult()?.getValue()).toEqual(JSON.stringify(42)); + }); + + it("should fail after exhausting all retry attempts", async () => { + // Arrange + const { RetryPolicy } = await import("../src/task/retry/retry-policy"); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 2, + firstRetryIntervalInMilliseconds: 100, + backoffCoefficient: 1.0, + }); + const result = yield ctx.callActivity("alwaysFailsActivity", input, { retry: retryPolicy }); + return result; + }; + + const registry = new Registry(); + const name = registry.addOrchestrator(orchestrator); + const startTime = new Date(); + + // Step 1: Start orchestration + let executor = new OrchestrationExecutor(registry); + let allEvents = [ + newOrchestratorStartedEvent(startTime), + newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), + ]; + let actions = await executor.execute(TEST_INSTANCE_ID, [], allEvents); + expect(actions.length).toBe(1); + expect(actions[0].hasScheduletask()).toBe(true); + + // Step 2: Activity fails - first attempt + allEvents.push(newTaskScheduledEvent(1, "alwaysFailsActivity")); + executor = new OrchestrationExecutor(registry); + actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + newTaskFailedEvent(1, new Error("Failure on attempt 1")), + ]); + expect(actions.length).toBe(1); + expect(actions[0].hasCreatetimer()).toBe(true); + const timerFireAt = actions[0].getCreatetimer()?.getFireat()?.toDate(); + + // Step 3: Timer fires, activity is rescheduled + allEvents.push(newTaskFailedEvent(1, new Error("Failure on attempt 1"))); + allEvents.push(newTimerCreatedEvent(2, timerFireAt!)); + executor = new OrchestrationExecutor(registry); + actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + newTimerFiredEvent(2, timerFireAt!), + ]); + expect(actions.length).toBe(1); + expect(actions[0].hasScheduletask()).toBe(true); + + // Step 4: Second activity attempt fails - max attempts reached + allEvents.push(newTimerFiredEvent(2, timerFireAt!)); + allEvents.push(newTaskScheduledEvent(3, "alwaysFailsActivity")); + executor = new OrchestrationExecutor(registry); + actions = await executor.execute(TEST_INSTANCE_ID, allEvents, [ + newTaskFailedEvent(3, new Error("Failure on attempt 2")), + ]); + + // Assert: Orchestration should fail + const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + }); + }); }); function getAndValidateSingleCompleteOrchestrationAction( diff --git a/packages/durabletask-js/test/retry-policy.spec.ts b/packages/durabletask-js/test/retry-policy.spec.ts new file mode 100644 index 0000000..2be2681 --- /dev/null +++ b/packages/durabletask-js/test/retry-policy.spec.ts @@ -0,0 +1,151 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import { RetryPolicy } from "../src/task/retry/retry-policy"; + +describe("RetryPolicy", () => { + describe("constructor validation", () => { + it("should create a valid retry policy with minimum options", () => { + // Arrange & Act + const policy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + }); + + // Assert + expect(policy.maxNumberOfAttempts).toBe(3); + expect(policy.firstRetryIntervalInMilliseconds).toBe(1000); + expect(policy.backoffCoefficient).toBe(1.0); + expect(policy.maxRetryIntervalInMilliseconds).toBe(3600000); // 1 hour default + expect(policy.retryTimeoutInMilliseconds).toBe(-1); // infinite default + }); + + it("should create a valid retry policy with all options", () => { + // Arrange & Act + const policy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 500, + backoffCoefficient: 2.0, + maxRetryIntervalInMilliseconds: 30000, + retryTimeoutInMilliseconds: 120000, + }); + + // Assert + expect(policy.maxNumberOfAttempts).toBe(5); + expect(policy.firstRetryIntervalInMilliseconds).toBe(500); + expect(policy.backoffCoefficient).toBe(2.0); + expect(policy.maxRetryIntervalInMilliseconds).toBe(30000); + expect(policy.retryTimeoutInMilliseconds).toBe(120000); + }); + + it("should throw error when maxNumberOfAttempts is less than 1", () => { + // Arrange & Act & Assert + expect(() => { + new RetryPolicy({ + maxNumberOfAttempts: 0, + firstRetryIntervalInMilliseconds: 1000, + }); + }).toThrow("maxNumberOfAttempts must be greater than zero"); + }); + + it("should throw error when firstRetryIntervalInMilliseconds is zero", () => { + // Arrange & Act & Assert + expect(() => { + new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 0, + }); + }).toThrow("firstRetryIntervalInMilliseconds must be greater than zero"); + }); + + it("should throw error when firstRetryIntervalInMilliseconds is negative", () => { + // Arrange & Act & Assert + expect(() => { + new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: -100, + }); + }).toThrow("firstRetryIntervalInMilliseconds must be greater than zero"); + }); + + it("should throw error when backoffCoefficient is less than 1.0", () => { + // Arrange & Act & Assert + expect(() => { + new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + backoffCoefficient: 0.5, + }); + }).toThrow("backoffCoefficient must be greater than or equal to 1.0"); + }); + + it("should throw error when maxRetryIntervalInMilliseconds is less than firstRetryInterval", () => { + // Arrange & Act & Assert + expect(() => { + new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + maxRetryIntervalInMilliseconds: 500, + }); + }).toThrow("maxRetryIntervalInMilliseconds must be greater than or equal to firstRetryIntervalInMilliseconds"); + }); + + it("should throw error when retryTimeoutInMilliseconds is less than firstRetryInterval", () => { + // Arrange & Act & Assert + expect(() => { + new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + retryTimeoutInMilliseconds: 500, + }); + }).toThrow("retryTimeoutInMilliseconds must be greater than or equal to firstRetryIntervalInMilliseconds"); + }); + + it("should accept exactly 1.0 as backoffCoefficient", () => { + // Arrange & Act + const policy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + backoffCoefficient: 1.0, + }); + + // Assert + expect(policy.backoffCoefficient).toBe(1.0); + }); + + it("should accept exactly 1 as maxNumberOfAttempts", () => { + // Arrange & Act + const policy = new RetryPolicy({ + maxNumberOfAttempts: 1, + firstRetryIntervalInMilliseconds: 1000, + }); + + // Assert + expect(policy.maxNumberOfAttempts).toBe(1); + }); + + it("should accept -1 as maxRetryIntervalInMilliseconds (infinite)", () => { + // Arrange & Act + const policy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + maxRetryIntervalInMilliseconds: -1, + }); + + // Assert - infinite is valid + expect(policy.maxRetryIntervalInMilliseconds).toBe(-1); + }); + + it("should accept -1 as retryTimeoutInMilliseconds (infinite)", () => { + // Arrange & Act + const policy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + retryTimeoutInMilliseconds: -1, + }); + + // Assert - infinite is valid + expect(policy.retryTimeoutInMilliseconds).toBe(-1); + }); + }); +}); diff --git a/packages/durabletask-js/test/retryable-task.spec.ts b/packages/durabletask-js/test/retryable-task.spec.ts new file mode 100644 index 0000000..19cf327 --- /dev/null +++ b/packages/durabletask-js/test/retryable-task.spec.ts @@ -0,0 +1,289 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +import { RetryableTask } from "../src/task/retryable-task"; +import { RetryPolicy } from "../src/task/retry/retry-policy"; +import * as pb from "../src/proto/orchestrator_service_pb"; + +describe("RetryableTask", () => { + describe("constructor", () => { + it("should create a retryable task with correct initial state", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + }); + const action = new pb.OrchestratorAction(); + const startTime = new Date(); + + // Act + const task = new RetryableTask(retryPolicy, action, startTime, "activity"); + + // Assert + expect(task.attemptCount).toBe(1); + expect(task.taskType).toBe("activity"); + expect(task.action).toBe(action); + expect(task.startTime).toBe(startTime); + }); + }); + + describe("computeNextDelayInMilliseconds", () => { + it("should return first retry interval on first retry (attempt 1)", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + backoffCoefficient: 2.0, + }); + const action = new pb.OrchestratorAction(); + const startTime = new Date(); + const task = new RetryableTask(retryPolicy, action, startTime, "activity"); + const currentTime = new Date(startTime.getTime() + 100); // 100ms after start + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert (delay = 1000 * 2^(1-1) = 1000 * 1 = 1000) + expect(delay).toBe(1000); + }); + + it("should apply exponential backoff on subsequent retries", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 1000, + backoffCoefficient: 2.0, + }); + const action = new pb.OrchestratorAction(); + const startTime = new Date(); + const task = new RetryableTask(retryPolicy, action, startTime, "activity"); + const currentTime = new Date(startTime.getTime() + 1100); // after first retry + + // First attempt already done + task.incrementAttemptCount(); // Now at attempt 2 + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert (delay = 1000 * 2^(2-1) = 1000 * 2 = 2000) + expect(delay).toBe(2000); + }); + + it("should cap delay at maxRetryInterval", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 10, + firstRetryIntervalInMilliseconds: 1000, + backoffCoefficient: 10.0, + maxRetryIntervalInMilliseconds: 5000, + }); + const action = new pb.OrchestratorAction(); + const startTime = new Date(); + const task = new RetryableTask(retryPolicy, action, startTime, "activity"); + const currentTime = new Date(startTime.getTime() + 10000); + + // Simulate several attempts to trigger high delay + task.incrementAttemptCount(); // attempt 2 + task.incrementAttemptCount(); // attempt 3 + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert (would be 1000 * 10^2 = 100000, but capped at 5000) + expect(delay).toBe(5000); + }); + + it("should use constant delay when backoffCoefficient is 1.0", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 2000, + backoffCoefficient: 1.0, + }); + const action = new pb.OrchestratorAction(); + const startTime = new Date(); + const task = new RetryableTask(retryPolicy, action, startTime, "activity"); + const currentTime = new Date(startTime.getTime() + 100); + + // Act & Assert - delay should always be 2000 + expect(task.computeNextDelayInMilliseconds(currentTime)).toBe(2000); + + task.incrementAttemptCount(); + expect(task.computeNextDelayInMilliseconds(currentTime)).toBe(2000); + + task.incrementAttemptCount(); + expect(task.computeNextDelayInMilliseconds(currentTime)).toBe(2000); + }); + + it("should return undefined when max attempts exceeded", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + }); + const action = new pb.OrchestratorAction(); + const startTime = new Date(); + const task = new RetryableTask(retryPolicy, action, startTime, "activity"); + const currentTime = new Date(startTime.getTime() + 100); + + // Simulate all attempts + task.incrementAttemptCount(); // attempt 2 + task.incrementAttemptCount(); // attempt 3 (max reached) + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert + expect(delay).toBeUndefined(); + }); + + it("should return undefined when retry timeout exceeded", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 10, + firstRetryIntervalInMilliseconds: 1000, + retryTimeoutInMilliseconds: 5000, + }); + const startTime = new Date(Date.now() - 6000); // Started 6 seconds ago + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, startTime, "activity"); + const currentTime = new Date(); // Now + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert + expect(delay).toBeUndefined(); + }); + + it("should allow retry when within timeout", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 10, + firstRetryIntervalInMilliseconds: 1000, + retryTimeoutInMilliseconds: 60000, + }); + const startTime = new Date(); + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, startTime, "activity"); + const currentTime = new Date(startTime.getTime() + 5000); // 5 seconds after start + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert + expect(delay).toBeGreaterThan(0); + }); + + it("should ignore timeout when set to -1 (infinite)", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 10, + firstRetryIntervalInMilliseconds: 1000, + retryTimeoutInMilliseconds: -1, + }); + // Started a long time ago + const startTime = new Date(Date.now() - 3600000); // 1 hour ago + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, startTime, "activity"); + const currentTime = new Date(); // Now + + // Act + const delay = task.computeNextDelayInMilliseconds(currentTime); + + // Assert - should still allow retry (timeout is infinite) + expect(delay).toBeGreaterThan(0); + }); + }); + + describe("incrementAttemptCount", () => { + it("should increment attempt count", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 1000, + }); + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, new Date(), "activity"); + + // Act + task.incrementAttemptCount(); + task.incrementAttemptCount(); + + // Assert + expect(task.attemptCount).toBe(3); + }); + }); + + describe("recordFailure", () => { + it("should store failure details", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + }); + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, new Date(), "activity"); + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrortype("TestError"); + failureDetails.setErrormessage("Test failure message"); + + // Act + task.recordFailure("Test failure message", failureDetails); + + // Assert + const lastFailure = task.lastFailure; + expect(lastFailure).toBeDefined(); + expect(lastFailure?.getErrortype()).toBe("TestError"); + expect(lastFailure?.getErrormessage()).toBe("Test failure message"); + }); + }); + + describe("action property", () => { + it("should return the original action", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + }); + const action = new pb.OrchestratorAction(); + action.setId(42); + const task = new RetryableTask(retryPolicy, action, new Date(), "activity"); + + // Act + const originalAction = task.action; + + // Assert + expect(originalAction).toBe(action); + expect(originalAction.getId()).toBe(42); + }); + }); + + describe("task types", () => { + it("should support activity task type", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + }); + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, new Date(), "activity"); + + // Assert + expect(task.taskType).toBe("activity"); + }); + + it("should support subOrchestration task type", () => { + // Arrange + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 1000, + }); + const action = new pb.OrchestratorAction(); + const task = new RetryableTask(retryPolicy, action, new Date(), "subOrchestration"); + + // Assert + expect(task.taskType).toBe("subOrchestration"); + }); + }); +}); diff --git a/test/e2e-azuremanaged/orchestration.spec.ts b/test/e2e-azuremanaged/orchestration.spec.ts index bebde0c..efb1733 100644 --- a/test/e2e-azuremanaged/orchestration.spec.ts +++ b/test/e2e-azuremanaged/orchestration.spec.ts @@ -22,6 +22,7 @@ import { OrchestrationContext, Task, TOrchestrator, + RetryPolicy, } from "@microsoft/durabletask-js"; import { DurableTaskAzureManagedClientBuilder, @@ -281,4 +282,213 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => { expect(state?.serializedInput).toEqual(JSON.stringify(15)); expect(state?.serializedOutput).toEqual(JSON.stringify(16)); }, 31000); + + // ==================== Retry Policy Tests ==================== + + it("should retry activity and succeed after transient failures", async () => { + let attemptCount = 0; + + const flakyActivity = async (_: ActivityContext, input: number) => { + attemptCount++; + if (attemptCount < 3) { + throw new Error(`Transient failure on attempt ${attemptCount}`); + } + return input * 2; + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 100, + backoffCoefficient: 1.0, + }); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + const result = yield ctx.callActivity(flakyActivity, input, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addActivity(flakyActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 21); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify(42)); + expect(attemptCount).toBe(3); // Failed twice, succeeded on third attempt + }, 31000); + + it("should fail activity after exhausting all retry attempts", async () => { + let attemptCount = 0; + + const alwaysFailsActivity = async (_: ActivityContext) => { + attemptCount++; + throw new Error(`Permanent failure on attempt ${attemptCount}`); + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 100, + backoffCoefficient: 1.0, + }); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callActivity(alwaysFailsActivity, undefined, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addActivity(alwaysFailsActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + expect(attemptCount).toBe(3); // All 3 attempts exhausted + }, 31000); + + it("should use exponential backoff for retries", async () => { + const attemptTimes: number[] = []; + + const flakyActivity = async (_: ActivityContext, input: number) => { + attemptTimes.push(Date.now()); + if (attemptTimes.length < 3) { + throw new Error(`Transient failure on attempt ${attemptTimes.length}`); + } + return input; + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 5, + firstRetryIntervalInMilliseconds: 500, + backoffCoefficient: 2.0, // Exponential backoff + }); + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + const result = yield ctx.callActivity(flakyActivity, input, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addActivity(flakyActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 100); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(attemptTimes.length).toBe(3); + + // Verify exponential backoff: second delay should be roughly 2x the first + // Allow some tolerance for timing variations + if (attemptTimes.length >= 3) { + const firstDelay = attemptTimes[1] - attemptTimes[0]; + const secondDelay = attemptTimes[2] - attemptTimes[1]; + // Second delay should be approximately 2x the first (with 50% tolerance for timing) + expect(secondDelay).toBeGreaterThan(firstDelay * 1.5); + } + }, 31000); + + it("should retry sub-orchestration and succeed after transient failures", async () => { + let attemptCount = 0; + + const flakySubOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + attemptCount++; + if (attemptCount < 2) { + throw new Error(`Sub-orchestration transient failure on attempt ${attemptCount}`); + } + return input * 3; + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 3, + firstRetryIntervalInMilliseconds: 100, + backoffCoefficient: 1.0, + }); + + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + const result = yield ctx.callSubOrchestrator(flakySubOrchestrator, input, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addOrchestrator(flakySubOrchestrator); + taskHubWorker.addOrchestrator(parentOrchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(parentOrchestrator, 7); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify(21)); + expect(attemptCount).toBe(2); // Failed once, succeeded on second attempt + }, 31000); + + it("should fail sub-orchestration after exhausting all retry attempts", async () => { + let attemptCount = 0; + + const alwaysFailsSubOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + attemptCount++; + throw new Error(`Sub-orchestration permanent failure on attempt ${attemptCount}`); + }; + + const retryPolicy = new RetryPolicy({ + maxNumberOfAttempts: 2, + firstRetryIntervalInMilliseconds: 100, + backoffCoefficient: 1.0, + }); + + const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const result = yield ctx.callSubOrchestrator(alwaysFailsSubOrchestrator, undefined, { retry: retryPolicy }); + return result; + }; + + taskHubWorker.addOrchestrator(alwaysFailsSubOrchestrator); + taskHubWorker.addOrchestrator(parentOrchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(parentOrchestrator); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + expect(state?.failureDetails).toBeDefined(); + expect(attemptCount).toBe(2); // All 2 attempts exhausted + }, 31000); + + it("should work without retry policy (backward compatibility)", async () => { + let invoked = false; + + const simpleActivity = async (_: ActivityContext, input: string) => { + invoked = true; + return `Hello, ${input}!`; + }; + + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: string): any { + // No retry policy - should work as before + const result = yield ctx.callActivity(simpleActivity, input); + return result; + }; + + taskHubWorker.addActivity(simpleActivity); + taskHubWorker.addOrchestrator(orchestrator); + await taskHubWorker.start(); + + const id = await taskHubClient.scheduleNewOrchestration(orchestrator, "World"); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + expect(state).toBeDefined(); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + expect(state?.failureDetails).toBeUndefined(); + expect(state?.serializedOutput).toEqual(JSON.stringify("Hello, World!")); + expect(invoked).toBe(true); + }, 31000); }); From 494b172c420292ff2f83ee76e8e12165d42c40cf Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 30 Jan 2026 11:50:03 -0800 Subject: [PATCH 2/5] linting --- packages/durabletask-js/test/orchestration_executor.spec.ts | 4 ++-- packages/durabletask-js/test/retry-policy.spec.ts | 2 +- packages/durabletask-js/test/retryable-task.spec.ts | 2 +- test/e2e-azuremanaged/orchestration.spec.ts | 6 ++++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/packages/durabletask-js/test/orchestration_executor.spec.ts b/packages/durabletask-js/test/orchestration_executor.spec.ts index acb73e2..1e840f0 100644 --- a/packages/durabletask-js/test/orchestration_executor.spec.ts +++ b/packages/durabletask-js/test/orchestration_executor.spec.ts @@ -865,7 +865,7 @@ describe("Orchestration Executor", () => { // Step 1: Start orchestration let executor = new OrchestrationExecutor(registry); - let allEvents = [ + const allEvents = [ newOrchestratorStartedEvent(startTime), newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), ]; @@ -931,7 +931,7 @@ describe("Orchestration Executor", () => { // Step 1: Start orchestration let executor = new OrchestrationExecutor(registry); - let allEvents = [ + const allEvents = [ newOrchestratorStartedEvent(startTime), newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(21)), ]; diff --git a/packages/durabletask-js/test/retry-policy.spec.ts b/packages/durabletask-js/test/retry-policy.spec.ts index 2be2681..d7a87e0 100644 --- a/packages/durabletask-js/test/retry-policy.spec.ts +++ b/packages/durabletask-js/test/retry-policy.spec.ts @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. import { RetryPolicy } from "../src/task/retry/retry-policy"; diff --git a/packages/durabletask-js/test/retryable-task.spec.ts b/packages/durabletask-js/test/retryable-task.spec.ts index 19cf327..0aeb801 100644 --- a/packages/durabletask-js/test/retryable-task.spec.ts +++ b/packages/durabletask-js/test/retryable-task.spec.ts @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. import { RetryableTask } from "../src/task/retryable-task"; diff --git a/test/e2e-azuremanaged/orchestration.spec.ts b/test/e2e-azuremanaged/orchestration.spec.ts index efb1733..e09cfed 100644 --- a/test/e2e-azuremanaged/orchestration.spec.ts +++ b/test/e2e-azuremanaged/orchestration.spec.ts @@ -399,9 +399,10 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => { it("should retry sub-orchestration and succeed after transient failures", async () => { let attemptCount = 0; - const flakySubOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + const flakySubOrchestrator: TOrchestrator = async function* (_ctx: OrchestrationContext, input: number): any { attemptCount++; if (attemptCount < 2) { + yield; // Required for generator throw new Error(`Sub-orchestration transient failure on attempt ${attemptCount}`); } return input * 3; @@ -435,8 +436,9 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => { it("should fail sub-orchestration after exhausting all retry attempts", async () => { let attemptCount = 0; - const alwaysFailsSubOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const alwaysFailsSubOrchestrator: TOrchestrator = async function* (_ctx: OrchestrationContext): any { attemptCount++; + yield; // Required for generator throw new Error(`Sub-orchestration permanent failure on attempt ${attemptCount}`); }; From b9f14eaaf04f7b8cacfb14720e603cb6a70fef70 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 30 Jan 2026 12:18:44 -0800 Subject: [PATCH 3/5] feedback --- packages/durabletask-js/src/task/retryable-task.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/durabletask-js/src/task/retryable-task.ts b/packages/durabletask-js/src/task/retryable-task.ts index 19b7c37..1c073a6 100644 --- a/packages/durabletask-js/src/task/retryable-task.ts +++ b/packages/durabletask-js/src/task/retryable-task.ts @@ -140,11 +140,7 @@ export class RetryableTask extends CompletableTask { details = details ?? new pb.TaskFailureDetails(); - // Create error message that includes task type and task ID - const taskTypeLabel = this._taskType === "activity" ? "Activity" : "Sub-orchestration"; - const fullMessage = `${taskTypeLabel} task #${this._action.getId()} failed: ${message}`; - - this._exception = new TaskFailedError(fullMessage, details); + this._exception = new TaskFailedError(message, details); this._isComplete = true; if (this._parent) { From 7c847c2a7ca3d2bc336f2beac301f6b13ee5dbab Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 30 Jan 2026 12:46:57 -0800 Subject: [PATCH 4/5] no need to yield --- test/e2e-azuremanaged/orchestration.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e-azuremanaged/orchestration.spec.ts b/test/e2e-azuremanaged/orchestration.spec.ts index e09cfed..a831ea6 100644 --- a/test/e2e-azuremanaged/orchestration.spec.ts +++ b/test/e2e-azuremanaged/orchestration.spec.ts @@ -399,10 +399,10 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => { it("should retry sub-orchestration and succeed after transient failures", async () => { let attemptCount = 0; + // eslint-disable-next-line require-yield const flakySubOrchestrator: TOrchestrator = async function* (_ctx: OrchestrationContext, input: number): any { attemptCount++; if (attemptCount < 2) { - yield; // Required for generator throw new Error(`Sub-orchestration transient failure on attempt ${attemptCount}`); } return input * 3; @@ -436,9 +436,9 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => { it("should fail sub-orchestration after exhausting all retry attempts", async () => { let attemptCount = 0; + // eslint-disable-next-line require-yield const alwaysFailsSubOrchestrator: TOrchestrator = async function* (_ctx: OrchestrationContext): any { attemptCount++; - yield; // Required for generator throw new Error(`Sub-orchestration permanent failure on attempt ${attemptCount}`); }; From 28be62840e5bb9330aeb0611f4ce62e1739d3eae Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Fri, 30 Jan 2026 13:01:20 -0800 Subject: [PATCH 5/5] fix flaky timer test --- tests/e2e/orchestration.spec.ts | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/e2e/orchestration.spec.ts b/tests/e2e/orchestration.spec.ts index b0ceb0c..56d9bcc 100644 --- a/tests/e2e/orchestration.spec.ts +++ b/tests/e2e/orchestration.spec.ts @@ -226,13 +226,13 @@ describe("Durable Functions", () => { const id = await taskHubClient.scheduleNewOrchestration(singleTimer); const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - let expectedCompletionSecond = state?.createdAt?.getTime() ?? 0; - if (state && state.createdAt !== undefined) { - expectedCompletionSecond += delay * 1000; - } - expect(expectedCompletionSecond).toBeDefined(); - const actualCompletionSecond = state?.lastUpdatedAt?.getTime() ?? 0; - expect(actualCompletionSecond).toBeDefined(); + const createdAtMs = state?.createdAt?.getTime() ?? 0; + const lastUpdatedAtMs = state?.lastUpdatedAt?.getTime() ?? 0; + const actualDurationMs = lastUpdatedAtMs - createdAtMs; + const expectedMinDurationMs = delay * 1000; + // Allow 1 second tolerance for timing variations in test infrastructure + // (createdAt may not align exactly with when timer was scheduled) + const toleranceMs = 1000; expect(state); expect(state?.name).toEqual(getName(singleTimer)); @@ -241,7 +241,8 @@ describe("Durable Functions", () => { expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(state?.createdAt).toBeDefined(); expect(state?.lastUpdatedAt).toBeDefined(); - expect(expectedCompletionSecond).toBeLessThanOrEqual(actualCompletionSecond); + // Timer should fire after approximately the expected delay (with tolerance for timing variations) + expect(actualDurationMs).toBeGreaterThanOrEqual(expectedMinDurationMs - toleranceMs); }, 31000); it("should wait for external events with a timeout - true", async () => {