Skip to content

Commit da5e8a1

Browse files
THardy98claude
andauthored
Add upgrade-on-CAN support (#2811)
* Add upgrade-on-CAN support - Add isTargetWorkerDeploymentVersionChanged() to WorkflowInfo, plumbed through WorkflowTaskStateMachine → WorkflowStateMachines → ReplayWorkflowContext → WorkflowInfoImpl → WorkflowInfo - Add initialVersioningBehavior on ContinueAsNewOptions, allowing workflows to CAN with AUTO_UPGRADE to move to the new version - Wire initialVersioningBehavior encoding in SyncWorkflowContext - Update API proto submodule to include api#709 and #721 changes - Fix NexusWorker deprecation warnings from proto update Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Replace QueryableWorkflow with dedicated CAN workflow interface in versioning test The CAN test workflows no longer inherit unnecessary mySignal/getState methods from QueryableWorkflow. Instead they implement a dedicated ContinueAsNewVersionUpgradeWorkflow interface with execute(int attempt), matching the pattern used by other SDKs (Python/Go/TS/.NET). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Suppress deprecation warnings in NexusWorkflowTest to fix CI Add @SuppressWarnings("deprecation") to test methods calling the deprecated setOperationError() API, preventing -Werror from failing the build. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * remove excess supression warning from merge conflict * remove excess supression warning from merge conflict * remove conflict markers... --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0e1a7a2 commit da5e8a1

File tree

14 files changed

+342
-7
lines changed

14 files changed

+342
-7
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.temporal.common;
2+
3+
/**
4+
* Specifies the versioning behavior for the first task of a new workflow run started via
5+
* continue-as-new.
6+
*/
7+
@Experimental
8+
public enum InitialVersioningBehavior {
9+
/**
10+
* Start the new run with {@link VersioningBehavior#AUTO_UPGRADE} behavior for the first task,
11+
* upgrading to the latest version. After the first workflow task completes, the workflow uses
12+
* whatever versioning behavior is specified in the workflow code.
13+
*/
14+
AUTO_UPGRADE
15+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.temporal.common;
2+
3+
/**
4+
* Reason(s) why the server suggests a workflow should continue-as-new. Multiple reasons can be true
5+
* at the same time.
6+
*/
7+
@Experimental
8+
public enum SuggestContinueAsNewReason {
9+
/** Workflow history size is getting too large. */
10+
HISTORY_SIZE_TOO_LARGE,
11+
/** Workflow history has too many events. */
12+
TOO_MANY_HISTORY_EVENTS,
13+
/** Workflow's count of completed plus in-flight updates is too large. */
14+
TOO_MANY_UPDATES
15+
}

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88
import io.temporal.api.sdk.v1.UserMetadata;
99
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
1010
import io.temporal.common.RetryOptions;
11+
import io.temporal.common.SuggestContinueAsNewReason;
1112
import io.temporal.internal.common.SdkFlag;
1213
import io.temporal.internal.statemachines.*;
1314
import io.temporal.workflow.Functions;
1415
import io.temporal.workflow.Functions.Func;
1516
import io.temporal.workflow.Functions.Func1;
1617
import java.time.Duration;
18+
import java.util.List;
1719
import java.util.Map;
1820
import java.util.Optional;
1921
import java.util.Random;
@@ -357,6 +359,17 @@ Integer getVersion(
357359
*/
358360
boolean isContinueAsNewSuggested();
359361

362+
/**
363+
* @return the reasons why continue-as-new is suggested, or an empty list if not suggested. This
364+
* value changes during the lifetime of a Workflow Execution.
365+
*/
366+
List<SuggestContinueAsNewReason> getSuggestContinueAsNewReasons();
367+
368+
/**
369+
* @return true if the target worker deployment version has changed for this workflow.
370+
*/
371+
boolean isTargetWorkerDeploymentVersionChanged();
372+
360373
/**
361374
* @return true if cancellation of the workflow is requested.
362375
*/

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
1111
import io.temporal.api.sdk.v1.UserMetadata;
1212
import io.temporal.common.RetryOptions;
13+
import io.temporal.common.SuggestContinueAsNewReason;
1314
import io.temporal.failure.CanceledFailure;
1415
import io.temporal.internal.common.ProtobufTimeUtils;
1516
import io.temporal.internal.common.SdkFlag;
@@ -416,6 +417,16 @@ public boolean isContinueAsNewSuggested() {
416417
return workflowStateMachines.isContinueAsNewSuggested();
417418
}
418419

420+
@Override
421+
public List<SuggestContinueAsNewReason> getSuggestContinueAsNewReasons() {
422+
return workflowStateMachines.getSuggestContinueAsNewReasons();
423+
}
424+
425+
@Override
426+
public boolean isTargetWorkerDeploymentVersionChanged() {
427+
return workflowStateMachines.isTargetWorkerDeploymentVersionChanged();
428+
}
429+
419430
/*
420431
* MUTABLE STATE OPERATIONS
421432
*/

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.temporal.api.protocol.v1.Message;
2121
import io.temporal.api.sdk.v1.UserMetadata;
2222
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
23+
import io.temporal.common.SuggestContinueAsNewReason;
2324
import io.temporal.failure.CanceledFailure;
2425
import io.temporal.internal.common.*;
2526
import io.temporal.internal.history.LocalActivityMarkerUtils;
@@ -88,6 +89,10 @@ enum HandleEventStatus {
8889

8990
private boolean isContinueAsNewSuggested;
9091

92+
private List<SuggestContinueAsNewReason> suggestContinueAsNewReasons = Collections.emptyList();
93+
94+
private boolean isTargetWorkerDeploymentVersionChanged;
95+
9196
/**
9297
* EventId of the last event seen by these state machines. Events earlier than this one will be
9398
* discarded.
@@ -276,6 +281,14 @@ public boolean isContinueAsNewSuggested() {
276281
return isContinueAsNewSuggested;
277282
}
278283

284+
public List<SuggestContinueAsNewReason> getSuggestContinueAsNewReasons() {
285+
return suggestContinueAsNewReasons;
286+
}
287+
288+
public boolean isTargetWorkerDeploymentVersionChanged() {
289+
return isTargetWorkerDeploymentVersionChanged;
290+
}
291+
279292
public void setReplaying(boolean replaying) {
280293
this.replaying = replaying;
281294
}
@@ -1493,7 +1506,9 @@ public void workflowTaskStarted(
14931506
long currentTimeMillis,
14941507
boolean nonProcessedWorkflowTask,
14951508
long historySize,
1496-
boolean isContinueAsNewSuggested) {
1509+
boolean isContinueAsNewSuggested,
1510+
List<SuggestContinueAsNewReason> suggestContinueAsNewReasons,
1511+
boolean isTargetWorkerDeploymentVersionChanged) {
14971512
setCurrentTimeMillis(currentTimeMillis);
14981513
for (CancellableCommand cancellableCommand : commands) {
14991514
cancellableCommand.handleWorkflowTaskStarted();
@@ -1509,6 +1524,9 @@ public void workflowTaskStarted(
15091524
WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId;
15101525
WorkflowStateMachines.this.historySize = historySize;
15111526
WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested;
1527+
WorkflowStateMachines.this.suggestContinueAsNewReasons = suggestContinueAsNewReasons;
1528+
WorkflowStateMachines.this.isTargetWorkerDeploymentVersionChanged =
1529+
isTargetWorkerDeploymentVersionChanged;
15121530

15131531
eventLoop();
15141532
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowTaskStateMachine.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
import io.temporal.api.enums.v1.EventType;
55
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
66
import io.temporal.api.history.v1.WorkflowTaskFailedEventAttributes;
7+
import io.temporal.common.SuggestContinueAsNewReason;
8+
import java.util.ArrayList;
9+
import java.util.Collections;
10+
import java.util.List;
711
import java.util.Objects;
812

913
final class WorkflowTaskStateMachine
@@ -32,7 +36,9 @@ void workflowTaskStarted(
3236
long currentTimeMillis,
3337
boolean nonProcessedWorkflowTask,
3438
long historySize,
35-
boolean isContinueAsNewSuggested);
39+
boolean isContinueAsNewSuggested,
40+
List<SuggestContinueAsNewReason> suggestContinueAsNewReasons,
41+
boolean isTargetWorkerDeploymentVersionChanged);
3642

3743
void updateRunId(String currentRunId);
3844
}
@@ -46,6 +52,8 @@ void workflowTaskStarted(
4652
private long startedEventId;
4753
private long historySize;
4854
private boolean isContinueAsNewSuggested;
55+
private List<SuggestContinueAsNewReason> suggestContinueAsNewReasons = Collections.emptyList();
56+
private boolean isTargetWorkerDeploymentVersionChanged;
4957

5058
public static WorkflowTaskStateMachine newInstance(
5159
long workflowTaskStartedEventId, Listener listener) {
@@ -103,6 +111,15 @@ private void handleStarted() {
103111
historySize = currentEvent.getWorkflowTaskStartedEventAttributes().getHistorySizeBytes();
104112
isContinueAsNewSuggested =
105113
currentEvent.getWorkflowTaskStartedEventAttributes().getSuggestContinueAsNew();
114+
suggestContinueAsNewReasons =
115+
convertSuggestContinueAsNewReasons(
116+
currentEvent
117+
.getWorkflowTaskStartedEventAttributes()
118+
.getSuggestContinueAsNewReasonsList());
119+
isTargetWorkerDeploymentVersionChanged =
120+
currentEvent
121+
.getWorkflowTaskStartedEventAttributes()
122+
.getTargetWorkerDeploymentVersionChanged();
106123

107124
// The last started event in the history. So no completed is expected.
108125
if (currentEvent.getEventId() >= workflowTaskStartedEventId && !hasNextEvent) {
@@ -121,7 +138,33 @@ private void handleCompleted() {
121138
eventTimeOfTheLastWorkflowStartTask,
122139
lastTaskInHistory,
123140
historySize,
124-
isContinueAsNewSuggested);
141+
isContinueAsNewSuggested,
142+
suggestContinueAsNewReasons,
143+
isTargetWorkerDeploymentVersionChanged);
144+
}
145+
146+
private static List<SuggestContinueAsNewReason> convertSuggestContinueAsNewReasons(
147+
List<io.temporal.api.enums.v1.SuggestContinueAsNewReason> protoReasons) {
148+
if (protoReasons.isEmpty()) {
149+
return Collections.emptyList();
150+
}
151+
List<SuggestContinueAsNewReason> reasons = new ArrayList<>(protoReasons.size());
152+
for (io.temporal.api.enums.v1.SuggestContinueAsNewReason proto : protoReasons) {
153+
switch (proto) {
154+
case SUGGEST_CONTINUE_AS_NEW_REASON_HISTORY_SIZE_TOO_LARGE:
155+
reasons.add(SuggestContinueAsNewReason.HISTORY_SIZE_TOO_LARGE);
156+
break;
157+
case SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_HISTORY_EVENTS:
158+
reasons.add(SuggestContinueAsNewReason.TOO_MANY_HISTORY_EVENTS);
159+
break;
160+
case SUGGEST_CONTINUE_AS_NEW_REASON_TOO_MANY_UPDATES:
161+
reasons.add(SuggestContinueAsNewReason.TOO_MANY_UPDATES);
162+
break;
163+
default:
164+
break;
165+
}
166+
}
167+
return Collections.unmodifiableList(reasons);
125168
}
126169

127170
private void handleFailed() {

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,6 +1415,15 @@ public void continueAsNew(ContinueAsNewInput input) {
14151415
.determineUseCompatibleFlag(
14161416
replayContext.getTaskQueue().equals(options.getTaskQueue())));
14171417
}
1418+
if (options.getInitialVersioningBehavior() != null) {
1419+
switch (options.getInitialVersioningBehavior()) {
1420+
case AUTO_UPGRADE:
1421+
attributes.setInitialVersioningBehavior(
1422+
io.temporal.api.enums.v1.ContinueAsNewVersioningBehavior
1423+
.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE);
1424+
break;
1425+
}
1426+
}
14181427
}
14191428

14201429
if (options == null && replayContext.getRetryOptions() != null) {

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInfoImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
import io.temporal.api.common.v1.WorkflowExecution;
55
import io.temporal.common.Priority;
66
import io.temporal.common.RetryOptions;
7+
import io.temporal.common.SuggestContinueAsNewReason;
78
import io.temporal.internal.common.ProtoConverters;
89
import io.temporal.internal.replay.ReplayWorkflowContext;
910
import io.temporal.workflow.WorkflowInfo;
1011
import java.time.Duration;
12+
import java.util.List;
1113
import java.util.Optional;
1214
import javax.annotation.Nonnull;
1315
import javax.annotation.Nullable;
@@ -147,6 +149,16 @@ public boolean isContinueAsNewSuggested() {
147149
return context.isContinueAsNewSuggested();
148150
}
149151

152+
@Override
153+
public List<SuggestContinueAsNewReason> getSuggestContinueAsNewReasons() {
154+
return context.getSuggestContinueAsNewReasons();
155+
}
156+
157+
@Override
158+
public boolean isTargetWorkerDeploymentVersionChanged() {
159+
return context.isTargetWorkerDeploymentVersionChanged();
160+
}
161+
150162
@Override
151163
public Optional<String> getCurrentBuildId() {
152164
return context.getCurrentBuildId();

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ public Throwable wrapFailure(NexusTask task, Throwable failure) {
287287
"Failure processing nexus response: " + response.getRequest().toString(), failure);
288288
}
289289

290-
@SuppressWarnings("deprecation") // Uses deprecated operationError
290+
@SuppressWarnings("deprecation") // Uses hasOperationError()/getOperationError() for compat
291291
private void handleNexusTask(NexusTask task, Scope metricsScope) {
292292
PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse();
293293
ByteString taskToken = pollResponse.getTaskToken();

temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public static final class Builder {
4545
@SuppressWarnings("deprecation")
4646
private VersioningIntent versioningIntent;
4747

48+
private InitialVersioningBehavior initialVersioningBehavior;
49+
4850
private Builder() {}
4951

5052
private Builder(ContinueAsNewOptions options) {
@@ -60,6 +62,7 @@ private Builder(ContinueAsNewOptions options) {
6062
this.typedSearchAttributes = options.getTypedSearchAttributes();
6163
this.contextPropagators = options.getContextPropagators();
6264
this.versioningIntent = options.versioningIntent;
65+
this.initialVersioningBehavior = options.initialVersioningBehavior;
6366
}
6467

6568
public Builder setWorkflowRunTimeout(Duration workflowRunTimeout) {
@@ -131,6 +134,18 @@ public Builder setVersioningIntent(VersioningIntent versioningIntent) {
131134
return this;
132135
}
133136

137+
/**
138+
* Specifies the versioning behavior for the first task of the new workflow run. For example,
139+
* set to AUTO_UPGRADE to upgrade to the latest version on continue-as-new instead of inheriting
140+
* the pinned version from the previous run.
141+
*/
142+
@Experimental
143+
public Builder setInitialVersioningBehavior(
144+
InitialVersioningBehavior initialVersioningBehavior) {
145+
this.initialVersioningBehavior = initialVersioningBehavior;
146+
return this;
147+
}
148+
134149
public ContinueAsNewOptions build() {
135150
return new ContinueAsNewOptions(
136151
workflowRunTimeout,
@@ -141,7 +156,8 @@ public ContinueAsNewOptions build() {
141156
searchAttributes,
142157
typedSearchAttributes,
143158
contextPropagators,
144-
versioningIntent);
159+
versioningIntent,
160+
initialVersioningBehavior);
145161
}
146162
}
147163

@@ -157,6 +173,8 @@ public ContinueAsNewOptions build() {
157173
@SuppressWarnings("deprecation")
158174
private final @Nullable VersioningIntent versioningIntent;
159175

176+
private final @Nullable InitialVersioningBehavior initialVersioningBehavior;
177+
160178
public ContinueAsNewOptions(
161179
@Nullable Duration workflowRunTimeout,
162180
@Nullable String taskQueue,
@@ -166,7 +184,8 @@ public ContinueAsNewOptions(
166184
@Nullable Map<String, Object> searchAttributes,
167185
@Nullable SearchAttributes typedSearchAttributes,
168186
@Nullable List<ContextPropagator> contextPropagators,
169-
@SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent) {
187+
@SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent,
188+
@Nullable InitialVersioningBehavior initialVersioningBehavior) {
170189
this.workflowRunTimeout = workflowRunTimeout;
171190
this.taskQueue = taskQueue;
172191
this.retryOptions = retryOptions;
@@ -176,6 +195,7 @@ public ContinueAsNewOptions(
176195
this.typedSearchAttributes = typedSearchAttributes;
177196
this.contextPropagators = contextPropagators;
178197
this.versioningIntent = versioningIntent;
198+
this.initialVersioningBehavior = initialVersioningBehavior;
179199
}
180200

181201
public @Nullable Duration getWorkflowRunTimeout() {
@@ -223,4 +243,13 @@ public RetryOptions getRetryOptions() {
223243
public @Nullable VersioningIntent getVersioningIntent() {
224244
return versioningIntent;
225245
}
246+
247+
/**
248+
* @return the initial versioning behavior for the first task of the new workflow run, or null if
249+
* unset.
250+
*/
251+
@Experimental
252+
public @Nullable InitialVersioningBehavior getInitialVersioningBehavior() {
253+
return initialVersioningBehavior;
254+
}
226255
}

0 commit comments

Comments
 (0)