Skip to content

Commit 902f33d

Browse files
authored
Fix WhenAllTask crash when children complete after fail-fast (#123)
1 parent b74fd05 commit 902f33d

4 files changed

Lines changed: 252 additions & 1 deletion

File tree

packages/durabletask-js/src/task/when-all-task.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,16 @@ export class WhenAllTask<T> extends CompositeTask<T[]> {
2727

2828
onChildCompleted(task: Task<any>): void {
2929
if (this._isComplete) {
30-
throw new Error("Task is already completed");
30+
// Already completed (fail-fast or all children done). Ignore subsequent child completions.
31+
return;
3132
}
3233

3334
this._completedTasks++;
3435

3536
if (task.isFailed && !this._exception) {
3637
this._exception = task.getException();
3738
this._isComplete = true;
39+
return;
3840
}
3941

4042
if (this._completedTasks == this._tasks.length) {

packages/durabletask-js/src/worker/runtime-orchestration-context.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
130130
}
131131

132132
async resume() {
133+
// Don't resume if the orchestration is already complete
134+
if (this._isComplete) {
135+
return;
136+
}
137+
133138
// This should never happen unless the orchestration history is corrupted
134139
if (!this._generator) {
135140
throw new Error("The orchestrator generator is not initialized! Was the orchestration history corrupted?");

packages/durabletask-js/test/orchestration_executor.spec.ts

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,6 +1179,138 @@ describe("Orchestration Executor", () => {
11791179
it("should throw when whenAny is called with an empty task array", () => {
11801180
expect(() => whenAny([])).toThrow("whenAny requires at least one task");
11811181
});
1182+
1183+
it("should fail whenAll correctly when the failing task is the last to complete", async () => {
1184+
const printInt = (_: any, value: number) => {
1185+
return value.toString();
1186+
};
1187+
1188+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
1189+
const tasks: Task<string>[] = [];
1190+
1191+
for (let i = 0; i < 3; i++) {
1192+
tasks.push(ctx.callActivity(printInt, i));
1193+
}
1194+
1195+
const results = yield whenAll(tasks);
1196+
return results;
1197+
};
1198+
1199+
const registry = new Registry();
1200+
const orchestratorName = registry.addOrchestrator(orchestrator);
1201+
const activityName = registry.addActivity(printInt);
1202+
1203+
const oldEvents = [newOrchestratorStartedEvent(), newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)];
1204+
1205+
for (let i = 0; i < 3; i++) {
1206+
oldEvents.push(newTaskScheduledEvent(i + 1, activityName, i.toString()));
1207+
}
1208+
1209+
// First two tasks succeed, last task fails
1210+
const ex = new Error("Last task failed");
1211+
const newEvents: any[] = [
1212+
newTaskCompletedEvent(1, printInt(null, 0)),
1213+
newTaskCompletedEvent(2, printInt(null, 1)),
1214+
newTaskFailedEvent(3, ex),
1215+
];
1216+
1217+
const executor = new OrchestrationExecutor(registry, testLogger);
1218+
const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);
1219+
1220+
const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
1221+
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
1222+
expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError");
1223+
expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message);
1224+
});
1225+
1226+
it("should not crash when additional tasks complete after whenAll fails fast", async () => {
1227+
const printInt = (_: any, value: number) => {
1228+
return value.toString();
1229+
};
1230+
1231+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
1232+
const tasks: Task<string>[] = [];
1233+
1234+
for (let i = 0; i < 3; i++) {
1235+
tasks.push(ctx.callActivity(printInt, i));
1236+
}
1237+
1238+
const results = yield whenAll(tasks);
1239+
return results;
1240+
};
1241+
1242+
const registry = new Registry();
1243+
const orchestratorName = registry.addOrchestrator(orchestrator);
1244+
const activityName = registry.addActivity(printInt);
1245+
1246+
const oldEvents = [newOrchestratorStartedEvent(), newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)];
1247+
1248+
for (let i = 0; i < 3; i++) {
1249+
oldEvents.push(newTaskScheduledEvent(i + 1, activityName, i.toString()));
1250+
}
1251+
1252+
// First task fails, then remaining tasks complete in the same batch
1253+
const ex = new Error("First task failed");
1254+
const newEvents: any[] = [
1255+
newTaskFailedEvent(1, ex),
1256+
newTaskCompletedEvent(2, printInt(null, 1)),
1257+
newTaskCompletedEvent(3, printInt(null, 2)),
1258+
];
1259+
1260+
const executor = new OrchestrationExecutor(registry, testLogger);
1261+
const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);
1262+
1263+
const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
1264+
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
1265+
expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError");
1266+
expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message);
1267+
});
1268+
1269+
it("should preserve orchestration result when whenAll failure is caught and other tasks complete", async () => {
1270+
const printInt = (_: any, value: number) => {
1271+
return value.toString();
1272+
};
1273+
1274+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
1275+
const tasks: Task<string>[] = [];
1276+
1277+
for (let i = 0; i < 3; i++) {
1278+
tasks.push(ctx.callActivity(printInt, i));
1279+
}
1280+
1281+
try {
1282+
yield whenAll(tasks);
1283+
} catch {
1284+
// Intentionally catch the failure and return a fallback result
1285+
return "handled";
1286+
}
1287+
};
1288+
1289+
const registry = new Registry();
1290+
const orchestratorName = registry.addOrchestrator(orchestrator);
1291+
const activityName = registry.addActivity(printInt);
1292+
1293+
const oldEvents = [newOrchestratorStartedEvent(), newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)];
1294+
1295+
for (let i = 0; i < 3; i++) {
1296+
oldEvents.push(newTaskScheduledEvent(i + 1, activityName, i.toString()));
1297+
}
1298+
1299+
// First task fails, then remaining tasks complete in the same batch
1300+
const ex = new Error("One task failed");
1301+
const newEvents: any[] = [
1302+
newTaskFailedEvent(1, ex),
1303+
newTaskCompletedEvent(2, printInt(null, 1)),
1304+
newTaskCompletedEvent(3, printInt(null, 2)),
1305+
];
1306+
1307+
const executor = new OrchestrationExecutor(registry, testLogger);
1308+
const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);
1309+
1310+
const completeAction = getAndValidateSingleCompleteOrchestrationAction(result);
1311+
expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
1312+
expect(completeAction?.getResult()?.getValue()).toEqual(JSON.stringify("handled"));
1313+
});
11821314
});
11831315

