Skip to content

Commit 3c8d82e

Browse files
committed
Delay flagged version replay callback to marker match
Run a constrained experiment for the interleaved update replay bug by changing VersionStateMachine replay timing only for histories with SKIP_YIELD_ON_VERSION set. In that path, getVersion still returns synchronously, but the replay callback is no longer fired at fake RECORD_MARKER command creation and is instead delayed until the real MARKER_RECORDED event is matched. The goal of the experiment was to verify that flagged histories do not depend on the current early replay callback or its extra eventLoop scheduling. The legacy interleaved update repro history does not have SKIP_YIELD_ON_VERSION, so it continues to fail unchanged and serves as the control case. Verified with: ./gradlew --offline :temporal-sdk:test --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingRemoveTest" --tests "io.temporal.workflow.versionTests.GetVersionRemovedInReplayTest" --tests "io.temporal.workflow.versionTests.GetVersionWithoutCommandEventTest" --tests "io.temporal.workflow.versionTests.GetVersionAndTimerTest" --tests "io.temporal.workflow.versionTests.GetVersionMultipleCallsTest" --tests "io.temporal.workflow.versionTests.GetVersionInSignalTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingTest" --tests "io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest" --tests "io.temporal.internal.replay.GetVersionInterleavedUpdateReplayTaskHandlerTest" ./gradlew --offline :temporal-sdk:test --tests "io.temporal.workflow.versionTests.GetVersionRemovedInReplayTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingRemoveTest" --tests "io.temporal.workflow.versionTests.GetVersionMultipleCallsTest" --tests "io.temporal.workflow.versionTests.GetVersionMultithreadingTest"
1 parent c436443 commit 3c8d82e

File tree

2 files changed

+29
-3
lines changed

2 files changed

+29
-3
lines changed

temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ final class VersionStateMachine {
2424

2525
private final String changeId;
2626
private final Functions.Func<Boolean> replaying;
27+
private final Functions.Func<Boolean> notifyOnMarkerRecordedReplaying;
2728
private final Functions.Proc1<CancellableCommand> commandSink;
2829
private final Functions.Proc1<StateMachine> stateMachineSink;
2930

@@ -264,6 +265,11 @@ void notifySkippedExecuting() {
264265
}
265266

266267
void notifyMarkerCreatedReplaying() {
268+
if (notifyOnMarkerRecordedReplaying.apply()) {
269+
// Flagged histories already get the version synchronously from getVersion(), so delay the
270+
// replay callback until the real marker event is matched.
271+
return;
272+
}
267273
try {
268274
// it's a replay and the version to return from the getVersion call should be preloaded from
269275
// the history
@@ -295,6 +301,14 @@ void flushPreloadedVersionAndUpdateFromEventReplaying() {
295301
Preconditions.checkState(
296302
preloadedVersion != null, "preloadedVersion is expected to be initialized");
297303
flushPreloadedVersionAndUpdateFromEvent(currentEvent);
304+
if (notifyOnMarkerRecordedReplaying.apply()) {
305+
try {
306+
validateVersionAndThrow(false);
307+
notifyFromVersion(false);
308+
} catch (RuntimeException ex) {
309+
notifyFromException(ex);
310+
}
311+
}
298312
}
299313

300314
void notifySkippedReplaying() {
@@ -366,18 +380,22 @@ void flushPreloadedVersionAndUpdateFromEvent(HistoryEvent event) {
366380
public static VersionStateMachine newInstance(
367381
String id,
368382
Functions.Func<Boolean> replaying,
383+
Functions.Func<Boolean> notifyOnMarkerRecordedReplaying,
369384
Functions.Proc1<CancellableCommand> commandSink,
370385
Functions.Proc1<StateMachine> stateMachineSink) {
371-
return new VersionStateMachine(id, replaying, commandSink, stateMachineSink);
386+
return new VersionStateMachine(
387+
id, replaying, notifyOnMarkerRecordedReplaying, commandSink, stateMachineSink);
372388
}
373389

374390
private VersionStateMachine(
375391
String changeId,
376392
Functions.Func<Boolean> replaying,
393+
Functions.Func<Boolean> notifyOnMarkerRecordedReplaying,
377394
Functions.Proc1<CancellableCommand> commandSink,
378395
Functions.Proc1<StateMachine> stateMachineSink) {
379396
this.changeId = Objects.requireNonNull(changeId);
380397
this.replaying = Objects.requireNonNull(replaying);
398+
this.notifyOnMarkerRecordedReplaying = Objects.requireNonNull(notifyOnMarkerRecordedReplaying);
381399
this.commandSink = Objects.requireNonNull(commandSink);
382400
this.stateMachineSink = stateMachineSink;
383401
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,11 @@ private void preloadVersionMarker(HistoryEvent event) {
660660
changeId,
661661
(idKey) ->
662662
VersionStateMachine.newInstance(
663-
changeId, this::isReplaying, commandSink, stateMachineSink));
663+
changeId,
664+
this::isReplaying,
665+
() -> checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION),
666+
commandSink,
667+
stateMachineSink));
664668
Integer version = versionStateMachine.handleMarkersPreload(event);
665669
if (versionStateMachine.isWriteVersionChangeSA()) {
666670
changeVersions.put(changeId, version);
@@ -1248,7 +1252,11 @@ public Integer getVersion(
12481252
changeId,
12491253
(idKey) ->
12501254
VersionStateMachine.newInstance(
1251-
changeId, this::isReplaying, commandSink, stateMachineSink));
1255+
changeId,
1256+
this::isReplaying,
1257+
() -> checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION),
1258+
commandSink,
1259+
stateMachineSink));
12521260
return stateMachine.getVersion(
12531261
minSupported,
12541262
maxSupported,

0 commit comments

Comments
 (0)