Skip to content

Commit fb2defd

Browse files
Fix time skipping for dotnet SDK (#2805)
Fix time skipping for dotnet SDK
1 parent 9516965 commit fb2defd

File tree

5 files changed

+393
-302
lines changed

5 files changed

+393
-302
lines changed

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ void startNexusOperation(
107107

108108
void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref);
109109

110+
void failNexusOperationCancelRequest(NexusOperationRef ref, Failure failure);
111+
110112
void completeNexusOperation(NexusOperationRef ref, Payload result);
111113

112114
void completeAsyncNexusOperation(

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ private interface UpdateProcedure {
105105
private final Map<String, Long> activityById = new HashMap<>();
106106
private final Map<Long, StateMachine<ChildWorkflowData>> childWorkflows = new HashMap<>();
107107
private final Map<Long, StateMachine<NexusOperationData>> nexusOperations = new HashMap<>();
108+
// Tracks cancelRequestedEventId by scheduledEventId, persists after operation removal.
109+
private final Map<Long, Long> nexusCancelRequestedEventIds = new HashMap<>();
108110
private final Map<String, StateMachine<TimerData>> timers = new HashMap<>();
109111
private final Map<String, StateMachine<SignalExternalData>> externalSignals = new HashMap<>();
110112
private final Map<String, StateMachine<CancelExternalData>> externalCancellations =
@@ -899,6 +901,11 @@ private void processRequestCancelNexusOperation(
899901
ctx.setNeedWorkflowTask(true);
900902
} else {
901903
operation.action(Action.REQUEST_CANCELLATION, ctx, null, workflowTaskCompletedId);
904+
ctx.onCommit(
905+
historySize -> {
906+
nexusCancelRequestedEventIds.put(
907+
scheduleEventId, operation.getData().cancelRequestedEventId);
908+
});
902909
ctx.addTimer(
903910
ProtobufTimeUtils.toJavaDuration(operation.getData().requestTimeout),
904911
() ->
@@ -2339,6 +2346,10 @@ public void startNexusOperation(
23392346
update(
23402347
ctx -> {
23412348
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
2349+
if (operation.getState() == State.STARTED) {
2350+
// Operation was already started (e.g. from a previous attempt before retry).
2351+
return;
2352+
}
23422353
operation.action(StateMachines.Action.START, ctx, resp, 0);
23432354
operation.getData().identity = clientIdentity;
23442355

@@ -2378,13 +2389,30 @@ public void cancelNexusOperation(NexusOperationRef ref, Failure failure) {
23782389
});
23792390
}
23802391

2392+
/**
2393+
* Resolves the cancelRequestedEventId for a nexus operation, checking both the active operations
2394+
* map and the persisted cancel request IDs (for operations that have already completed/removed).
2395+
*/
2396+
private long resolveCancelRequestedEventId(long scheduledEventId) {
2397+
StateMachine<NexusOperationData> operation = nexusOperations.get(scheduledEventId);
2398+
if (operation != null) {
2399+
return operation.getData().cancelRequestedEventId;
2400+
}
2401+
Long stored = nexusCancelRequestedEventIds.get(scheduledEventId);
2402+
return stored != null ? stored : 0;
2403+
}
2404+
23812405
@Override
23822406
public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) {
23832407
update(
23842408
ctx -> {
23852409
StateMachine<NexusOperationData> operation =
2386-
getPendingNexusOperation(ref.getScheduledEventId());
2387-
if (!operationInFlight(operation.getState())) {
2410+
nexusOperations.get(ref.getScheduledEventId());
2411+
if (operation != null && !operationInFlight(operation.getState())) {
2412+
return;
2413+
}
2414+
long cancelRequestedEventId = resolveCancelRequestedEventId(ref.getScheduledEventId());
2415+
if (cancelRequestedEventId == 0) {
23882416
return;
23892417
}
23902418
ctx.addEvent(
@@ -2393,12 +2421,35 @@ public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) {
23932421
.setNexusOperationCancelRequestCompletedEventAttributes(
23942422
NexusOperationCancelRequestCompletedEventAttributes.newBuilder()
23952423
.setScheduledEventId(ref.getScheduledEventId())
2396-
.setRequestedEventId(operation.getData().cancelRequestedEventId))
2424+
.setRequestedEventId(cancelRequestedEventId))
23972425
.build());
2426+
scheduleWorkflowTask(ctx);
23982427
ctx.unlockTimer("cancelNexusOperationRequestAcknowledge");
23992428
});
24002429
}
24012430

2431+
@Override
2432+
public void failNexusOperationCancelRequest(NexusOperationRef ref, Failure failure) {
2433+
update(
2434+
ctx -> {
2435+
long cancelRequestedEventId = resolveCancelRequestedEventId(ref.getScheduledEventId());
2436+
if (cancelRequestedEventId == 0) {
2437+
return;
2438+
}
2439+
ctx.addEvent(
2440+
HistoryEvent.newBuilder()
2441+
.setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED)
2442+
.setNexusOperationCancelRequestFailedEventAttributes(
2443+
NexusOperationCancelRequestFailedEventAttributes.newBuilder()
2444+
.setScheduledEventId(ref.getScheduledEventId())
2445+
.setRequestedEventId(cancelRequestedEventId)
2446+
.setFailure(failure))
2447+
.build());
2448+
scheduleWorkflowTask(ctx);
2449+
ctx.unlockTimer("failNexusOperationCancelRequest");
2450+
});
2451+
}
2452+
24022453
@Override
24032454
public void completeNexusOperation(NexusOperationRef ref, Payload result) {
24042455
update(
@@ -2496,7 +2547,8 @@ private void timeoutNexusRequest(long scheduledEventId, String requestMethod, in
24962547
ctx -> {
24972548
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
24982549
if (attempt != operation.getData().getAttempt()
2499-
|| isTerminalState(operation.getState())) {
2550+
|| isTerminalState(operation.getState())
2551+
|| operation.getState() == State.STARTED) {
25002552
throw Status.NOT_FOUND.withDescription("Timer fired earlier").asRuntimeException();
25012553
}
25022554

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1119,7 +1119,11 @@ public void respondNexusTaskFailed(
11191119
NexusTaskToken tt = NexusTaskToken.fromBytes(request.getTaskToken());
11201120
TestWorkflowMutableState mutableState =
11211121
getMutableState(tt.getOperationRef().getExecutionId());
1122-
if (mutableState.validateOperationTaskToken(tt)) {
1122+
if (tt.isCancel()) {
1123+
// For cancel failures, the operation may already be completed/removed,
1124+
// so skip token validation and record the event directly.
1125+
mutableState.failNexusOperationCancelRequest(tt.getOperationRef(), failure);
1126+
} else if (mutableState.validateOperationTaskToken(tt)) {
11231127
mutableState.failNexusOperation(tt.getOperationRef(), failure);
11241128
}
11251129
responseObserver.onNext(RespondNexusTaskFailedResponse.getDefaultInstance());
@@ -1193,6 +1197,7 @@ public void completeNexusOperation(
11931197

11941198
private static Failure handlerErrorToFailure(HandlerError err) {
11951199
return Failure.newBuilder()
1200+
.setMessage(err.getFailure().getMessage())
11961201
.setNexusHandlerFailureInfo(
11971202
NexusHandlerFailureInfo.newBuilder()
11981203
.setType(err.getErrorType())

0 commit comments

Comments
 (0)