Skip to content

Commit 08d8194

Browse files
committed
Implement versioning override
1 parent 5bd4516 commit 08d8194

7 files changed

Lines changed: 173 additions & 5 deletions

File tree

temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public static WorkflowOptions merge(
6666
.setLinks(o.getLinks())
6767
.setOnConflictOptions(o.getOnConflictOptions())
6868
.setPriority(o.getPriority())
69+
.setVersioningOverride(o.getVersioningOverride())
6970
.validateBuildWithDefaults();
7071
}
7172

@@ -115,6 +116,8 @@ public static final class Builder {
115116

116117
private Priority priority;
117118

119+
private VersioningOverride versioningOverride;
120+
118121
private Builder() {}
119122

120123
private Builder(WorkflowOptions options) {
@@ -143,6 +146,7 @@ private Builder(WorkflowOptions options) {
143146
this.links = options.links;
144147
this.onConflictOptions = options.onConflictOptions;
145148
this.priority = options.priority;
149+
this.versioningOverride = options.versioningOverride;
146150
}
147151

148152
/**
@@ -472,6 +476,13 @@ public Builder setPriority(Priority priority) {
472476
return this;
473477
}
474478

479+
/** Sets the versioning override to use when starting this workflow. */
480+
@Experimental
481+
public Builder setVersioningOverride(VersioningOverride versioningOverride) {
482+
this.versioningOverride = versioningOverride;
483+
return this;
484+
}
485+
475486
public WorkflowOptions build() {
476487
return new WorkflowOptions(
477488
workflowId,
@@ -495,7 +506,8 @@ public WorkflowOptions build() {
495506
completionCallbacks,
496507
links,
497508
onConflictOptions,
498-
priority);
509+
priority,
510+
versioningOverride);
499511
}
500512

501513
/**
@@ -524,7 +536,8 @@ public WorkflowOptions validateBuildWithDefaults() {
524536
completionCallbacks,
525537
links,
526538
onConflictOptions,
527-
priority);
539+
priority,
540+
versioningOverride);
528541
}
529542
}
530543

@@ -569,6 +582,7 @@ public WorkflowOptions validateBuildWithDefaults() {
569582
private final List<Link> links;
570583
private final OnConflictOptions onConflictOptions;
571584
private final Priority priority;
585+
private final VersioningOverride versioningOverride;
572586

573587
private WorkflowOptions(
574588
String workflowId,
@@ -592,7 +606,8 @@ private WorkflowOptions(
592606
List<Callback> completionCallbacks,
593607
List<Link> links,
594608
OnConflictOptions onConflictOptions,
595-
Priority priority) {
609+
Priority priority,
610+
VersioningOverride versioningOverride) {
596611
this.workflowId = workflowId;
597612
this.workflowIdReusePolicy = workflowIdReusePolicy;
598613
this.workflowRunTimeout = workflowRunTimeout;
@@ -615,6 +630,7 @@ private WorkflowOptions(
615630
this.links = links;
616631
this.onConflictOptions = onConflictOptions;
617632
this.priority = priority;
633+
this.versioningOverride = versioningOverride;
618634
}
619635

620636
public String getWorkflowId() {
@@ -721,6 +737,11 @@ public Priority getPriority() {
721737
return priority;
722738
}
723739

740+
@Experimental
741+
public VersioningOverride getVersioningOverride() {
742+
return versioningOverride;
743+
}
744+
724745
public Builder toBuilder() {
725746
return new Builder(this);
726747
}
@@ -751,7 +772,8 @@ public boolean equals(Object o) {
751772
&& Objects.equal(completionCallbacks, that.completionCallbacks)
752773
&& Objects.equal(links, that.links)
753774
&& Objects.equal(onConflictOptions, that.onConflictOptions)
754-
&& Objects.equal(priority, that.priority);
775+
&& Objects.equal(priority, that.priority)
776+
&& Objects.equal(versioningOverride, that.versioningOverride);
755777
}
756778

757779
@Override
@@ -778,7 +800,8 @@ public int hashCode() {
778800
completionCallbacks,
779801
links,
780802
onConflictOptions,
781-
priority);
803+
priority,
804+
versioningOverride);
782805
}
783806

784807
@Override
@@ -831,6 +854,8 @@ public String toString() {
831854
+ onConflictOptions
832855
+ ", priority="
833856
+ priority
857+
+ ", versioningOverride="
858+
+ versioningOverride
834859
+ '}';
835860
}
836861
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package io.temporal.common;
2+
3+
import io.temporal.api.enums.v1.VersioningBehavior;
4+
import javax.annotation.Nonnull;
5+
6+
/**
7+
* Represents the override of a worker's versioning behavior for a workflow execution. Exactly one
8+
* of the subtypes must be used.
9+
*/
10+
@Experimental
11+
public abstract class VersioningOverride {
12+
private VersioningOverride() {}
13+
14+
/**
15+
* Converts this override to a protobuf message.
16+
*
17+
* @return The proto representation.
18+
*/
19+
@SuppressWarnings("deprecation")
20+
public io.temporal.api.workflow.v1.VersioningOverride toProto() {
21+
if (this instanceof PinnedVersioningOverride) {
22+
PinnedVersioningOverride pv = (PinnedVersioningOverride) this;
23+
io.temporal.api.workflow.v1.VersioningOverride.PinnedOverride.Builder pinnedBuilder =
24+
io.temporal.api.workflow.v1.VersioningOverride.PinnedOverride.newBuilder()
25+
.setVersion(pv.getVersion().toProto());
26+
27+
pinnedBuilder.setBehavior(
28+
io.temporal.api.workflow.v1.VersioningOverride.PinnedOverrideBehavior
29+
.PINNED_OVERRIDE_BEHAVIOR_PINNED);
30+
31+
return io.temporal.api.workflow.v1.VersioningOverride.newBuilder()
32+
.setBehavior(VersioningBehavior.VERSIONING_BEHAVIOR_PINNED)
33+
.setPinnedVersion(pv.version.toCanonicalString())
34+
.setPinned(pinnedBuilder.build())
35+
.build();
36+
} else {
37+
return io.temporal.api.workflow.v1.VersioningOverride.newBuilder()
38+
.setBehavior(VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE)
39+
.setAutoUpgrade(true)
40+
.build();
41+
}
42+
}
43+
44+
/** Workflow will be pinned to a specific deployment version. */
45+
public static final class PinnedVersioningOverride extends VersioningOverride {
46+
private final WorkerDeploymentVersion version;
47+
48+
/**
49+
* Creates a new PinnedVersioningOverride.
50+
*
51+
* @param version The worker deployment version to pin the workflow to.
52+
*/
53+
public PinnedVersioningOverride(@Nonnull WorkerDeploymentVersion version) {
54+
this.version = version;
55+
}
56+
57+
/**
58+
* @return The worker deployment version to pin the workflow to.
59+
*/
60+
public WorkerDeploymentVersion getVersion() {
61+
return version;
62+
}
63+
}
64+
65+
/** The workflow will auto-upgrade to the current deployment version on the next workflow task. */
66+
public static final class AutoUpgradeVersioningOverride extends VersioningOverride {
67+
public AutoUpgradeVersioningOverride() {}
68+
}
69+
}

temporal-sdk/src/main/java/io/temporal/common/WorkerDeploymentVersion.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,16 @@ public String toString() {
8080
+ '\''
8181
+ '}';
8282
}
83+
84+
/**
85+
* Converts this version to a proto.
86+
*
87+
* @return The proto representation.
88+
*/
89+
public io.temporal.api.deployment.v1.WorkerDeploymentVersion toProto() {
90+
return io.temporal.api.deployment.v1.WorkerDeploymentVersion.newBuilder()
91+
.setBuildId(buildId)
92+
.setDeploymentName(deploymentName)
93+
.build();
94+
}
8395
}

temporal-sdk/src/main/java/io/temporal/internal/client/ScheduleProtoUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ public ScheduleAction actionToProto(io.temporal.client.schedules.ScheduleAction
162162
if (wfOptions.getPriority() != null) {
163163
workflowRequest.setPriority(PriorityUtils.toProto(wfOptions.getPriority()));
164164
}
165+
if (wfOptions.getVersioningOverride() != null) {
166+
workflowRequest.setVersioningOverride(wfOptions.getVersioningOverride().toProto());
167+
}
165168

166169
return ScheduleAction.newBuilder().setStartWorkflow(workflowRequest.build()).build();
167170
}

temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
122122
request.setPriority(PriorityUtils.toProto(options.getPriority()));
123123
}
124124

