66import com .google .common .annotations .VisibleForTesting ;
77import com .google .common .base .Preconditions ;
88import com .google .common .base .Strings ;
9+ import io .temporal .api .command .v1 .Command ;
910import io .temporal .api .command .v1 .RecordMarkerCommandAttributes ;
11+ import io .temporal .api .common .v1 .SearchAttributes ;
1012import io .temporal .api .enums .v1 .CommandType ;
1113import io .temporal .api .enums .v1 .EventType ;
1214import io .temporal .api .history .v1 .HistoryEvent ;
@@ -27,6 +29,16 @@ final class VersionStateMachine {
2729
2830 @ Nullable private Integer version ;
2931
32+ /** This flag is used to determine if the search attribute for the version change was written. */
33+ @ Nullable private Boolean writeVersionChangeSA = false ;
34+
35+ /**
36+ * This flag is used to determine if the search attribute for the version change has been written
37+ * by this state machine. This is used to prevent writing the search attribute multiple times if
38+ * getVersion is called repeatedly.
39+ */
40+ private boolean hasWrittenVersionChangeSA = false ;
41+
3042 /**
3143 * This variable is used for replay only. When we replay, we look one workflow task ahead and
3244 * preload all version markers to be able to return from Workflow.getVersion called in the event
@@ -121,14 +133,18 @@ class InvocationStateMachine
121133
122134 private final int minSupported ;
123135 private final int maxSupported ;
124-
136+ private final Functions . Func1 < Integer , SearchAttributes > upsertSearchAttributeCallback ;
125137 private final Functions .Proc2 <Integer , RuntimeException > resultCallback ;
126138
127139 InvocationStateMachine (
128- int minSupported , int maxSupported , Functions .Proc2 <Integer , RuntimeException > callback ) {
140+ int minSupported ,
141+ int maxSupported ,
142+ Functions .Func1 <Integer , SearchAttributes > upsertSearchAttributeCallback ,
143+ Functions .Proc2 <Integer , RuntimeException > callback ) {
129144 super (STATE_MACHINE_DEFINITION , VersionStateMachine .this .commandSink , stateMachineSink );
130145 this .minSupported = minSupported ;
131146 this .maxSupported = maxSupported ;
147+ this .upsertSearchAttributeCallback = upsertSearchAttributeCallback ;
132148 this .resultCallback = Objects .requireNonNull (callback );
133149 }
134150
@@ -220,9 +236,16 @@ State createMarkerExecuting() {
220236 return State .SKIPPED ;
221237 } else {
222238 version = maxSupported ;
239+ SearchAttributes sa = upsertSearchAttributeCallback .apply (version );
240+ writeVersionChangeSA = sa != null ;
223241 RecordMarkerCommandAttributes markerAttributes =
224- VersionMarkerUtils .createMarkerAttributes (changeId , version );
225- addCommand (StateMachineCommandUtils .createRecordMarker (markerAttributes ));
242+ VersionMarkerUtils .createMarkerAttributes (changeId , version , writeVersionChangeSA );
243+ Command markerCommand = StateMachineCommandUtils .createRecordMarker (markerAttributes );
244+ addCommand (markerCommand );
245+ if (writeVersionChangeSA ) {
246+ hasWrittenVersionChangeSA = true ;
247+ UpsertSearchAttributesStateMachine .newInstance (sa , commandSink , stateMachineSink );
248+ }
226249 return State .MARKER_COMMAND_CREATED ;
227250 }
228251 }
@@ -255,6 +278,13 @@ void notifyMarkerCreatedReplaying() {
255278 State createMarkerReplaying () {
256279 createFakeCommand ();
257280 if (preloadedVersion != null ) {
281+ if (writeVersionChangeSA && !hasWrittenVersionChangeSA ) {
282+ hasWrittenVersionChangeSA = true ;
283+ if (writeVersionChangeSA ) {
284+ UpsertSearchAttributesStateMachine .newInstance (
285+ SearchAttributes .newBuilder ().build (), commandSink , stateMachineSink );
286+ }
287+ }
258288 return State .MARKER_COMMAND_CREATED_REPLAYING ;
259289 } else {
260290 return State .SKIPPED_REPLAYING ;
@@ -311,7 +341,7 @@ private void updateVersionFromEvent(HistoryEvent event) {
311341 version = getVersionFromEvent (event );
312342 }
313343
314- private void preloadVersionFromEvent (HistoryEvent event ) {
344+ private Integer preloadVersionFromEvent (HistoryEvent event ) {
315345 if (version != null ) {
316346 throw new NonDeterministicException (
317347 "Version is already set to " + version + ". " + RETROACTIVE_ADDITION_ERROR_STRING );
@@ -324,6 +354,7 @@ private void preloadVersionFromEvent(HistoryEvent event) {
324354 preloadedVersion );
325355
326356 preloadedVersion = getVersionFromEvent (event );
357+ return preloadedVersion ;
327358 }
328359
329360 void flushPreloadedVersionAndUpdateFromEvent (HistoryEvent event ) {
@@ -360,8 +391,13 @@ private VersionStateMachine(
360391 * @return True if the identifier is not present in history
361392 */
362393 public Integer getVersion (
363- int minSupported , int maxSupported , Functions .Proc2 <Integer , RuntimeException > callback ) {
364- InvocationStateMachine ism = new InvocationStateMachine (minSupported , maxSupported , callback );
394+ int minSupported ,
395+ int maxSupported ,
396+ Functions .Func1 <Integer , SearchAttributes > upsertSearchAttributeCallback ,
397+ Functions .Proc2 <Integer , RuntimeException > callback ) {
398+ InvocationStateMachine ism =
399+ new InvocationStateMachine (
400+ minSupported , maxSupported , upsertSearchAttributeCallback , callback );
365401 ism .explicitEvent (ExplicitEvent .CHECK_EXECUTION_STATE );
366402 ism .explicitEvent (ExplicitEvent .SCHEDULE );
367403 // If the state is SKIPPED_REPLAYING that means we:
@@ -373,12 +409,16 @@ public Integer getVersion(
373409 return version == null ? preloadedVersion : version ;
374410 }
375411
412+ public boolean isWriteVersionChangeSA () {
413+ return writeVersionChangeSA ;
414+ }
415+
376416 public void handleNonMatchingEvent (HistoryEvent event ) {
377417 flushPreloadedVersionAndUpdateFromEvent (event );
378418 }
379419
380- public void handleMarkersPreload (HistoryEvent event ) {
381- preloadVersionFromEvent (event );
420+ public Integer handleMarkersPreload (HistoryEvent event ) {
421+ return preloadVersionFromEvent (event );
382422 }
383423
384424 private int getVersionFromEvent (HistoryEvent event ) {
@@ -398,6 +438,13 @@ private int getVersionFromEvent(HistoryEvent event) {
398438 Integer version = VersionMarkerUtils .getVersion (event .getMarkerRecordedEventAttributes ());
399439 Preconditions .checkArgument (version != null , "Marker details missing required version key" );
400440
441+ writeVersionChangeSA =
442+ VersionMarkerUtils .getUpsertVersionSA (event .getMarkerRecordedEventAttributes ());
443+ // Old SDKs didn't write the version change search attribute. So, if it is not present then
444+ // default to false.
445+ if (writeVersionChangeSA == null ) {
446+ writeVersionChangeSA = false ;
447+ }
401448 return version ;
402449 }
403450}
0 commit comments