Skip to content

Commit 157f357

Browse files
committed
feat: Implement AsyncRetryHandler and refactor retry task handling
- Introduced RetryHandlerTask to support imperative retry control using AsyncRetryHandler.
- Refactored RuntimeOrchestrationContext to utilize createRetryTaskOrDefault for task creation.
- Enhanced retry logic to handle both RetryableTask and RetryHandlerTask.
- Added comprehensive tests for RetryHandlerTask, covering various scenarios including success, failure, and retry logic.
- Updated orchestration executor tests to validate new retry handler functionality.
1 parent b05cd0f commit 157f357

8 files changed

Lines changed: 887 additions & 330 deletions

File tree

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
import * as pb from "../proto/orchestrator_service_pb";
5+
import { RetryTaskBase } from "./retry-task-base";
6+
import { AsyncRetryHandler } from "./retry/retry-handler";
7+
import { createRetryContext } from "./retry/retry-context";
8+
import { TaskFailureDetails } from "./failure-details";
9+
10+
/**
11+
* A task that uses an AsyncRetryHandler for imperative retry control.
12+
*
13+
* @remarks
14+
* Unlike RetryableTask which uses a declarative RetryPolicy, this task delegates
15+
* all retry decisions to a user-provided handler function. The handler receives
16+
* a RetryContext with failure details, attempt count, and elapsed time, and
17+
* returns true to retry or false to stop.
18+
*
19+
* This mirrors the .NET SDK's InvokeWithCustomRetryHandler pattern, where the
20+
* retry handler runs as orchestrator code (subject to replay).
21+
*/
22+
export class RetryHandlerTask<T> extends RetryTaskBase<T> {
  private readonly _handler: AsyncRetryHandler;

  /**
   * Builds a task whose retry decisions are made by a caller-supplied handler.
   *
   * @param handler - Async callback consulted by {@link shouldRetry} after a failure
   * @param action - Orchestrator action for this task; forwarded to the base class
   * @param startTime - When the task was first scheduled; used as the origin for
   *   the elapsed retry time reported to the handler
   * @param taskType - Whether the task is an activity or a sub-orchestration
   */
  constructor(
    handler: AsyncRetryHandler,
    action: pb.OrchestratorAction,
    startTime: Date,
    taskType: "activity" | "subOrchestration",
  ) {
    super(action, startTime, taskType);
    this._handler = handler;
  }

  /**
   * The retry handler that was supplied at construction time.
   */
  get handler(): AsyncRetryHandler {
    return this._handler;
  }

  /**
   * Asks the retry handler whether the task should be attempted again.
   *
   * The handler is only consulted when a failure has been recorded and that
   * failure is not flagged as non-retriable; otherwise the answer is `false`
   * without invoking the handler.
   *
   * @param currentTime - The current orchestration time; the elapsed retry time
   *   is computed as `currentTime - startTime` in milliseconds
   * @returns A Promise resolving to the handler's decision, or `false` when the
   *   handler is not consulted
   */
  async shouldRetry(currentTime: Date): Promise<boolean> {
    const failure = this.lastFailure;

    // Nothing has failed yet, or the failure is marked non-retriable
    // (e.g., activity not found): never hand this to the user's handler.
    if (!failure || failure.getIsnonretriable()) {
      return false;
    }

    // Translate the protobuf failure into the plain shape the handler sees.
    // `||` is intentional: an empty error type or message also falls back.
    const details: TaskFailureDetails = {
      errorType: failure.getErrortype() || "Error",
      message: failure.getErrormessage() || "",
      stackTrace: failure.getStacktrace()?.getValue(),
    };

    const elapsedMs = currentTime.getTime() - this.startTime.getTime();

    return this._handler(
      createRetryContext(
        this.attemptCount,
        details,
        elapsedMs,
        false, // isCancelled - not yet supported
      ),
    );
  }
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
import * as pb from "../proto/orchestrator_service_pb";
5+
import { TaskFailedError } from "./exception/task-failed-error";
6+
import { CompletableTask } from "./completable-task";
7+
8+
/**
9+
* Represents the type of retryable task - activity or sub-orchestration.
10+
*/
11+
export type RetryTaskType = "activity" | "subOrchestration";
12+
13+
/**
14+
* Abstract base class for tasks that support retry behavior.
15+
*
16+
* @remarks
17+
* This class provides shared state and behavior for both policy-based retry
18+
* (RetryableTask) and handler-based retry (RetryHandlerTask). It manages the
19+
* action, attempt count, start time, task type, and failure tracking that are
20+
* common to both retry strategies.
21+
*/
22+
export abstract class RetryTaskBase<T> extends CompletableTask<T> {
  private readonly _action: pb.OrchestratorAction;
  private readonly _startTime: Date;
  private readonly _taskType: RetryTaskType;
  private _attemptCount: number;
  private _lastFailure: pb.TaskFailureDetails | undefined;