125+
if (options.getVersioningOverride() != null) {
126+
request.setVersioningOverride(options.getVersioningOverride().toProto());
127+
}
128+
125129
if (options.getSearchAttributes() != null && !options.getSearchAttributes().isEmpty()) {
126130
if (options.getTypedSearchAttributes() != null) {
127131
throw new IllegalArgumentException(
@@ -202,6 +206,10 @@ SignalWithStartWorkflowExecutionRequest.Builder newSignalWithStartWorkflowExecut
202206
request.setUserMetadata(startParameters.getUserMetadata());
203207
}
204208

209+
if (startParameters.hasVersioningOverride()) {
210+
request.setVersioningOverride(startParameters.getVersioningOverride());
211+
}
212+
205213
return request;
206214
}
207215

temporal-sdk/src/test/java/io/temporal/worker/WorkerVersioningTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,18 @@
99
import io.temporal.client.WorkflowClient;
1010
import io.temporal.client.WorkflowStub;
1111
import io.temporal.common.VersioningBehavior;
12+
import io.temporal.common.VersioningOverride;
1213
import io.temporal.common.WorkerDeploymentVersion;
1314
import io.temporal.common.WorkflowExecutionHistory;
1415
import io.temporal.common.converter.EncodedValues;
1516
import io.temporal.testUtils.Eventually;
17+
import io.temporal.testing.internal.SDKTestOptions;
1618
import io.temporal.testing.internal.SDKTestWorkflowRule;
1719
import io.temporal.workflow.*;
1820
import io.temporal.workflow.shared.TestWorkflows;
1921
import java.time.Duration;
2022
import java.util.HashSet;
23+
import java.util.UUID;
2124
import org.junit.Assert;
2225
import org.junit.Rule;
2326
import org.junit.Test;
@@ -384,6 +387,51 @@ public void testAnnotationNotAllowedOnInterface() {
384387
e.getMessage());
385388
}
386389

