Skip to content

Commit 069700f

Browse files
authored
fix: Add guard clause to handleSubOrchestrationCompleted to prevent unconditional resume (#183)
1 parent 255790f commit 069700f

2 files changed

Lines changed: 53 additions & 20 deletions

File tree

packages/durabletask-js/src/worker/orchestration-executor.ts

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -386,25 +386,11 @@ export class OrchestrationExecutor {
386386
}
387387

388388
private async handleSubOrchestrationCompleted(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise<void> {
389-
const subOrchestrationInstanceCompletedEvent = event.getSuborchestrationinstancecompleted();
390-
const taskId = subOrchestrationInstanceCompletedEvent
391-
? subOrchestrationInstanceCompletedEvent.getTaskscheduledid()
392-
: undefined;
393-
394-
let subOrchTask;
395-
396-
if (taskId !== undefined) {
397-
subOrchTask = ctx._pendingTasks[taskId];
398-
delete ctx._pendingTasks[taskId];
399-
}
400-
401-
const result = parseJsonField(subOrchestrationInstanceCompletedEvent?.getResult());
402-
403-
if (subOrchTask) {
404-
subOrchTask.complete(result);
405-
}
406-
407-
await ctx.resume();
389+
const completedEvent = event.getSuborchestrationinstancecompleted();
390+
const taskId = completedEvent ? completedEvent.getTaskscheduledid() : undefined;
391+
const result = completedEvent?.getResult();
392+
const normalizedResult = isEmpty(result) ? undefined : result;
393+
await this.handleCompletedTask(ctx, taskId, normalizedResult, "subOrchestrationInstanceCompleted");
408394
}
409395

410396
private async handleSubOrchestrationFailed(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise<void> {

packages/durabletask-js/test/orchestration_executor.spec.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ import { OrchestrationExecutor, OrchestrationExecutionResult } from "../src/work
2323
import * as pb from "../src/proto/orchestrator_service_pb";
2424
import { Registry } from "../src/worker/registry";
2525
import { TOrchestrator } from "../src/types/orchestrator.type";
26-
import { NoOpLogger } from "../src/types/logger.type";
26+
import { NoOpLogger, StructuredLogger, LogEvent } from "../src/types/logger.type";
27+
import { EVENT_ORCHESTRATION_UNEXPECTED_EVENT } from "../src/worker/logs";
2728
import { ActivityContext } from "../src/task/context/activity-context";
2829
import { CompletableTask } from "../src/task/completable-task";
2930
import { Task } from "../src/task/task";
@@ -570,6 +571,52 @@ describe("Orchestration Executor", () => {
570571
// assert user_code_statement in complete_action.failureDetails.stackTrace.value
571572
});
572573

574+
it("should produce no actions and log a warning when sub-orchestration completion has unmatched taskId", async () => {
575+
const subOrchestrator = async (_: OrchestrationContext) => {
576+
// do nothing
577+
};
578+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any {
579+
const res = yield ctx.callSubOrchestrator(subOrchestrator);
580+
return res;
581+
};
582+
const registry = new Registry();
583+
const subOrchestratorName = registry.addOrchestrator(subOrchestrator);
584+
const orchestratorName = registry.addOrchestrator(orchestrator);
585+
const oldEvents = [
586+
newOrchestratorStartedEvent(),
587+
newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID, undefined),
588+
newSubOrchestrationCreatedEvent(1, subOrchestratorName, "sub-orch-123"),
589+
];
590+
// Send a completion event with a taskId (999) that does not match any pending task.
591+
const newEvents = [newSubOrchestrationCompletedEvent(999, JSON.stringify("unexpected"))];
592+
593+
// Use a spy logger to verify the warning log is emitted via handleCompletedTask's guard clause
594+
const loggedEvents: LogEvent[] = [];
595+
const spyLogger: StructuredLogger = {
596+
error: () => {},
597+
warn: () => {},
598+
info: () => {},
599+
debug: () => {},
600+
logEvent: (_level, event, _message) => {
601+
loggedEvents.push(event);
602+
},
603+
};
604+
605+
const executor = new OrchestrationExecutor(registry, spyLogger);
606+
const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents);
607+
608+
// The orchestration should still be waiting for the real sub-orchestration to complete.
609+
expect(result.actions.length).toEqual(0);
610+
611+
// Verify the unexpected event warning was logged (proves the guard clause was hit)
612+
const unexpectedEvents = loggedEvents.filter(
613+
(e) => e.eventId === EVENT_ORCHESTRATION_UNEXPECTED_EVENT,
614+
);
615+
expect(unexpectedEvents.length).toEqual(1);
616+
expect(unexpectedEvents[0].properties?.eventType).toEqual("subOrchestrationInstanceCompleted");
617+
expect(unexpectedEvents[0].properties?.eventId).toEqual(999);
618+
});
619+
573620
it("should test that an orchestration can wait for and process an external event sent by a client", async () => {
574621
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any {
575622
const res = yield ctx.waitForExternalEvent("my_event");

0 commit comments

Comments
 (0)