Skip to content

Commit 63980a8

Browse files
Fix time skipping for dotnet SDK
1 parent 6ba0947 commit 63980a8

5 files changed

Lines changed: 390 additions & 305 deletions

File tree

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ void startNexusOperation(
107107

108108
void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref);
109109

110+
void failNexusOperationCancelRequest(NexusOperationRef ref, Failure failure);
111+
110112
void completeNexusOperation(NexusOperationRef ref, Payload result);
111113

112114
void completeAsyncNexusOperation(

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ private interface UpdateProcedure {
105105
private final Map<String, Long> activityById = new HashMap<>();
106106
private final Map<Long, StateMachine<ChildWorkflowData>> childWorkflows = new HashMap<>();
107107
private final Map<Long, StateMachine<NexusOperationData>> nexusOperations = new HashMap<>();
108+
// Maps scheduledEventId to cancelRequestedEventId; entries persist after the operation is removed.
109+
private final Map<Long, Long> nexusCancelRequestedEventIds = new HashMap<>();
110+
// Tracks scheduledEventIds of nexus cancel requests that have not yet received a response.
111+
private final Set<Long> unresolvedNexusCancelRequests = new HashSet<>();
108112
private final Map<String, StateMachine<TimerData>> timers = new HashMap<>();
109113
private final Map<String, StateMachine<SignalExternalData>> externalSignals = new HashMap<>();
110114
private final Map<String, StateMachine<CancelExternalData>> externalCancellations =
@@ -486,10 +490,13 @@ public void completeWorkflowTask(
486490
.asRuntimeException();
487491
}
488492

489-
if (unhandledCommand(request) || unhandledMessages(request)) {
493+
if (unhandledCommand(request)
494+
|| unhandledMessages(request)
495+
|| hasUnresolvedNexusCancelWithCompletion(request)) {
490496
// Fail the workflow task if there are new events or messages and a command tries to
491-
// complete the workflow. Record the failure in history, then throw an error to the
492-
// caller (matching real server behavior).
497+
// complete the workflow, or completes it with unresolved nexus cancel requests. Record the
498+
// failure in history, then throw an error to the caller (matching real server
499+
// behavior).
493500
failWorkflowTaskWithAReason(
494501
WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_UNHANDLED_COMMAND,
495502
null,
@@ -673,6 +680,12 @@ private boolean unhandledMessages(RespondWorkflowTaskCompletedRequest request) {
673680
&& hasCompletionCommand(request.getCommandsList()));
674681
}
675682

683+
private boolean hasUnresolvedNexusCancelWithCompletion(
684+
RespondWorkflowTaskCompletedRequest request) {
685+
return !unresolvedNexusCancelRequests.isEmpty()
686+
&& hasCompletionCommand(request.getCommandsList());
687+
}
688+
676689
private boolean hasCompletionCommand(List<Command> commands) {
677690
for (Command command : commands) {
678691
if (WorkflowExecutionUtils.isWorkflowExecutionCompleteCommand(command)) {
@@ -899,6 +912,12 @@ private void processRequestCancelNexusOperation(
899912
ctx.setNeedWorkflowTask(true);
900913
} else {
901914
operation.action(Action.REQUEST_CANCELLATION, ctx, null, workflowTaskCompletedId);
915+
ctx.onCommit(
916+
historySize -> {
917+
nexusCancelRequestedEventIds.put(
918+
scheduleEventId, operation.getData().cancelRequestedEventId);
919+
unresolvedNexusCancelRequests.add(scheduleEventId);
920+
});
902921
ctx.addTimer(
903922
ProtobufTimeUtils.toJavaDuration(operation.getData().requestTimeout),
904923
() ->
@@ -2339,6 +2358,10 @@ public void startNexusOperation(
23392358
update(
23402359
ctx -> {
23412360
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
2361+
if (operation.getState() == State.STARTED) {
2362+
// Operation was already started (e.g. from a previous attempt before retry).
2363+
return;
2364+
}
23422365
operation.action(StateMachines.Action.START, ctx, resp, 0);
23432366
operation.getData().identity = clientIdentity;
23442367

@@ -2378,13 +2401,30 @@ public void cancelNexusOperation(NexusOperationRef ref, Failure failure) {
23782401
});
23792402
}
23802403

2404+
/**
2405+
* Resolves the cancelRequestedEventId for a nexus operation, checking both the active operations
2406+
* map and the persisted cancel request IDs (for operations already completed and removed).
2407+
*/
2408+
private long resolveCancelRequestedEventId(long scheduledEventId) {
2409+
StateMachine<NexusOperationData> operation = nexusOperations.get(scheduledEventId);
2410+
if (operation != null) {
2411+
return operation.getData().cancelRequestedEventId;
2412+
}
2413+
Long stored = nexusCancelRequestedEventIds.get(scheduledEventId);
2414+
return stored != null ? stored : 0;
2415+
}
2416+
23812417
@Override
23822418
public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) {
23832419
update(
23842420
ctx -> {
23852421
StateMachine<NexusOperationData> operation =
2386-
getPendingNexusOperation(ref.getScheduledEventId());
2387-
if (!operationInFlight(operation.getState())) {
2422+
nexusOperations.get(ref.getScheduledEventId());
2423+
if (operation != null && !operationInFlight(operation.getState())) {
2424+
return;
2425+
}
2426+
long cancelRequestedEventId = resolveCancelRequestedEventId(ref.getScheduledEventId());
2427+
if (cancelRequestedEventId == 0) {
23882428
return;
23892429
}
23902430
ctx.addEvent(
@@ -2393,12 +2433,39 @@ public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) {
23932433
.setNexusOperationCancelRequestCompletedEventAttributes(
23942434
NexusOperationCancelRequestCompletedEventAttributes.newBuilder()
23952435
.setScheduledEventId(ref.getScheduledEventId())
2396-
.setRequestedEventId(operation.getData().cancelRequestedEventId))
2436+
.setRequestedEventId(cancelRequestedEventId))
23972437
.build());
2438+
ctx.onCommit(
2439+
historySize -> unresolvedNexusCancelRequests.remove(ref.getScheduledEventId()));
2440+
scheduleWorkflowTask(ctx);
23982441
ctx.unlockTimer("cancelNexusOperationRequestAcknowledge");
23992442
});
24002443
}
24012444

2445+
@Override
2446+
public void failNexusOperationCancelRequest(NexusOperationRef ref, Failure failure) {
2447+
update(
2448+
ctx -> {
2449+
long cancelRequestedEventId = resolveCancelRequestedEventId(ref.getScheduledEventId());
2450+
if (cancelRequestedEventId == 0) {
2451+
return;
2452+
}
2453+
ctx.addEvent(
2454+
HistoryEvent.newBuilder()
2455+
.setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED)
2456+
.setNexusOperationCancelRequestFailedEventAttributes(
2457+
NexusOperationCancelRequestFailedEventAttributes.newBuilder()
2458+
.setScheduledEventId(ref.getScheduledEventId())
2459+
.setRequestedEventId(cancelRequestedEventId)
2460+
.setFailure(failure))
2461+
.build());
2462+
ctx.onCommit(
2463+
historySize -> unresolvedNexusCancelRequests.remove(ref.getScheduledEventId()));
2464+
scheduleWorkflowTask(ctx);
2465+
ctx.unlockTimer("failNexusOperationCancelRequest");
2466+
});
2467+
}
2468+
24022469
@Override
24032470
public void completeNexusOperation(NexusOperationRef ref, Payload result) {
24042471
update(
@@ -2471,6 +2538,8 @@ private void timeoutNexusOperation(
24712538
}
24722539
operation.action(StateMachines.Action.TIME_OUT, ctx, timeoutType, 0);
24732540
nexusOperations.remove(scheduledEventId);
2541+
// The cancel response won't matter after a timeout, so clear the unresolved cancel.
2542+
unresolvedNexusCancelRequests.remove(scheduledEventId);
24742543
scheduleWorkflowTask(ctx);
24752544
});
24762545
} catch (StatusRuntimeException e) {
@@ -2496,7 +2565,8 @@ private void timeoutNexusRequest(long scheduledEventId, String requestMethod, in
24962565
ctx -> {
24972566
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
24982567
if (attempt != operation.getData().getAttempt()
2499-
|| isTerminalState(operation.getState())) {
2568+
|| isTerminalState(operation.getState())
2569+
|| operation.getState() == State.STARTED) {
25002570
throw Status.NOT_FOUND.withDescription("Timer fired earlier").asRuntimeException();
25012571
}
25022572

@@ -2510,6 +2580,8 @@ private void timeoutNexusRequest(long scheduledEventId, String requestMethod, in
25102580

25112581
if (isTerminalState(operation.getState())) {
25122582
nexusOperations.remove(scheduledEventId);
2583+
// Cancel response won't arrive after terminal state, so unblock workflow completion.
2584+
unresolvedNexusCancelRequests.remove(scheduledEventId);
25132585
scheduleWorkflowTask(ctx);
25142586
} else {
25152587
retryNexusTask(ctx, operation);

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1119,7 +1119,12 @@ public void respondNexusTaskFailed(
11191119
NexusTaskToken tt = NexusTaskToken.fromBytes(request.getTaskToken());
11201120
TestWorkflowMutableState mutableState =
11211121
getMutableState(tt.getOperationRef().getExecutionId());
1122-
if (mutableState.validateOperationTaskToken(tt)) {
1122+
Failure failure = handlerErrorToFailure(request.getError());
1123+
if (tt.isCancel()) {
1124+
// For cancel failures, the operation may already be completed/removed,
1125+
// so skip token validation and record the event directly.
1126+
mutableState.failNexusOperationCancelRequest(tt.getOperationRef(), failure);
1127+
} else if (mutableState.validateOperationTaskToken(tt)) {
11231128
mutableState.failNexusOperation(tt.getOperationRef(), failure);
11241129
}
11251130
responseObserver.onNext(RespondNexusTaskFailedResponse.getDefaultInstance());

0 commit comments

Comments
 (0)