11841316
function getAndValidateSingleCompleteOrchestrationAction(

test/e2e-azuremanaged/orchestration.spec.ts

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,118 @@ describe("Durable Task Scheduler (DTS) E2E Tests", () => {
190190
expect(activityCounter).toEqual(10);
191191
}, 31000);
192192

193+
it("should remain completed when whenAll fail-fast is caught and other children complete later", async () => {
194+
let failActivityCounter = 0;
195+
let slowActivityCounter = 0;
196+
197+
const fastFail = async (_: ActivityContext): Promise<void> => {
198+
failActivityCounter++;
199+
throw new Error("fast failure for whenAll fail-fast test");
200+
};
201+
202+
const slowSuccess = async (_: ActivityContext, _input: string): Promise<void> => {
203+
slowActivityCounter++;
204+
await new Promise((resolve) => setTimeout(resolve, 1200));
205+
};
206+
207+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
208+
try {
209+
yield whenAll([
210+
ctx.callActivity(fastFail),
211+
ctx.callActivity(slowSuccess, "a"),
212+
ctx.callActivity(slowSuccess, "b"),
213+
]);
214+
} catch {
215+
return "handled-failure";
216+
}
217+
};
218+
219+
taskHubWorker.addActivity(fastFail);
220+
taskHubWorker.addActivity(slowSuccess);
221+
taskHubWorker.addOrchestrator(orchestrator);
222+
await taskHubWorker.start();
223+
224+
const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
225+
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
226+
227+
expect(state).toBeDefined();
228+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
229+
expect(state?.failureDetails).toBeUndefined();
230+
expect(state?.serializedOutput).toEqual(JSON.stringify("handled-failure"));
231+
expect(failActivityCounter).toEqual(1);
232+
expect(slowActivityCounter).toEqual(2);
233+
234+
await new Promise((resolve) => setTimeout(resolve, 2000));
235+
const finalState = await taskHubClient.getOrchestrationState(id);
236+
expect(finalState).toBeDefined();
237+
expect(finalState?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
238+
expect(finalState?.serializedOutput).toEqual(JSON.stringify("handled-failure"));
239+
}, 31000);
240+
241+
it("should preserve original whenAll failure details when not caught", async () => {
242+
const fastFail = async (_: ActivityContext): Promise<void> => {
243+
throw new Error("fast failure for whenAll uncaught test");
244+
};
245+
246+
const slowSuccess = async (_: ActivityContext): Promise<void> => {
247+
await new Promise((resolve) => setTimeout(resolve, 1200));
248+
};
249+
250+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
251+
yield whenAll([
252+
ctx.callActivity(fastFail),
253+
ctx.callActivity(slowSuccess),
254+
ctx.callActivity(slowSuccess),
255+
]);
256+
};
257+
258+
taskHubWorker.addActivity(fastFail);
259+
taskHubWorker.addActivity(slowSuccess);
260+
taskHubWorker.addOrchestrator(orchestrator);
261+
await taskHubWorker.start();
262+
263+
const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
264+
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
265+
266+
expect(state).toBeDefined();
267+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
268+
expect(state?.failureDetails).toBeDefined();
269+
expect(state?.failureDetails?.message).toContain("fast failure for whenAll uncaught test");
270+
expect(state?.failureDetails?.message).not.toContain("Task is already completed");
271+
}, 31000);
272+
273+
it("should fail whenAll correctly when the failing task is the last to complete", async () => {
274+
const fastSuccess = async (_: ActivityContext): Promise<void> => {
275+
// completes immediately
276+
};
277+
278+
const slowFail = async (_: ActivityContext): Promise<void> => {
279+
await new Promise((resolve) => setTimeout(resolve, 1200));
280+
throw new Error("slow failure as last task");
281+
};
282+
283+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
284+
yield whenAll([
285+
ctx.callActivity(fastSuccess),
286+
ctx.callActivity(fastSuccess),
287+
ctx.callActivity(slowFail),
288+
]);
289+
};
290+
291+
taskHubWorker.addActivity(fastSuccess);
292+
taskHubWorker.addActivity(slowFail);
293+
taskHubWorker.addOrchestrator(orchestrator);
294+
await taskHubWorker.start();
295+
296+
const id = await taskHubClient.scheduleNewOrchestration(orchestrator);
297+
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
298+
299+
expect(state).toBeDefined();
300+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED);
301+
expect(state?.failureDetails).toBeDefined();
302+
expect(state?.failureDetails?.message).toContain("slow failure as last task");
303+
}, 31000);
304+
193305
it("should be able to use the sub-orchestration", async () => {
194306
let activityCounter = 0;
195307

0 commit comments

Comments
 (0)