Skip to content

Commit 70f4370

Browse files
committed
feedback
1 parent dbcfd5c commit 70f4370

File tree

3 files changed

+21
-17
lines changed

3 files changed

+21
-17
lines changed

packages/durabletask-js/src/testing/in-memory-backend.ts

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ interface StateWaiter {
5858
export class InMemoryOrchestrationBackend {
5959
private readonly instances: Map<string, OrchestrationInstance> = new Map();
6060
private readonly orchestrationQueue: string[] = [];
61+
private readonly orchestrationQueueSet: Set<string> = new Set();
6162
private readonly activityQueue: ActivityWorkItem[] = [];
6263
private readonly stateWaiters: Map<string, StateWaiter[]> = new Map();
6364
private nextCompletionToken: number = 1;
@@ -133,7 +134,7 @@ export class InMemoryOrchestrationBackend {
133134
instance.lastUpdatedAt = new Date();
134135

135136
// Ensure instance is queued for processing
136-
if (!this.orchestrationQueue.includes(instanceId)) {
137+
if (!this.orchestrationQueueSet.has(instanceId)) {
137138
this.enqueueOrchestration(instanceId);
138139
}
139140
}
@@ -155,7 +156,7 @@ export class InMemoryOrchestrationBackend {
155156
instance.pendingEvents.push(event);
156157
instance.lastUpdatedAt = new Date();
157158

158-
if (!this.orchestrationQueue.includes(instanceId)) {
159+
if (!this.orchestrationQueueSet.has(instanceId)) {
159160
this.enqueueOrchestration(instanceId);
160161
}
161162
}
@@ -177,7 +178,7 @@ export class InMemoryOrchestrationBackend {
177178
instance.pendingEvents.push(event);
178179
instance.lastUpdatedAt = new Date();
179180

180-
if (!this.orchestrationQueue.includes(instanceId)) {
181+
if (!this.orchestrationQueueSet.has(instanceId)) {
181182
this.enqueueOrchestration(instanceId);
182183
}
183184
}
@@ -195,7 +196,7 @@ export class InMemoryOrchestrationBackend {
195196
instance.pendingEvents.push(event);
196197
instance.lastUpdatedAt = new Date();
197198

198-
if (!this.orchestrationQueue.includes(instanceId)) {
199+
if (!this.orchestrationQueueSet.has(instanceId)) {
199200
this.enqueueOrchestration(instanceId);
200201
}
201202
}
@@ -224,6 +225,7 @@ export class InMemoryOrchestrationBackend {
224225
getNextOrchestrationWorkItem(): OrchestrationInstance | undefined {
225226
while (this.orchestrationQueue.length > 0) {
226227
const instanceId = this.orchestrationQueue.shift()!;
228+
this.orchestrationQueueSet.delete(instanceId);
227229
const instance = this.instances.get(instanceId);
228230

229231
if (instance && instance.pendingEvents.length > 0) {
@@ -259,6 +261,15 @@ export class InMemoryOrchestrationBackend {
259261
return;
260262
}
261263

264+
// Check history size limit before adding events
265+
const projectedSize = instance.history.length + instance.pendingEvents.length;
266+
if (projectedSize > this.maxHistorySize) {
267+
throw new Error(
268+
`Orchestration '${instanceId}' would exceed maximum history size of ${this.maxHistorySize} ` +
269+
`(current: ${instance.history.length}, pending: ${instance.pendingEvents.length})`,
270+
);
271+
}
272+
262273
// Move pending events to history
263274
instance.history.push(...instance.pendingEvents);
264275
instance.pendingEvents = [];
@@ -268,11 +279,6 @@ export class InMemoryOrchestrationBackend {
268279
instance.customStatus = customStatus;
269280
}
270281

271-
// Check history size limit
272-
if (instance.history.length > this.maxHistorySize) {
273-
throw new Error(`Orchestration '${instanceId}' exceeded maximum history size of ${this.maxHistorySize}`);
274-
}
275-
276282
// Transition to RUNNING once the orchestration has been processed for the first time
277283
if (instance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING) {
278284
instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING;
@@ -410,8 +416,9 @@ export class InMemoryOrchestrationBackend {
410416
}
411417

412418
private enqueueOrchestration(instanceId: string): void {
413-
if (!this.orchestrationQueue.includes(instanceId)) {
419+
if (!this.orchestrationQueueSet.has(instanceId)) {
414420
this.orchestrationQueue.push(instanceId);
421+
this.orchestrationQueueSet.add(instanceId);
415422
}
416423
}
417424

packages/durabletask-js/src/testing/test-worker.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,12 @@ export class TestOrchestrationWorker {
8787
}
8888

8989
/**
90-
* Stops the worker.
90+
* Stops the worker. This method is idempotent and can be safely called
91+
* even if the worker is not running.
9192
*/
9293
async stop(): Promise<void> {
9394
if (!this.isRunning) {
94-
throw new Error("The worker is not running.");
95+
return; // Already stopped, nothing to do
9596
}
9697

9798
this.stopRequested = true;

tests/e2e/orchestration.spec.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,7 @@ describe("Durable Functions", () => {
2929
});
3030

3131
afterEach(async () => {
32-
try {
33-
await taskHubWorker.stop();
34-
} catch {
35-
// Ignore if not running
36-
}
32+
await taskHubWorker.stop();
3733
await taskHubClient.stop();
3834
backend.reset();
3935
});

0 commit comments

Comments
 (0)