Skip to content

Commit 648b212

Browse files
committed
fix: track entity execution in pendingWorkItems for graceful shutdown
Entity execution methods (_executeEntity and _executeEntityV2) were async but their returned promises were not tracked in _pendingWorkItems. This meant entity operations could be silently dropped during worker shutdown, as stop() only waits for tracked pending work items to complete. This change mirrors the existing pattern used by _executeOrchestrator and _executeActivity: synchronous wrapper methods that track the async work promise in _pendingWorkItems and clean up via .finally(). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 29455dd commit 648b212

2 files changed

Lines changed: 222 additions & 5 deletions

File tree

packages/durabletask-js/src/worker/task-hub-grpc-worker.ts

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,23 @@ export class TaskHubGrpcWorker {
823823
}
824824

825825
/**
826-
* Executes an entity batch request.
826+
* Executes an entity batch request and tracks it as a pending work item.
827+
*/
828+
private _executeEntity(
829+
req: pb.EntityBatchRequest,
830+
completionToken: string,
831+
stub: stubs.TaskHubSidecarServiceClient,
832+
operationInfos?: pb.OperationInfo[],
833+
): void {
834+
const workPromise = this._executeEntityInternal(req, completionToken, stub, operationInfos);
835+
this._pendingWorkItems.add(workPromise);
836+
workPromise.finally(() => {
837+
this._pendingWorkItems.delete(workPromise);
838+
});
839+
}
840+
841+
/**
842+
* Internal implementation of entity batch execution.
827843
*
828844
* @param req - The entity batch request from the sidecar.
829845
* @param completionToken - The completion token for the work item.
@@ -834,7 +850,7 @@ export class TaskHubGrpcWorker {
834850
* This method looks up the entity by name, creates a TaskEntityShim, executes the batch,
835851
* and sends the result back to the sidecar.
836852
*/
837-
private async _executeEntity(
853+
private async _executeEntityInternal(
838854
req: pb.EntityBatchRequest,
839855
completionToken: string,
840856
stub: stubs.TaskHubSidecarServiceClient,
@@ -907,7 +923,22 @@ export class TaskHubGrpcWorker {
907923
}
908924

909925
/**
910-
* Executes an entity request (V2 format).
926+
* Executes an entity request (V2 format) and tracks it as a pending work item.
927+
*/
928+
private _executeEntityV2(
929+
req: pb.EntityRequest,
930+
completionToken: string,
931+
stub: stubs.TaskHubSidecarServiceClient,
932+
): void {
933+
const workPromise = this._executeEntityV2Internal(req, completionToken, stub);
934+
this._pendingWorkItems.add(workPromise);
935+
workPromise.finally(() => {
936+
this._pendingWorkItems.delete(workPromise);
937+
});
938+
}
939+
940+
/**
941+
* Internal implementation of V2 entity execution.
911942
*
912943
* @param req - The entity request (V2) from the sidecar.
913944
* @param completionToken - The completion token for the work item.
@@ -918,7 +949,7 @@ export class TaskHubGrpcWorker {
918949
* instead of OperationRequest. It converts the V2 format to V1 format
919950
* (EntityBatchRequest) and delegates to the existing execution logic.
920951
*/
921-
private async _executeEntityV2(
952+
private async _executeEntityV2Internal(
922953
req: pb.EntityRequest,
923954
completionToken: string,
924955
stub: stubs.TaskHubSidecarServiceClient,
@@ -1002,7 +1033,7 @@ export class TaskHubGrpcWorker {
10021033
batchRequest.setOperationsList(operations);
10031034

10041035
// Delegate to the V1 execution logic with V2 operationInfos
1005-
await this._executeEntity(batchRequest, completionToken, stub, operationInfos);
1036+
await this._executeEntityInternal(batchRequest, completionToken, stub, operationInfos);
10061037
}
10071038

10081039
/**

packages/durabletask-js/test/worker-entity.spec.ts

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import { TaskHubGrpcWorker } from "../src/worker/task-hub-grpc-worker";
55
import { TaskEntity } from "../src/entities/task-entity";
66
import { ITaskEntity, EntityFactory } from "../src/entities/task-entity";
77
import { TaskEntityOperation } from "../src/entities/task-entity-operation";
8+
import * as pb from "../src/proto/orchestrator_service_pb";
9+
import * as stubs from "../src/proto/orchestrator_service_grpc_pb";
10+
import { NoOpLogger } from "../src/types/logger.type";
811

912
/**
1013
* Test entity for worker tests.
@@ -20,6 +23,70 @@ class CounterEntity extends TaskEntity<number> {
2023
}
2124
}
2225

26+
const COMPLETION_TOKEN = "test-completion-token";
27+
28+
/**
29+
* Creates a mock gRPC stub that captures the EntityBatchResult passed to
30+
* completeEntityTask.
31+
*/
32+
function createMockStub(): {
33+
stub: stubs.TaskHubSidecarServiceClient;
34+
capturedResult: pb.EntityBatchResult | null;
35+
} {
36+
let capturedResult: pb.EntityBatchResult | null = null;
37+
38+
const stub = {
39+
completeEntityTask: (
40+
result: pb.EntityBatchResult,
41+
metadata: any,
42+
callback: (err: any, res: any) => void,
43+
) => {
44+
capturedResult = result;
45+
callback(null, {});
46+
},
47+
} as unknown as stubs.TaskHubSidecarServiceClient;
48+
49+
return {
50+
stub,
51+
get capturedResult() {
52+
return capturedResult;
53+
},
54+
};
55+
}
56+
57+
/**
58+
* Creates a minimal EntityBatchRequest for testing.
59+
*/
60+
function createEntityBatchRequest(entityName: string, entityKey: string): pb.EntityBatchRequest {
61+
const req = new pb.EntityBatchRequest();
62+
req.setInstanceid(`@${entityName}@${entityKey}`);
63+
64+
const opRequest = new pb.OperationRequest();
65+
opRequest.setOperation("increment");
66+
opRequest.setRequestid("req-1");
67+
req.setOperationsList([opRequest]);
68+
69+
return req;
70+
}
71+
72+
/**
73+
* Creates a minimal EntityRequest (V2) for testing.
74+
*/
75+
function createEntityRequestV2(entityName: string, entityKey: string): pb.EntityRequest {
76+
const req = new pb.EntityRequest();
77+
req.setInstanceid(`@${entityName}@${entityKey}`);
78+
79+
const historyEvent = new pb.HistoryEvent();
80+
const signaled = new pb.EntityOperationSignaledEvent();
81+
signaled.setOperation("increment");
82+
signaled.setRequestid("req-1");
83+
historyEvent.setEntityoperationsignaled(signaled);
84+
req.setOperationrequestsList([historyEvent]);
85+
86+
return req;
87+
}
88+
89+
2390
describe("TaskHubGrpcWorker", () => {
2491
describe("Entity Registration", () => {
2592
describe("addEntity", () => {
@@ -144,4 +211,123 @@ describe("TaskHubGrpcWorker", () => {
144211
expect(true).toBe(true);
145212
});
146213
});
214+
215+
describe("Entity Execution Tracking", () => {
216+
it("should track V1 entity execution in _pendingWorkItems", async () => {
217+
// Arrange
218+
const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
219+
const factory: EntityFactory = () => new CounterEntity();
220+
worker.addNamedEntity("counter", factory);
221+
222+
const mockStub = createMockStub();
223+
const req = createEntityBatchRequest("counter", "key1");
224+
225+
// Act - call _executeEntity via the wrapper (which tracks the work item)
226+
(worker as any)._executeEntity(req, COMPLETION_TOKEN, mockStub.stub);
227+
228+
// Assert - the promise should be tracked while executing
229+
const pendingWorkItems: Set<Promise<void>> = (worker as any)._pendingWorkItems;
230+
expect(pendingWorkItems.size).toBe(1);
231+
232+
// Wait for completion
233+
await Promise.all(pendingWorkItems);
234+
235+
// After completion, it should be removed
236+
expect(pendingWorkItems.size).toBe(0);
237+
});
238+
239+
it("should remove V1 entity execution from _pendingWorkItems after completion", async () => {
240+
// Arrange
241+
const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
242+
const factory: EntityFactory = () => new CounterEntity();
243+
worker.addNamedEntity("counter", factory);
244+
245+
const mockStub = createMockStub();
246+
const req = createEntityBatchRequest("counter", "key1");
247+
248+
// Act
249+
(worker as any)._executeEntity(req, COMPLETION_TOKEN, mockStub.stub);
250+
251+
const pendingWorkItems: Set<Promise<void>> = (worker as any)._pendingWorkItems;
252+
253+
// Wait for completion
254+
await Promise.all(pendingWorkItems);
255+
256+
// Assert - should have been cleaned up
257+
expect(pendingWorkItems.size).toBe(0);
258+
expect(mockStub.capturedResult).not.toBeNull();
259+
expect(mockStub.capturedResult!.getCompletiontoken()).toBe(COMPLETION_TOKEN);
260+
});
261+
262+
it("should track V2 entity execution in _pendingWorkItems", async () => {
263+
// Arrange
264+
const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
265+
const factory: EntityFactory = () => new CounterEntity();
266+
worker.addNamedEntity("counter", factory);
267+
268+
const mockStub = createMockStub();
269+
const req = createEntityRequestV2("counter", "key1");
270+
271+
// Act - call _executeEntityV2 via the wrapper (which tracks the work item)
272+
(worker as any)._executeEntityV2(req, COMPLETION_TOKEN, mockStub.stub);
273+
274+
// Assert - the promise should be tracked while executing
275+
const pendingWorkItems: Set<Promise<void>> = (worker as any)._pendingWorkItems;
276+
expect(pendingWorkItems.size).toBe(1);
277+
278+
// Wait for completion
279+
await Promise.all(pendingWorkItems);
280+
281+
// After completion, it should be removed
282+
expect(pendingWorkItems.size).toBe(0);
283+
});
284+
285+
it("should remove V1 entity execution from _pendingWorkItems even when entity is not found", async () => {
286+
// Arrange
287+
const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
288+
// Do NOT register any entity — the entity lookup will fail
289+
290+
const mockStub = createMockStub();
291+
const req = createEntityBatchRequest("nonexistent", "key1");
292+
293+
// Act
294+
(worker as any)._executeEntity(req, COMPLETION_TOKEN, mockStub.stub);
295+
296+
const pendingWorkItems: Set<Promise<void>> = (worker as any)._pendingWorkItems;
297+
expect(pendingWorkItems.size).toBe(1);
298+
299+
// Wait for completion
300+
await Promise.all(pendingWorkItems);
301+
302+
// Assert - should be cleaned up even on error path
303+
expect(pendingWorkItems.size).toBe(0);
304+
expect(mockStub.capturedResult).not.toBeNull();
305+
});
306+
307+
it("should track multiple concurrent entity executions in _pendingWorkItems", async () => {
308+
// Arrange
309+
const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
310+
const factory: EntityFactory = () => new CounterEntity();
311+
worker.addNamedEntity("counter", factory);
312+
313+
const mockStub1 = createMockStub();
314+
const mockStub2 = createMockStub();
315+
const req1 = createEntityBatchRequest("counter", "key1");
316+
const req2 = createEntityBatchRequest("counter", "key2");
317+
318+
// Act - fire two concurrent entity executions
319+
(worker as any)._executeEntity(req1, "token-1", mockStub1.stub);
320+
(worker as any)._executeEntity(req2, "token-2", mockStub2.stub);
321+
322+
// Assert - both should be tracked
323+
const pendingWorkItems: Set<Promise<void>> = (worker as any)._pendingWorkItems;
324+
expect(pendingWorkItems.size).toBe(2);
325+
326+
// Wait for all to complete
327+
await Promise.all(pendingWorkItems);
328+
329+
// Both should be cleaned up
330+
expect(pendingWorkItems.size).toBe(0);
331+
});
332+
});
147333
});

0 commit comments

Comments
 (0)