390+
@SuppressWarnings("deprecation")
391+
@Test
392+
public void testWorkflowsCanUseVersioningOverride() {
393+
assumeTrue("Test Server doesn't support versioning", SDKTestWorkflowRule.useExternalService);
394+
395+
Worker w1 = testWorkflowRule.newWorkerWithBuildID("1.0");
396+
WorkerDeploymentVersion v1 = w1.getWorkerOptions().getDeploymentOptions().getVersion();
397+
w1.registerWorkflowImplementationTypes(TestWorkerVersioningAutoUpgradeV1.class);
398+
w1.start();
399+
400+
DescribeWorkerDeploymentResponse describeResp1 = waitUntilWorkerDeploymentVisible(v1);
401+
setCurrentVersion(v1, describeResp1.getConflictToken());
402+
403+
String workflowId = "versioning-override-" + UUID.randomUUID();
404+
TestWorkflows.QueryableWorkflow wf =
405+
testWorkflowRule
406+
.getWorkflowClient()
407+
.newWorkflowStub(
408+
TestWorkflows.QueryableWorkflow.class,
409+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue())
410+
.toBuilder()
411+
.setWorkflowId(workflowId)
412+
.setVersioningOverride(new VersioningOverride.PinnedVersioningOverride(v1))
413+
.build());
414+
WorkflowExecution we = WorkflowClient.start(wf::execute);
415+
wf.mySignal("done");
416+
testWorkflowRule
417+
.getWorkflowClient()
418+
.newUntypedWorkflowStub(we.getWorkflowId())
419+
.getResult(String.class);
420+
421+
WorkflowExecutionHistory hist = testWorkflowRule.getExecutionHistory(we.getWorkflowId());
422+
Assert.assertTrue(
423+
hist.getHistory().getEventsList().stream()
424+
.anyMatch(
425+
e ->
426+
e.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
427+
&& e.getWorkflowExecutionStartedEventAttributes()
428+
.getVersioningOverride()
429+
.getBehavior()
430+
== io.temporal.api.enums.v1.VersioningBehavior
431+
.VERSIONING_BEHAVIOR_PINNED));
432+
}
433+
434+
@SuppressWarnings("deprecation")
387435
private DescribeWorkerDeploymentResponse waitUntilWorkerDeploymentVisible(
388436
WorkerDeploymentVersion v) {
389437
DescribeWorkerDeploymentRequest req =
@@ -419,6 +467,7 @@ private String runWorkflow(String idPrefix) {
419467
.getResult(String.class);
420468
}
421469

470+
@SuppressWarnings("deprecation")
422471
private SetWorkerDeploymentCurrentVersionResponse setCurrentVersion(
423472
WorkerDeploymentVersion v, ByteString conflictToken) {
424473
return testWorkflowRule
@@ -434,6 +483,7 @@ private SetWorkerDeploymentCurrentVersionResponse setCurrentVersion(
434483
.build());
435484
}
436485

486+
@SuppressWarnings("deprecation")
437487
private SetWorkerDeploymentRampingVersionResponse setRampingVersion(
438488
WorkerDeploymentVersion v, float percent, ByteString conflictToken) {
439489
return testWorkflowRule

temporal-sdk/src/test/java/io/temporal/workflow/BinaryChecksumSetWhenTaskCompletedTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class BinaryChecksumSetWhenTaskCompletedTest {
3131
.build();
3232

3333
@Test
34+
@SuppressWarnings("deprecation")
3435
public void testBinaryChecksumSetWhenTaskCompleted() {
3536
TestWorkflow1 client = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
3637
WorkflowExecution execution =

0 commit comments

Comments
 (0)