Skip to content

Commit d2e8476

Browse files
authored
fix: cancel pending timers when purging orchestration instances (#197)
1 parent d63d91e commit d2e8476

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

packages/durabletask-js/src/testing/in-memory-backend.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ export class InMemoryOrchestrationBackend {
6262
private readonly activityQueue: ActivityWorkItem[] = [];
6363
private readonly stateWaiters: Map<string, StateWaiter[]> = new Map();
6464
private readonly pendingTimers: Set<ReturnType<typeof setTimeout>> = new Set();
65+
private readonly instanceTimers: Map<string, Set<ReturnType<typeof setTimeout>>> = new Map();
6566
private nextCompletionToken: number = 1;
6667
private readonly maxHistorySize: number;
6768

@@ -217,6 +218,7 @@ export class InMemoryOrchestrationBackend {
217218

218219
this.instances.delete(instanceId);
219220
this.stateWaiters.delete(instanceId);
221+
this.cancelInstanceTimers(instanceId);
220222
return true;
221223
}
222224

@@ -394,6 +396,7 @@ export class InMemoryOrchestrationBackend {
394396
clearTimeout(timer);
395397
}
396398
this.pendingTimers.clear();
399+
this.instanceTimers.clear();
397400
}
398401

399402
/**
@@ -543,6 +546,7 @@ export class InMemoryOrchestrationBackend {
543546

544547
const timerHandle = setTimeout(() => {
545548
this.pendingTimers.delete(timerHandle);
549+
this.removeInstanceTimer(instance.instanceId, timerHandle);
546550
const currentInstance = this.instances.get(instance.instanceId);
547551
if (currentInstance && !this.isTerminalStatus(currentInstance.status)) {
548552
const timerFiredEvent = pbh.newTimerFiredEvent(timerId, fireAt);
@@ -552,6 +556,7 @@ export class InMemoryOrchestrationBackend {
552556
}
553557
}, delay);
554558
this.pendingTimers.add(timerHandle);
559+
this.addInstanceTimer(instance.instanceId, timerHandle);
555560
}
556561

557562
private processCreateSubOrchestrationAction(instance: OrchestrationInstance, action: pb.OrchestratorAction): void {
@@ -638,6 +643,36 @@ export class InMemoryOrchestrationBackend {
638643
}
639644
}
640645

646+
private addInstanceTimer(instanceId: string, timerHandle: ReturnType<typeof setTimeout>): void {
647+
let timers = this.instanceTimers.get(instanceId);
648+
if (!timers) {
649+
timers = new Set();
650+
this.instanceTimers.set(instanceId, timers);
651+
}
652+
timers.add(timerHandle);
653+
}
654+
655+
private removeInstanceTimer(instanceId: string, timerHandle: ReturnType<typeof setTimeout>): void {
656+
const timers = this.instanceTimers.get(instanceId);
657+
if (timers) {
658+
timers.delete(timerHandle);
659+
if (timers.size === 0) {
660+
this.instanceTimers.delete(instanceId);
661+
}
662+
}
663+
}
664+
665+
private cancelInstanceTimers(instanceId: string): void {
666+
const timers = this.instanceTimers.get(instanceId);
667+
if (timers) {
668+
for (const timer of timers) {
669+
clearTimeout(timer);
670+
this.pendingTimers.delete(timer);
671+
}
672+
this.instanceTimers.delete(instanceId);
673+
}
674+
}
675+
641676
private notifyWaiters(instanceId: string): void {
642677
const instance = this.instances.get(instanceId);
643678
const waiters = this.stateWaiters.get(instanceId);

packages/durabletask-js/test/in-memory-backend.spec.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,73 @@ describe("In-Memory Backend", () => {
356356
expect(state).toBeUndefined();
357357
});
358358

359+
it("should cancel pending timers when purging a terminated orchestration", async () => {
360+
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
361+
// Create a timer far in the future — it will still be pending when we terminate
362+
yield ctx.createTimer(3600);
363+
return "done";
364+
};
365+
366+
worker.addOrchestrator(orchestrator);
367+
await worker.start();
368+
369+
const id = await client.scheduleNewOrchestration(orchestrator);
370+
// Wait for the orchestration to start so the timer action is processed by the backend
371+
await client.waitForOrchestrationStart(id, false, 5);
372+
373+
// Terminate while the long timer is still pending
374+
await client.terminateOrchestration(id, "terminated");
375+
const state = await client.waitForOrchestrationCompletion(id, true, 10);
376+
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.TERMINATED);
377+
378+
// Timer should still be pending before purge
379+
const pendingTimersBefore = (backend as any).pendingTimers.size;
380+
expect(pendingTimersBefore).toBeGreaterThan(0);
381+
382+
// Purge the terminated orchestration
383+
const result = await client.purgeOrchestration(id);
384+
expect(result.deletedInstanceCount).toEqual(1);
385+
386+
// After purge, pending timers for this instance should be cancelled
387+
expect((backend as any).pendingTimers.size).toBe(0);
388+
expect((backend as any).instanceTimers.size).toBe(0);
389+
});
390+
391+
it("should cancel pending timers for only the purged orchestration", async () => {
392+
const timerOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
393+
yield ctx.createTimer(3600);
394+
return "done";
395+
};
396+
397+
const waitOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
398+
yield ctx.createTimer(7200);
399+
return "done";
400+
};
401+
402+
worker.addOrchestrator(timerOrchestrator);
403+
worker.addOrchestrator(waitOrchestrator);
404+
await worker.start();
405+
406+
// Start two orchestrations that both create long timers
407+
const id1 = await client.scheduleNewOrchestration(timerOrchestrator);
408+
const id2 = await client.scheduleNewOrchestration(waitOrchestrator);
409+
410+
await client.waitForOrchestrationStart(id1, false, 5);
411+
await client.waitForOrchestrationStart(id2, false, 5);
412+
413+
// Terminate and purge only the first orchestration
414+
await client.terminateOrchestration(id1, "terminated");
415+
await client.waitForOrchestrationCompletion(id1, false, 10);
416+
417+
const result = await client.purgeOrchestration(id1);
418+
expect(result.deletedInstanceCount).toEqual(1);
419+
420+
// The second orchestration's timer should still be pending
421+
expect((backend as any).pendingTimers.size).toBe(1);
422+
expect((backend as any).instanceTimers.has(id2)).toBe(true);
423+
expect((backend as any).instanceTimers.has(id1)).toBe(false);
424+
});
425+
359426
it("should allow reusing instance IDs after reset", async () => {
360427
const orchestrator: TOrchestrator = async (_: OrchestrationContext, input: number) => {
361428
return input * 2;

0 commit comments

Comments
 (0)