Skip to content

Commit 43ab9df

Browse files
committed
Gate VersionStateMachine behavior correction behind new sdk flag
`VERSION_WAIT_FOR_MARKER`
1 parent c7b2d39 commit 43ab9df

File tree

3 files changed

+36
-8
lines changed

3 files changed

+36
-8
lines changed

temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ public enum SdkFlag {
2323
* condition is resolved before the timeout.
2424
*/
2525
CANCEL_AWAIT_TIMER_ON_CONDITION(4),
26+
/*
27+
* Changes replay behavior of GetVersion to wait for the matching marker event before executing
28+
* the callback.
29+
*/
30+
VERSION_WAIT_FOR_MARKER(5),
2631
UNKNOWN(Integer.MAX_VALUE);
2732

2833
private final int value;

temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,17 +133,20 @@ class InvocationStateMachine
133133

134134
private final int minSupported;
135135
private final int maxSupported;
136+
private final boolean waitForMarkerRecordedReplaying;
136137
private final Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback;
137138
private final Functions.Proc2<Integer, RuntimeException> resultCallback;
138139

139140
InvocationStateMachine(
140141
int minSupported,
141142
int maxSupported,
143+
boolean waitForMarkerRecordedReplaying,
142144
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
143145
Functions.Proc2<Integer, RuntimeException> callback) {
144146
super(STATE_MACHINE_DEFINITION, VersionStateMachine.this.commandSink, stateMachineSink);
145147
this.minSupported = minSupported;
146148
this.maxSupported = maxSupported;
149+
this.waitForMarkerRecordedReplaying = waitForMarkerRecordedReplaying;
147150
this.upsertSearchAttributeCallback = upsertSearchAttributeCallback;
148151
this.resultCallback = Objects.requireNonNull(callback);
149152
}
@@ -264,8 +267,20 @@ void notifySkippedExecuting() {
264267
}
265268

266269
void notifyMarkerCreatedReplaying() {
267-
// Replay already preloads the version value, so delay the callback until the real marker
268-
// event is matched.
270+
if (waitForMarkerRecordedReplaying) {
271+
// Replay already preloads the version value, so delay the callback until the real marker
272+
// event is matched.
273+
return;
274+
}
275+
try {
276+
// It's a replay and the version to return from the getVersion call should be preloaded
277+
// from the history.
278+
final boolean usePreloadedVersion = true;
279+
validateVersionAndThrow(usePreloadedVersion);
280+
notifyFromVersion(usePreloadedVersion);
281+
} catch (RuntimeException ex) {
282+
notifyFromException(ex);
283+
}
269284
}
270285

271286
State createMarkerReplaying() {
@@ -288,11 +303,13 @@ void flushPreloadedVersionAndUpdateFromEventReplaying() {
288303
Preconditions.checkState(
289304
preloadedVersion != null, "preloadedVersion is expected to be initialized");
290305
flushPreloadedVersionAndUpdateFromEvent(currentEvent);
291-
try {
292-
validateVersionAndThrow(false);
293-
notifyFromVersion(false);
294-
} catch (RuntimeException ex) {
295-
notifyFromException(ex);
306+
if (waitForMarkerRecordedReplaying) {
307+
try {
308+
validateVersionAndThrow(false);
309+
notifyFromVersion(false);
310+
} catch (RuntimeException ex) {
311+
notifyFromException(ex);
312+
}
296313
}
297314
}
298315

@@ -392,11 +409,16 @@ private VersionStateMachine(
392409
public Integer getVersion(
393410
int minSupported,
394411
int maxSupported,
412+
boolean waitForMarkerRecordedReplaying,
395413
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
396414
Functions.Proc2<Integer, RuntimeException> callback) {
397415
InvocationStateMachine ism =
398416
new InvocationStateMachine(
399-
minSupported, maxSupported, upsertSearchAttributeCallback, callback);
417+
minSupported,
418+
maxSupported,
419+
waitForMarkerRecordedReplaying,
420+
upsertSearchAttributeCallback,
421+
callback);
400422
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
401423
ism.explicitEvent(ExplicitEvent.SCHEDULE);
402424
// If the state is SKIPPED_REPLAYING that means we:

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,6 +1252,7 @@ public Integer getVersion(
12521252
return stateMachine.getVersion(
12531253
minSupported,
12541254
maxSupported,
1255+
checkSdkFlag(SdkFlag.VERSION_WAIT_FOR_MARKER),
12551256
(version) -> {
12561257
if (!workflowImplOptions.isEnableUpsertVersionSearchAttributes()) {
12571258
return null;

0 commit comments

Comments
 (0)