Skip to content

Commit a7a1d91

Browse files
committed
Refactor entity locking and orchestration execution logic; improve test coverage for entity operations
1 parent 9c07ea8 commit a7a1d91

7 files changed

Lines changed: 200 additions & 124 deletions

File tree

packages/durabletask-js/src/client/client.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,7 +1031,7 @@ export class TaskHubGrpcClient {
10311031
getEntities<T = unknown>(query?: EntityQuery): AsyncPageable<EntityMetadata<T>> {
10321032
const includeState = query?.includeState ?? true;
10331033

1034-
return AsyncPageable.create(async (continuationToken?: string): Promise<Page<EntityMetadata<T>>> => {
1034+
return createAsyncPageable(async (continuationToken?: string): Promise<Page<EntityMetadata<T>>> => {
10351035
const req = new pb.QueryEntitiesRequest();
10361036
const protoQuery = new pb.EntityQuery();
10371037

@@ -1081,10 +1081,7 @@ export class TaskHubGrpcClient {
10811081
const entities = res.getEntitiesList();
10821082
const values = entities.map((protoMetadata) => this.convertEntityMetadata<T>(protoMetadata, includeState));
10831083

1084-
return {
1085-
values,
1086-
continuationToken: res.getContinuationtoken()?.getValue(),
1087-
};
1084+
return new Page(values, res.getContinuationtoken()?.getValue());
10881085
});
10891086
}
10901087

@@ -1152,7 +1149,7 @@ export class TaskHubGrpcClient {
11521149
return createEntityMetadataWithoutState(entityId, lastModifiedTime, backlogQueueSize, lockedBy) as EntityMetadata<T>;
11531150
}
11541151
}
1155-
1152+
11561153
/**
11571154
* Helper method to create an OrchestrationState from a protobuf OrchestrationState.
11581155
*/

packages/durabletask-js/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ export {
105105
// Entity types - Worker-side operation types (Step 3)
106106
export { SignalEntityOptions, CallEntityOptions } from "./entities/signal-entity-options";
107107
export { TaskEntityState } from "./entities/task-entity-state";
108-
export { TaskEntityContext, StartOrchestrationOptions } from "./entities/task-entity-context";
108+
export { TaskEntityContext } from "./entities/task-entity-context";
109109
export { TaskEntityOperation } from "./entities/task-entity-operation";
110110

111111
// Entity interface and base class (Step 4)

packages/durabletask-js/src/worker/orchestration-executor.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,26 @@ export class OrchestrationExecutor {
583583
}
584584
}
585585
break;
586+
// This history event confirms that the lock request was successfully scheduled.
587+
// Remove the action from the pending action list so we don't schedule it again.
588+
// The pending lock request in _entityFeature.pendingLockRequests remains to receive the granted event.
589+
case pb.HistoryEvent.EventtypeCase.ENTITYLOCKREQUESTED:
590+
{
591+
const eventId = event.getEventid();
592+
const action = ctx._pendingActions[eventId];
593+
delete ctx._pendingActions[eventId];
594+
595+
const isSendEntityMessageAction = action?.hasSendentitymessage();
596+
597+
if (!action) {
598+
throw getNonDeterminismError(eventId, "lockEntities");
599+
} else if (!isSendEntityMessageAction) {
600+
throw getWrongActionTypeError(eventId, "lockEntities", action);
601+
} else if (!action.getSendentitymessage()?.hasEntitylockrequested()) {
602+
throw getWrongActionTypeError(eventId, "lockEntities (EntityLockRequested)", action);
603+
}
604+
}
605+
break;
586606
case pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONCOMPLETED:
587607
{
588608
const completedEvent = event.getEntityoperationcompleted();

packages/durabletask-js/src/worker/runtime-orchestration-context.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
3737
_newGuidCounter: number;
3838
_currentUtcDatetime: Date;
3939
_instanceId: string;
40-
_executionId: string;
40+
_executionId: string = "";
4141
_version: string;
4242
_parent?: ParentOrchestrationInstance;
4343
_completionStatus?: pb.OrchestrationStatus;
@@ -69,7 +69,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
6969
this._newInput = undefined;
7070
this._saveEvents = false;
7171
this._customStatus = undefined;
72-
this._entityFeature = new RuntimeOrchestrationEntityFeature();
72+
this._entityFeature = new RuntimeOrchestrationEntityFeature(this);
7373
}
7474

7575
get instanceId(): string {

0 commit comments

Comments
 (0)