Skip to content

Commit 7a36960

Browse files
committed
Delay flagged version replay callback to marker match
Run a constrained experiment for the interleaved update replay bug by changing VersionStateMachine replay timing only for histories with SKIP_YIELD_ON_VERSION set. In that path, getVersion still returns synchronously, but the replay callback is no longer fired at fake RECORD_MARKER command creation and is instead delayed until the real MARKER_RECORDED event is matched. The goal of the experiment was to verify that flagged histories do not depend on the current early replay callback or its extra eventLoop scheduling. The legacy interleaved update repro history does not have SKIP_YIELD_ON_VERSION, so it continues to fail unchanged and serves as the control case. Verified with: ./gradlew --offline :temporal-sdk:test --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingRemoveTest" --tests "io.temporal.workflow.versionTests.GetVersionRemovedInReplayTest" --tests "io.temporal.workflow.versionTests.GetVersionWithoutCommandEventTest" --tests "io.temporal.workflow.versionTests.GetVersionAndTimerTest" --tests "io.temporal.workflow.versionTests.GetVersionMultipleCallsTest" --tests "io.temporal.workflow.versionTests.GetVersionInSignalTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingTest" --tests "io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest" --tests "io.temporal.internal.replay.GetVersionInterleavedUpdateReplayTaskHandlerTest" ./gradlew --offline :temporal-sdk:test --tests "io.temporal.workflow.versionTests.GetVersionRemovedInReplayTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingRemoveTest" --tests "io.temporal.workflow.versionTests.GetVersionMultipleCallsTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingTest"
1 parent b9e5376 commit 7a36960

File tree

2 files changed

+29
-3
lines changed

2 files changed

+29
-3
lines changed

temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ final class VersionStateMachine {
2424

2525
private final String changeId;
2626
private final Functions.Func<Boolean> replaying;
27+
private final Functions.Func<Boolean> notifyOnMarkerRecordedReplaying;
2728
private final Functions.Proc1<CancellableCommand> commandSink;
2829
private final Functions.Proc1<StateMachine> stateMachineSink;
2930

@@ -264,6 +265,11 @@ void notifySkippedExecuting() {
264265
}
265266

266267
void notifyMarkerCreatedReplaying() {
268+
if (notifyOnMarkerRecordedReplaying.apply()) {
269+
// Flagged histories already get the version synchronously from getVersion(), so delay the
270+
// replay callback until the real marker event is matched.
271+
return;
272+
}
267273
try {
268274
// it's a replay and the version to return from the getVersion call should be preloaded from
269275
// the history
@@ -295,6 +301,14 @@ void flushPreloadedVersionAndUpdateFromEventReplaying() {
295301
Preconditions.checkState(
296302
preloadedVersion != null, "preloadedVersion is expected to be initialized");
297303
flushPreloadedVersionAndUpdateFromEvent(currentEvent);
304+
if (notifyOnMarkerRecordedReplaying.apply()) {
305+
try {
306+
validateVersionAndThrow(false);
307+
notifyFromVersion(false);
308+
} catch (RuntimeException ex) {
309+
notifyFromException(ex);
310+
}
311+
}
298312
}
299313

300314
void notifySkippedReplaying() {
@@ -366,18 +380,22 @@ void flushPreloadedVersionAndUpdateFromEvent(HistoryEvent event) {
366380
public static VersionStateMachine newInstance(
367381
String id,
368382
Functions.Func<Boolean> replaying,
383+
Functions.Func<Boolean> notifyOnMarkerRecordedReplaying,
369384
Functions.Proc1<CancellableCommand> commandSink,
370385
Functions.Proc1<StateMachine> stateMachineSink) {
371-
return new VersionStateMachine(id, replaying, commandSink, stateMachineSink);
386+
return new VersionStateMachine(
387+
id, replaying, notifyOnMarkerRecordedReplaying, commandSink, stateMachineSink);
372388
}
373389

374390
private VersionStateMachine(
375391
String changeId,
376392
Functions.Func<Boolean> replaying,
393+
Functions.Func<Boolean> notifyOnMarkerRecordedReplaying,
377394
Functions.Proc1<CancellableCommand> commandSink,
378395
Functions.Proc1<StateMachine> stateMachineSink) {
379396
this.changeId = Objects.requireNonNull(changeId);
380397
this.replaying = Objects.requireNonNull(replaying);
398+
this.notifyOnMarkerRecordedReplaying = Objects.requireNonNull(notifyOnMarkerRecordedReplaying);
381399
this.commandSink = Objects.requireNonNull(commandSink);
382400
this.stateMachineSink = stateMachineSink;
383401
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,11 @@ private void preloadVersionMarker(HistoryEvent event) {
647647
changeId,
648648
(idKey) ->
649649
VersionStateMachine.newInstance(
650-
changeId, this::isReplaying, commandSink, stateMachineSink));
650+
changeId,
651+
this::isReplaying,
652+
() -> checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION),
653+
commandSink,
654+
stateMachineSink));
651655
Integer version = versionStateMachine.handleMarkersPreload(event);
652656
if (versionStateMachine.isWriteVersionChangeSA()) {
653657
changeVersions.put(changeId, version);
@@ -1235,7 +1239,11 @@ public Integer getVersion(
12351239
changeId,
12361240
(idKey) ->
12371241
VersionStateMachine.newInstance(
1238-
changeId, this::isReplaying, commandSink, stateMachineSink));
1242+
changeId,
1243+
this::isReplaying,
1244+
() -> checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION),
1245+
commandSink,
1246+
stateMachineSink));
12391247
return stateMachine.getVersion(
12401248
minSupported,
12411249
maxSupported,

0 commit comments

Comments
 (0)