Skip to content

Commit eceb158

Browse files
committed
feat: Add E2E tests for AsyncRetryHandler with activity and sub-orchestration retries
1 parent 93d2af8 commit eceb158

File tree

1 file changed

+341
-0
lines changed

1 file changed

+341
-0
lines changed
Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* E2E tests for AsyncRetryHandler (custom retry logic) in Durable Task Scheduler (DTS).
6+
*
7+
* These tests cover:
8+
* - Activity retry with custom retry handler
9+
* - Sub-orchestration retry with custom retry handler
10+
* - Retry handler that filters by error type
11+
* - Retry handler that limits by attempt count
12+
* - Sync RetryHandler support
13+
*
14+
* Environment variables (choose one):
15+
* - DTS_CONNECTION_STRING: Full connection string
16+
* OR
17+
* - ENDPOINT: The endpoint for the DTS emulator (default: localhost:8080)
18+
* - TASKHUB: The task hub name (default: default)
19+
*/
20+
21+
import {
22+
TaskHubGrpcClient,
23+
TaskHubGrpcWorker,
24+
ProtoOrchestrationStatus as OrchestrationStatus,
25+
ActivityContext,
26+
OrchestrationContext,
27+
TOrchestrator,
28+
AsyncRetryHandler,
29+
RetryHandler,
30+
RetryContext,
31+
toAsyncRetryHandler,
32+
} from "@microsoft/durabletask-js";
33+
import {
34+
DurableTaskAzureManagedClientBuilder,
35+
DurableTaskAzureManagedWorkerBuilder,
36+
} from "@microsoft/durabletask-js-azuremanaged";
37+
38+
// Read environment variables
39+
const connectionString = process.env.DTS_CONNECTION_STRING;
40+
const endpoint = process.env.ENDPOINT || "localhost:8080";
41+
const taskHub = process.env.TASKHUB || "default";
42+
43+
function createClient(): TaskHubGrpcClient {
44+
if (connectionString) {
45+
return new DurableTaskAzureManagedClientBuilder().connectionString(connectionString).build();
46+
}
47+
return new DurableTaskAzureManagedClientBuilder()
48+
.endpoint(endpoint, taskHub, null) // null credential for emulator (no auth)
49+
.build();
50+
}
51+
52+
function createWorker(): TaskHubGrpcWorker {
53+
if (connectionString) {
54+
return new DurableTaskAzureManagedWorkerBuilder().connectionString(connectionString).build();
55+
}
56+
return new DurableTaskAzureManagedWorkerBuilder()
57+
.endpoint(endpoint, taskHub, null) // null credential for emulator (no auth)
58+
.build();
59+
}
60+
61+
describe("Retry Handler E2E Tests", () => {
62+
let taskHubClient: TaskHubGrpcClient;
63+
let taskHubWorker: TaskHubGrpcWorker;
64+
65+
beforeEach(async () => {
66+
taskHubClient = createClient();
67+
taskHubWorker = createWorker();
68+
});
69+
70+
afterEach(async () => {
71+
try {
72+
await taskHubWorker.stop();
73+
} catch {
74+
// Worker wasn't started, ignore the error
75+
}
76+
await taskHubClient.stop();
77+
});
78+
79+
// ==================== Activity Retry Handler Tests ====================
80+
81+
describe("activity retry with AsyncRetryHandler", () => {
82+
it("should retry activity when handler returns true and succeed after transient failures", async () => {
83+
let attemptCount = 0;
84+
85+
const flakyActivity = async (_: ActivityContext, input: number) => {
86+
attemptCount++;
87+
if (attemptCount < 3) {
88+
throw new Error(`Transient failure on attempt ${attemptCount}`);
89+
}
90+
return input * 2;
91+
};
92+
93+
// Retry handler: retry up to 5 attempts
94+
const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => {
95+
return ctx.lastAttemptNumber < 5;
96+
};
97+
98+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any {
99+
const result = yield ctx.callActivity(flakyActivity, input, { retry: retryHandler });
100+
return result;
101+
};
102+
103+
taskHubWorker.addActivity(flakyActivity);
104+
taskHubWorker.addOrchestrator(orchestrator);
105+
await taskHubWorker.start();
106+
107+
const id = await taskHubClient.scheduleNewOrchestration(orchestrator, 21);
108+
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
109+
110+
expect(state).toBeDefined();
111+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
112+
expect(state?.failureDetails).toBeUndefined();
113+
expect(state?.serializedOutput).toEqual(JSON.stringify(42));
114+
expect(attemptCount).toBe(3); // Failed twice, succeeded on third
115+
}, 31000);
116+
117+
it("should fail when retry handler returns false", async () => {
118+
let attemptCount = 0;
119+
120+
const failingActivity = async (_: ActivityContext) => {
121+
attemptCount++;
122+
const error = new Error("Permanent failure");
123+
error.name = "PermanentError";
124+
throw error;
125+
};
126+
127+
// Retry handler: don't retry PermanentError
128+
const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => {
129+
return ctx.lastFailure.errorType !== "PermanentError";
130+
};
131+
132+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
133+
const result = yield ctx.callActivity(failingActivity, undefined, { retry: retryHandler });
134+
return result;
135+
};
136+
137+
taskHubWorker.addActivity(failingActivity);
138+
taskHubWorker.addOrchestrator(orchestrator);
139+
await taskHubWorker.start();
140+
141+
const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
142+
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
143+
144+
expect(state).toBeDefined();
145+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
146+
expect(state?.failureDetails).toBeDefined();
147+
// Should only attempt once since handler returns false for PermanentError
148+
expect(attemptCount).toBe(1);
149+
}, 31000);
150+
151+
it("should exhaust retries when handler limits by attempt count", async () => {
152+
let attemptCount = 0;
153+
const maxAttempts = 3;
154+
155+
const alwaysFailsActivity = async (_: ActivityContext) => {
156+
attemptCount++;
157+
throw new Error(`Failure on attempt ${attemptCount}`);
158+
};
159+
160+
// Retry handler: retry up to maxAttempts
161+
const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => {
162+
return ctx.lastAttemptNumber < maxAttempts;
163+
};
164+
165+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
166+
const result = yield ctx.callActivity(alwaysFailsActivity, undefined, { retry: retryHandler });
167+
return result;
168+
};
169+
170+
taskHubWorker.addActivity(alwaysFailsActivity);
171+
taskHubWorker.addOrchestrator(orchestrator);
172+
await taskHubWorker.start();
173+
174+
const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
175+
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
176+
177+
expect(state).toBeDefined();
178+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
179+
expect(state?.failureDetails).toBeDefined();
180+
expect(attemptCount).toBe(maxAttempts);
181+
}, 31000);
182+
183+
it("should filter retries based on error type using handler", async () => {
184+
let attemptCount = 0;
185+
186+
const activityWithMixedErrors = async (_: ActivityContext) => {
187+
attemptCount++;
188+
if (attemptCount === 1) {
189+
const error = new Error("Connection timeout");
190+
error.name = "TransientError";
191+
throw error;
192+
} else {
193+
const error = new Error("Invalid input");
194+
error.name = "ValidationError";
195+
throw error;
196+
}
197+
};
198+
199+
// Retry handler: only retry TransientError, not ValidationError
200+
const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => {
201+
return ctx.lastFailure.errorType === "TransientError";
202+
};
203+
204+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
205+
const result = yield ctx.callActivity(activityWithMixedErrors, undefined, { retry: retryHandler });
206+
return result;
207+
};
208+
209+
taskHubWorker.addActivity(activityWithMixedErrors);
210+
taskHubWorker.addOrchestrator(orchestrator);
211+
await taskHubWorker.start();
212+
213+
const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
214+
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
215+
216+
expect(state).toBeDefined();
217+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
218+
expect(state?.failureDetails).toBeDefined();
219+
// First: TransientError (retry), Second: ValidationError (stop)
220+
expect(attemptCount).toBe(2);
221+
}, 31000);
222+
});
223+
224+
// ==================== Sub-Orchestration Retry Handler Tests ====================
225+
226+
describe("sub-orchestration retry with AsyncRetryHandler", () => {
227+
it("should retry sub-orchestration when handler returns true", async () => {
228+
let attemptCount = 0;
229+
230+
// eslint-disable-next-line require-yield
231+
const failingSubOrchestrator: TOrchestrator = async function* (_ctx: OrchestrationContext): any {
232+
attemptCount++;
233+
if (attemptCount < 3) {
234+
throw new Error(`Sub-orchestration failure on attempt ${attemptCount}`);
235+
}
236+
return "sub-orch-result";
237+
};
238+
239+
const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => {
240+
return ctx.lastAttemptNumber < 5;
241+
};
242+
243+
const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
244+
const result = yield ctx.callSubOrchestrator(failingSubOrchestrator, undefined, {
245+
retry: retryHandler,
246+
});
247+
return result;
248+
};
249+
250+
taskHubWorker.addOrchestrator(failingSubOrchestrator);
251+
taskHubWorker.addOrchestrator(parentOrchestrator);
252+
await taskHubWorker.start();
253+
254+
const id = await taskHubClient.scheduleNewOrchestration(parentOrchestrator);
255+
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
256+
257+
expect(state).toBeDefined();
258+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
259+
expect(state?.failureDetails).toBeUndefined();
260+
expect(state?.serializedOutput).toEqual(JSON.stringify("sub-orch-result"));
261+
expect(attemptCount).toBe(3);
262+
}, 31000);
263+
264+
it("should fail sub-orchestration when handler returns false", async () => {
265+
let attemptCount = 0;
266+
267+
// eslint-disable-next-line require-yield
268+
const alwaysFailsSubOrch: TOrchestrator = async function* (_ctx: OrchestrationContext): any {
269+
attemptCount++;
270+
const error = new Error("Fatal sub-orchestration error");
271+
error.name = "FatalError";
272+
throw error;
273+
};
274+
275+
// Don't retry FatalError
276+
const retryHandler: AsyncRetryHandler = async (ctx: RetryContext) => {
277+
return ctx.lastFailure.errorType !== "FatalError";
278+
};
279+
280+
const parentOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
281+
const result = yield ctx.callSubOrchestrator(alwaysFailsSubOrch, undefined, {
282+
retry: retryHandler,
283+
});
284+
return result;
285+
};
286+
287+
taskHubWorker.addOrchestrator(alwaysFailsSubOrch);
288+
taskHubWorker.addOrchestrator(parentOrchestrator);
289+
await taskHubWorker.start();
290+
291+
const id = await taskHubClient.scheduleNewOrchestration(parentOrchestrator);
292+
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
293+
294+
expect(state).toBeDefined();
295+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
296+
expect(state?.failureDetails).toBeDefined();
297+
// Should only attempt once since handler returns false for FatalError
298+
expect(attemptCount).toBe(1);
299+
}, 31000);
300+
});
301+
302+
// ==================== Sync RetryHandler Tests ====================
303+
304+
describe("sync RetryHandler support", () => {
305+
it("should support synchronous RetryHandler via toAsyncRetryHandler", async () => {
306+
let attemptCount = 0;
307+
308+
const flakyActivity = async (_: ActivityContext) => {
309+
attemptCount++;
310+
if (attemptCount < 3) {
311+
throw new Error(`Transient failure on attempt ${attemptCount}`);
312+
}
313+
return "success";
314+
};
315+
316+
// Use synchronous RetryHandler converted via toAsyncRetryHandler
317+
const syncHandler: RetryHandler = (ctx: RetryContext) => {
318+
return ctx.lastAttemptNumber < 5;
319+
};
320+
const retryHandler = toAsyncRetryHandler(syncHandler);
321+
322+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
323+
const result = yield ctx.callActivity(flakyActivity, undefined, { retry: retryHandler });
324+
return result;
325+
};
326+
327+
taskHubWorker.addActivity(flakyActivity);
328+
taskHubWorker.addOrchestrator(orchestrator);
329+
await taskHubWorker.start();
330+
331+
const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
332+
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
333+
334+
expect(state).toBeDefined();
335+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
336+
expect(state?.failureDetails).toBeUndefined();
337+
expect(state?.serializedOutput).toEqual(JSON.stringify("success"));
338+
expect(attemptCount).toBe(3);
339+
}, 31000);
340+
});
341+
});

0 commit comments

Comments
 (0)