-
Notifications
You must be signed in to change notification settings - Fork 42
Expand file tree
/
Copy pathProgram.cs
More file actions
181 lines (155 loc) · 7.82 KB
/
Program.cs
File metadata and controls
181 lines (155 loc) · 7.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
using Temporalio.Api.WorkflowService.V1;
using Temporalio.Client;
using Temporalio.Common;
using Temporalio.Common.EnvConfig;
namespace TemporalioSamples.WorkerVersioning;
public static class Program
{
public const string TaskQueue = "worker-versioning";
public const string DeploymentName = "my-deployment";
private static readonly string[] DoActivitySignal = { "do-activity" };
private static readonly string[] SomeSignal = { "some-signal" };
private static readonly string[] ConcludeSignal = { "conclude" };
public static async Task Main(string[] args)
{
if (args.Length == 0)
{
Console.WriteLine("Usage: WorkerVersioning <command> [options]");
Console.WriteLine();
Console.WriteLine("Commands:");
Console.WriteLine(" worker-v1 - Start worker with version 1.0");
Console.WriteLine(" worker-v1.1 - Start worker with version 1.1");
Console.WriteLine(" worker-v2 - Start worker with version 2.0");
Console.WriteLine(" demo - Run the complete versioning demonstration");
return;
}
var connectOptions = ClientEnvConfig.LoadClientConnectOptions();
connectOptions.TargetHost ??= "localhost:7233";
var client = await TemporalClient.ConnectAsync(connectOptions);
switch (args[0].ToLower())
{
case "worker-v1":
await WorkerV1.RunAsync(client);
break;
case "worker-v1.1":
await WorkerV1Dot1.RunAsync(client);
break;
case "worker-v2":
await WorkerV2.RunAsync(client);
break;
case "demo":
await RunDemoAsync(client);
break;
default:
Console.WriteLine($"Unknown command: {args[0]}");
break;
}
}
public static async Task RunDemoAsync(TemporalClient client)
{
// Wait for v1 worker and set as current version
var workerV1Version = new WorkerDeploymentVersion(Program.DeploymentName, "1.0");
Console.WriteLine("Waiting for v1 worker to appear. Run `dotnet run worker-v1` in another terminal");
await WaitForWorkerVersionAsync(client, workerV1Version);
await SetCurrentVersionAsync(client, workerV1Version);
// Start auto-upgrading and pinned workflows. Importantly, note that when we start the workflows,
// we are using a workflow type name which does *not* include the version number. We defined them
// with versioned names so we could show changes to the code, but here when the client invokes
// them, we're demonstrating that the client remains version-agnostic.
var autoUpgradingId = $"worker-versioning-versioning-autoupgrade_{Guid.NewGuid()}";
var autoUpgradingHandle = await client.StartWorkflowAsync(
"AutoUpgradingWorkflow",
Array.Empty<object>(),
new(id: autoUpgradingId, taskQueue: Program.TaskQueue));
Console.WriteLine($"Started auto-upgrading workflow: {autoUpgradingHandle.Id}");
var pinnedId = $"worker-versioning-versioning-pinned_{Guid.NewGuid()}";
var pinnedHandle = await client.StartWorkflowAsync(
"PinnedWorkflow",
Array.Empty<object>(),
new(id: pinnedId, taskQueue: Program.TaskQueue));
Console.WriteLine($"Started pinned workflow: {pinnedHandle.Id}");
// Signal both workflows a few times to drive them
await SignalWorkflowsAsync(autoUpgradingHandle, pinnedHandle);
// Now wait for the v1.1 worker to appear and become current
var workerV1_1Version = new WorkerDeploymentVersion(Program.DeploymentName, "1.1");
Console.WriteLine("Waiting for v1.1 worker to appear. Run `dotnet run worker-v1.1` in another terminal");
await WaitForWorkerVersionAsync(client, workerV1_1Version);
await SetCurrentVersionAsync(client, workerV1_1Version);
// Once it has, we will continue to advance the workflows.
// The auto-upgrade workflow will now make progress on the new worker, while the pinned one will
// keep progressing on the old worker.
await SignalWorkflowsAsync(autoUpgradingHandle, pinnedHandle);
// Finally we'll start the v2 worker, and again it'll become the new current version
var workerV2Version = new WorkerDeploymentVersion(Program.DeploymentName, "2.0");
Console.WriteLine("Waiting for v2 worker to appear. Run `dotnet run worker-v2` in another terminal");
await WaitForWorkerVersionAsync(client, workerV2Version);
await SetCurrentVersionAsync(client, workerV2Version);
// Once it has we'll start one more new workflow, another pinned one, to demonstrate that new
// pinned workflows start on the current version.
var pinnedV2Id = $"worker-versioning-versioning-pinned-2_{Guid.NewGuid()}";
var pinnedV2Handle = await client.StartWorkflowAsync(
"PinnedWorkflow",
Array.Empty<object>(),
new(id: pinnedV2Id, taskQueue: Program.TaskQueue));
Console.WriteLine($"Started pinned workflow v2: {pinnedV2Handle.Id}");
// Now we'll conclude all workflows. You should be able to see in your server UI that the pinned
// workflow always stayed on 1.0, while the auto-upgrading workflow migrated.
foreach (var handle in new[] { autoUpgradingHandle, pinnedHandle, pinnedV2Handle })
{
await handle.SignalAsync("DoNextSignal", ConcludeSignal);
await handle.GetResultAsync();
}
Console.WriteLine("All workflows completed");
}
/// <summary>Signal both workflows a few times to drive them.</summary>
private static async Task SignalWorkflowsAsync(WorkflowHandle autoUpgradingHandle, WorkflowHandle pinnedHandle)
{
await autoUpgradingHandle.SignalAsync("DoNextSignal", DoActivitySignal);
await pinnedHandle.SignalAsync("DoNextSignal", SomeSignal);
}
private static async Task WaitForWorkerVersionAsync(TemporalClient client, WorkerDeploymentVersion version)
{
while (true)
{
try
{
var request = new DescribeWorkerDeploymentRequest
{
Namespace = client.Options.Namespace,
DeploymentName = version.DeploymentName,
};
var response = await client.WorkflowService.DescribeWorkerDeploymentAsync(request);
var versionInfo = response.WorkerDeploymentInfo.VersionSummaries
.FirstOrDefault(v => v.DeploymentVersion?.BuildId == version.BuildId);
if (versionInfo != null)
{
return;
}
}
catch (Temporalio.Exceptions.RpcException)
{
// Deployment not found yet
}
await Task.Delay(1000);
}
}
private static async Task SetCurrentVersionAsync(TemporalClient client, WorkerDeploymentVersion version)
{
// First get the current deployment info for the conflict token
var describeRequest = new DescribeWorkerDeploymentRequest
{
Namespace = client.Options.Namespace,
DeploymentName = version.DeploymentName,
};
var describeResponse = await client.WorkflowService.DescribeWorkerDeploymentAsync(describeRequest);
// Set the current version
var setRequest = new SetWorkerDeploymentCurrentVersionRequest
{
Namespace = client.Options.Namespace,
DeploymentName = version.DeploymentName,
BuildId = version.BuildId,
ConflictToken = describeResponse.ConflictToken,
};
await client.WorkflowService.SetWorkerDeploymentCurrentVersionAsync(setRequest);
}
}