Skip to content

Commit 0164b45

Browse files
committed
Delay version replay callback until marker match
Change VersionStateMachine replay semantics so getVersion no longer resumes workflow code when the fake RECORD_MARKER command is created. Replay now waits until the real MARKER_RECORDED event is matched before firing the version callback, which makes version-marker ordering consistent with replayed side effects. This fixes the interleaved update replay bug reproduced by testGetVersionInterleavedUpdateReplayHistory.json. That history previously failed replay with [TMPRL1100] because the second getVersion callback ran ahead of update completion protocol handling. After this change, the same recorded history replays successfully through both WorkflowReplayer and the lower-level direct-query replay task handler. The earlier flag-gated experiment showed that delaying the callback was already safe for histories with SKIP_YIELD_ON_VERSION. This commit removes that temporary gating and applies the same replay ordering to all histories. Verified with: ./gradlew --offline :temporal-sdk:test --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingRemoveTest" --tests "io.temporal.workflow.versionTests.GetVersionRemovedInReplayTest" --tests "io.temporal.workflow.versionTests.GetVersionWithoutCommandEventTest" --tests "io.temporal.workflow.versionTests.GetVersionAndTimerTest" --tests "io.temporal.workflow.versionTests.GetVersionMultipleCallsTest" --tests "io.temporal.workflow.versionTests.GetVersionInSignalTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingTest" --tests "io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest" --tests "io.temporal.internal.replay.GetVersionInterleavedUpdateReplayTaskHandlerTest" ./gradlew --offline :temporal-sdk:test --tests "io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest" --tests "io.temporal.internal.replay.GetVersionInterleavedUpdateReplayTaskHandlerTest"
1 parent 3c8d82e commit 0164b45

File tree

4 files changed

+21
-112
lines changed

4 files changed

+21
-112
lines changed

temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ final class VersionStateMachine {
2424

2525
private final String changeId;
2626
private final Functions.Func<Boolean> replaying;
27-
private final Functions.Func<Boolean> notifyOnMarkerRecordedReplaying;
2827
private final Functions.Proc1<CancellableCommand> commandSink;
2928
private final Functions.Proc1<StateMachine> stateMachineSink;
3029

@@ -265,20 +264,8 @@ void notifySkippedExecuting() {
265264
}
266265

267266
void notifyMarkerCreatedReplaying() {
268-
if (notifyOnMarkerRecordedReplaying.apply()) {
269-
// Flagged histories already get the version synchronously from getVersion(), so delay the
270-
// replay callback until the real marker event is matched.
271-
return;
272-
}
273-
try {
274-
// it's a replay and the version to return from the getVersion call should be preloaded from
275-
// the history
276-
final boolean usePreloadedVersion = true;
277-
validateVersionAndThrow(usePreloadedVersion);
278-
notifyFromVersion(usePreloadedVersion);
279-
} catch (RuntimeException ex) {
280-
notifyFromException(ex);
281-
}
267+
// Replay already preloads the version value, so delay the callback until the real marker
268+
// event is matched.
282269
}
283270

284271
State createMarkerReplaying() {
@@ -301,13 +288,11 @@ void flushPreloadedVersionAndUpdateFromEventReplaying() {
301288
Preconditions.checkState(
302289
preloadedVersion != null, "preloadedVersion is expected to be initialized");
303290
flushPreloadedVersionAndUpdateFromEvent(currentEvent);
304-
if (notifyOnMarkerRecordedReplaying.apply()) {
305-
try {
306-
validateVersionAndThrow(false);
307-
notifyFromVersion(false);
308-
} catch (RuntimeException ex) {
309-
notifyFromException(ex);
310-
}
291+
try {
292+
validateVersionAndThrow(false);
293+
notifyFromVersion(false);
294+
} catch (RuntimeException ex) {
295+
notifyFromException(ex);
311296
}
312297
}
313298

@@ -380,22 +365,18 @@ void flushPreloadedVersionAndUpdateFromEvent(HistoryEvent event) {
380365
public static VersionStateMachine newInstance(
381366
String id,
382367
Functions.Func<Boolean> replaying,
383-
Functions.Func<Boolean> notifyOnMarkerRecordedReplaying,
384368
Functions.Proc1<CancellableCommand> commandSink,
385369
Functions.Proc1<StateMachine> stateMachineSink) {
386-
return new VersionStateMachine(
387-
id, replaying, notifyOnMarkerRecordedReplaying, commandSink, stateMachineSink);
370+
return new VersionStateMachine(id, replaying, commandSink, stateMachineSink);
388371
}
389372

390373
private VersionStateMachine(
391374
String changeId,
392375
Functions.Func<Boolean> replaying,
393-
Functions.Func<Boolean> notifyOnMarkerRecordedReplaying,
394376
Functions.Proc1<CancellableCommand> commandSink,
395377
Functions.Proc1<StateMachine> stateMachineSink) {
396378
this.changeId = Objects.requireNonNull(changeId);
397379
this.replaying = Objects.requireNonNull(replaying);
398-
this.notifyOnMarkerRecordedReplaying = Objects.requireNonNull(notifyOnMarkerRecordedReplaying);
399380
this.commandSink = Objects.requireNonNull(commandSink);
400381
this.stateMachineSink = stateMachineSink;
401382
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -660,11 +660,7 @@ private void preloadVersionMarker(HistoryEvent event) {
660660
changeId,
661661
(idKey) ->
662662
VersionStateMachine.newInstance(
663-
changeId,
664-
this::isReplaying,
665-
() -> checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION),
666-
commandSink,
667-
stateMachineSink));
663+
changeId, this::isReplaying, commandSink, stateMachineSink));
668664
Integer version = versionStateMachine.handleMarkersPreload(event);
669665
if (versionStateMachine.isWriteVersionChangeSA()) {
670666
changeVersions.put(changeId, version);
@@ -1252,11 +1248,7 @@ public Integer getVersion(
12521248
changeId,
12531249
(idKey) ->
12541250
VersionStateMachine.newInstance(
1255-
changeId,
1256-
this::isReplaying,
1257-
() -> checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION),
1258-
commandSink,
1259-
stateMachineSink));
1251+
changeId, this::isReplaying, commandSink, stateMachineSink));
12601252
return stateMachine.getVersion(
12611253
minSupported,
12621254
maxSupported,

temporal-sdk/src/test/java/io/temporal/internal/replay/GetVersionInterleavedUpdateReplayTaskHandlerTest.java

Lines changed: 8 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
package io.temporal.internal.replay;
22

33
import static org.junit.Assert.assertEquals;
4-
import static org.junit.Assert.assertNotEquals;
5-
import static org.junit.Assert.assertThrows;
6-
import static org.junit.Assert.assertTrue;
4+
import static org.junit.Assert.assertFalse;
5+
import static org.junit.Assert.assertNotNull;
76

87
import com.uber.m3.tally.NoopScope;
9-
import io.temporal.api.command.v1.Command;
10-
import io.temporal.api.enums.v1.CommandType;
118
import io.temporal.api.history.v1.HistoryEvent;
129
import io.temporal.api.query.v1.WorkflowQuery;
1310
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
@@ -30,16 +27,12 @@
3027
public class GetVersionInterleavedUpdateReplayTaskHandlerTest {
3128
private static final String HISTORY_RESOURCE =
3229
"testGetVersionInterleavedUpdateReplayHistory.json";
33-
private static final String EXPECTED_NON_DETERMINISTIC_MESSAGE =
34-
"[TMPRL1100] getVersion call before the existing version marker event. The most probable cause is retroactive addition of a getVersion call with an existing 'changeId'";
35-
private static final String EXPECTED_NON_DETERMINISTIC_FRAGMENT =
36-
"io.temporal.worker.NonDeterministicException: " + EXPECTED_NON_DETERMINISTIC_MESSAGE;
3730
private static final String EXPECTED_FIRST_CHANGE_ID = "ChangeId1";
3831
private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2";
3932
private static final String TEST_TASK_QUEUE = "get-version-interleaved-update-replay";
4033

4134
@Test
42-
public void testReplayQueuesSecondVersionMarkerBeforeUpdateCompletionCommands() throws Exception {
35+
public void testReplayDirectQueryWorkflowTaskSucceeds() throws Throwable {
4336
WorkflowExecutionHistory history =
4437
WorkflowHistoryLoader.readHistoryFromResource(HISTORY_RESOURCE);
4538
assertEquals(
@@ -62,27 +55,11 @@ public void testReplayQueuesSecondVersionMarkerBeforeUpdateCompletionCommands()
6255
ServiceWorkflowHistoryIterator historyIterator =
6356
new ServiceWorkflowHistoryIterator(service, namespace, replayTask, new NoopScope());
6457

65-
ReplayWorkflowRunTaskHandler replayHandler = runTaskHandler;
66-
RuntimeException thrown =
67-
assertThrows(
68-
RuntimeException.class,
69-
() -> replayHandler.handleDirectQueryWorkflowTask(replayTask, historyIterator));
70-
assertTrue(
71-
"Expected replay failure to contain the nondeterminism marker, but got: " + thrown,
72-
throwableChainContains(thrown, EXPECTED_NON_DETERMINISTIC_FRAGMENT)
73-
|| throwableChainContains(thrown, EXPECTED_NON_DETERMINISTIC_MESSAGE));
74-
75-
List<Command> pendingCommands = runTaskHandler.getWorkflowStateMachines().takeCommands();
76-
int versionMarkerIndex = indexOfVersionMarker(pendingCommands);
77-
int protocolMessageIndex =
78-
indexOfCommandType(pendingCommands, CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE);
79-
80-
assertNotEquals(
81-
"Expected a pending Version marker command after replay failure", -1, versionMarkerIndex);
82-
assertTrue(
83-
"Expected the pending Version marker to be queued before any update completion protocol command: "
84-
+ pendingCommands,
85-
protocolMessageIndex == -1 || versionMarkerIndex < protocolMessageIndex);
58+
QueryResult result =
59+
runTaskHandler.handleDirectQueryWorkflowTask(replayTask, historyIterator);
60+
assertNotNull(result);
61+
assertFalse(result.isWorkflowMethodCompleted());
62+
assertFalse(result.getResponsePayloads().isPresent());
8663
} finally {
8764
if (runTaskHandler != null) {
8865
runTaskHandler.close();
@@ -139,35 +116,6 @@ private static List<String> extractVersionChangeIds(List<HistoryEvent> events) {
139116
return changeIds;
140117
}
141118

142-
private static int indexOfVersionMarker(List<Command> commands) {
143-
for (int i = 0; i < commands.size(); i++) {
144-
if (VersionMarkerUtils.hasVersionMarkerStructure(commands.get(i))) {
145-
return i;
146-
}
147-
}
148-
return -1;
149-
}
150-
151-
private static int indexOfCommandType(List<Command> commands, CommandType commandType) {
152-
for (int i = 0; i < commands.size(); i++) {
153-
if (commands.get(i).getCommandType() == commandType) {
154-
return i;
155-
}
156-
}
157-
return -1;
158-
}
159-
160-
private static boolean throwableChainContains(Throwable throwable, String expected) {
161-
Throwable current = throwable;
162-
while (current != null) {
163-
if (String.valueOf(current).contains(expected)) {
164-
return true;
165-
}
166-
current = current.getCause();
167-
}
168-
return false;
169-
}
170-
171119
private static <T> T getField(Object target, String fieldName, Class<T> expectedType)
172120
throws Exception {
173121
Field field = target.getClass().getDeclaredField(fieldName);

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInterleavedUpdateReplayTest.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.temporal.workflow.versionTests;
22

3-
import static org.junit.Assert.assertThrows;
4-
import static org.junit.Assert.assertTrue;
5-
63
import io.temporal.activity.ActivityInterface;
74
import io.temporal.activity.ActivityMethod;
85
import io.temporal.activity.ActivityOptions;
@@ -26,20 +23,11 @@
2623
public class GetVersionInterleavedUpdateReplayTest {
2724
private static final String HISTORY_RESOURCE =
2825
"testGetVersionInterleavedUpdateReplayHistory.json";
29-
private static final String EXPECTED_NON_DETERMINISTIC_MESSAGE =
30-
"[TMPRL1100] getVersion call before the existing version marker event. The most probable cause is retroactive addition of a getVersion call with an existing 'changeId'";
31-
private static final String EXPECTED_NON_DETERMINISTIC_FRAGMENT =
32-
"io.temporal.worker.NonDeterministicException: " + EXPECTED_NON_DETERMINISTIC_MESSAGE;
3326

3427
@Test
35-
public void testReplayHistory() {
36-
RuntimeException thrown =
37-
assertThrows(
38-
RuntimeException.class,
39-
() ->
40-
WorkflowReplayer.replayWorkflowExecutionFromResource(
41-
HISTORY_RESOURCE, GreetingWorkflowImpl.class));
42-
assertTrue(thrown.getMessage().contains(EXPECTED_NON_DETERMINISTIC_FRAGMENT));
28+
public void testReplayHistory() throws Exception {
29+
WorkflowReplayer.replayWorkflowExecutionFromResource(
30+
HISTORY_RESOURCE, GreetingWorkflowImpl.class);
4331
}
4432

4533
public static class Request {

0 commit comments

Comments
 (0)