  /**
   * Creates a new RetryTaskBase instance. The attempt count starts at 1.
   *
   * @param action - The orchestrator action associated with this task
   * @param startTime - The time when the task was first scheduled
   * @param taskType - The type of task (activity or sub-orchestration)
   */
  constructor(
    action: pb.OrchestratorAction,
    startTime: Date,
    taskType: RetryTaskType,
  ) {
    super();
    this._action = action;
    this._startTime = startTime;
    this._taskType = taskType;
    this._attemptCount = 1;
  }

  /**
   * Gets the orchestrator action associated with this task.
   */
  get action(): pb.OrchestratorAction {
    return this._action;
  }

  /**
   * Gets the current attempt count (1 for the initial attempt).
   */
  get attemptCount(): number {
    return this._attemptCount;
  }

  /**
   * Gets the time when the task was first scheduled.
   */
  get startTime(): Date {
    return this._startTime;
  }

  /**
   * Gets the type of task (activity or sub-orchestration).
   */
  get taskType(): RetryTaskType {
    return this._taskType;
  }

  /**
   * Gets the details of the most recently recorded failure, or `undefined`
   * when no failure has been recorded or the task has since completed.
   */
  get lastFailure(): pb.TaskFailureDetails | undefined {
    return this._lastFailure;
  }

  /**
   * Increments the attempt count by one.
   */
  incrementAttemptCount(): void {
    this._attemptCount++;
  }

  /**
   * Records a failure for potential retry without completing the task.
   *
   * @param message - The failure message
   * @param details - The failure details from the protobuf; an empty
   *   `TaskFailureDetails` is used when omitted
   */
  recordFailure(message: string, details?: pb.TaskFailureDetails): void {
    const failure = details ?? new pb.TaskFailureDetails();
    this._lastFailure = failure;

    // Store the exception for later if we exhaust retries
    this._exception = new TaskFailedError(message, failure);
  }

  /**
   * Completes the task with the given result and clears any previously
   * recorded failure, since the retry succeeded.
   *
   * @param result - The result of the task
   * @throws Error if the task is already completed
   */
  override complete(result: T): void {
    // Validate before touching state, mirroring fail(). Without this guard a
    // rejected second completion would first wipe the exception of a task
    // that had already failed for good.
    // NOTE(review): CompletableTask.complete is not visible from this file;
    // presumably it performs the same check - confirm the messages match.
    if (this._isComplete) {
      throw new Error("Task is already completed");
    }

    // Clear any previously recorded failure since the retry succeeded. This
    // happens before the parent implementation runs so the failure state is
    // already gone by the time super.complete() does its work.
    this._exception = undefined;
    this._lastFailure = undefined;

    // Call the parent implementation
    super.complete(result);
  }

  /**
   * Marks the task as failed after all retry attempts are exhausted and
   * notifies the parent task, if any.
   *
   * @param message - The failure message
   * @param details - Optional failure details; an empty `TaskFailureDetails`
   *   is used when omitted
   * @throws Error if the task is already completed
   */
  override fail(message: string, details?: pb.TaskFailureDetails): void {
    if (this._isComplete) {
      throw new Error("Task is already completed");
    }

    const failure = details ?? new pb.TaskFailureDetails();

    this._exception = new TaskFailedError(message, failure);
    this._isComplete = true;

    if (this._parent) {
      this._parent.onChildCompleted(this);
    }
  }
}

0 commit comments

Comments
 (0)