-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathOrderProcessingParallelTest.cs
More file actions
135 lines (116 loc) · 5.66 KB
/
OrderProcessingParallelTest.cs
File metadata and controls
135 lines (116 loc) · 5.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
using Company.Function.Models;
using Company.Function.Activities;
using System.Collections.Concurrent;
using System.Diagnostics;
using Microsoft.DurableTask.Entities;
using System.Reflection.Metadata.Ecma335;
namespace Company.Function
{
public static partial class OrderProcessingOrchestration
{
[Function("OrderProcessingOrchestration_ParallelTest")]
public static async Task<HttpResponseData> ParallelTest(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "run/{count}")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
int count,
FunctionContext executionContext,
CancellationToken cancellationToken)
{
// this tests starts and runs multiple order ochestrations in parallel, logging progress over time.
// note that it is not well suited for very large counts, especially when deployed, since both the http call and
// the function will time out at some point.
Stopwatch stopwatch = Stopwatch.StartNew();
try
{
ConcurrentBag<Task<string>> startTasks = [];
ILogger logger = executionContext.GetLogger("OrderProcessingOrchestration_ParallelTest");
// we are using warnings for test progress so we can filter them easily
logger.LogWarning($"starting {count} orchestrations");
Task bgtask = Task.Run(() => Parallel.For(
0,
count,
(int i) => startTasks.Add(
client.ScheduleNewOrchestrationInstanceAsync(
nameof(OrderProcessingOrchestration),
new OrderPayload("milk", TotalCost: 5, Quantity: 1)))));
while (true)
{
await Task.Delay(TimeSpan.FromSeconds(5));
int startedCount = startTasks.Count(t => t.IsCompleted);
int faultedCount = startTasks.Count(t => t.IsFaulted);
if (startedCount == count || faultedCount > 0)
{
break;
}
logger.LogWarning($"{stopwatch.Elapsed} started {startedCount}/{count} orchestrations so far");
}
// await the tasks to make sure we observe any exceptions
await Task.WhenAll(startTasks);
logger.LogWarning($"{stopwatch.Elapsed} started all {count} orchestrations");
ConcurrentBag<Task<OrchestrationMetadata>> completionTasks = [];
Task bgtask2 = Task.Run(() => Parallel.ForEach(
startTasks.Select(t => t.Result),
(string instanceId) => completionTasks.Add(WaitForInstanceWithoutTimeout(instanceId))));
while (true)
{
await Task.Delay(TimeSpan.FromSeconds(5));
int completedCount = completionTasks.Count(t => t.IsCompleted);
int faultedCount = completionTasks.Count(t => t.IsFaulted);
if (completedCount == count || faultedCount > 0)
{
break;
}
logger.LogWarning($"{stopwatch.Elapsed} completed {completedCount}/{count} orchestrations so far");
}
// await the tasks to make sure we observe any exceptions
await Task.WhenAll(completionTasks);
logger.LogWarning($"{stopwatch.Elapsed} completed all {count} orchestrations");
var httpResponse = req.CreateResponse(System.Net.HttpStatusCode.OK);
await httpResponse.WriteStringAsync($"completed all {count} orchestrations in approximately {stopwatch.Elapsed}\n");
return httpResponse;
}
catch(Exception e)
{
var httpResponse = req.CreateResponse(System.Net.HttpStatusCode.InternalServerError);
await httpResponse.WriteStringAsync($"encountered exception after approximately {stopwatch.Elapsed}:\n {e}\n");
return httpResponse;
}
async Task<OrchestrationMetadata> WaitForInstanceWithoutTimeout(string instanceId)
{
while (true)
{
try
{
return await client.WaitForInstanceCompletionAsync(instanceId, CancellationToken.None);
}
catch (Grpc.Core.RpcException grpcException) when (grpcException.StatusCode == Grpc.Core.StatusCode.Unavailable)
{
await Task.Delay(TimeSpan.FromSeconds(5)); // retry after a delay
}
catch (OperationCanceledException exception) when (IsTimeoutException(exception))
{
// retry immediately
}
}
}
// To tell whether an exception is a timeout we search through the inner exceptions to see if any of them is a TimeoutException
static bool IsTimeoutException(Exception? e)
{
while (e is not null)
{
if (e is TimeoutException)
{
return true;
}
e = e.InnerException;
}
return false;
}
}
}
}