-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathLargePayloadOrchestration.cs
More file actions
128 lines (107 loc) · 4.98 KB
/
LargePayloadOrchestration.cs
File metadata and controls
128 lines (107 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
using System.Net;
using System.Text;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
namespace LargePayload;
public static class LargePayloadOrchestration
{
private const int OneMiB = 1024 * 1024;
private const int DefaultPayloadSizeBytes = 1536 * 1024;
[Function("StartLargePayload")]
public static async Task<HttpResponseData> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger(nameof(HttpStart));
int payloadSizeBytes = GetPositiveIntSetting("PAYLOAD_SIZE_BYTES", DefaultPayloadSizeBytes);
string payload = CreatePayload(payloadSizeBytes);
LargePayloadRequest request = new(payload, payloadSizeBytes);
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(LargePayloadRoundTrip), request);
logger.LogInformation(
"Started orchestration {InstanceId} with payload size {PayloadSizeBytes} bytes.",
instanceId,
payloadSizeBytes);
return await client.CreateCheckStatusResponseAsync(req, instanceId);
}
[Function(nameof(LargePayloadRoundTrip))]
public static async Task<LargePayloadSummary> LargePayloadRoundTrip(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
LargePayloadRequest request = context.GetInput<LargePayloadRequest>()
?? throw new InvalidOperationException("The orchestration input payload was not provided.");
string echoedPayload = await context.CallActivityAsync<string>(nameof(EchoLargePayload), request.Payload)
?? throw new InvalidOperationException("The activity did not return a payload.");
int orchestrationInputBytes = GetUtf8ByteCount(request.Payload);
int activityOutputBytes = GetUtf8ByteCount(echoedPayload);
return new LargePayloadSummary(
RequestedPayloadBytes: request.RequestedPayloadBytes,
OrchestrationInputBytes: orchestrationInputBytes,
ActivityOutputBytes: activityOutputBytes,
ExceededOneMiB: Math.Max(orchestrationInputBytes, activityOutputBytes) > OneMiB,
PayloadsMatch: string.Equals(request.Payload, echoedPayload, StringComparison.Ordinal));
}
[Function(nameof(EchoLargePayload))]
public static string EchoLargePayload(
[ActivityTrigger] string payload,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger(nameof(EchoLargePayload));
int payloadBytes = GetUtf8ByteCount(payload);
logger.LogInformation(
"Echoing a payload with {PayloadBytes} bytes.",
payloadBytes);
if (payload.StartsWith("blob:v1:", StringComparison.Ordinal))
{
throw new InvalidOperationException("The activity received a payload token instead of the resolved payload.");
}
return payload;
}
[Function("Hello")]
public static async Task<HttpResponseData> Hello(
[HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequestData req)
{
HttpResponseData response = req.CreateResponse(HttpStatusCode.OK);
await response.WriteStringAsync("LargePayload sample is running. POST /api/StartLargePayload to start a >1 MB orchestration.");
return response;
}
private static string CreatePayload(int payloadSizeBytes)
{
// Use a deterministic, low-compressibility payload so stored blob sizes stay representative.
return string.Create(payloadSizeBytes, 0x00C0FFEEu, static (span, seed) =>
{
const string Alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
uint state = seed;
for (int i = 0; i < span.Length; i++)
{
state = (state * 1664525) + 1013904223;
span[i] = Alphabet[(int)(state >> 26)];
}
});
}
private static int GetPositiveIntSetting(string key, int defaultValue)
{
string? rawValue = Environment.GetEnvironmentVariable(key);
if (string.IsNullOrWhiteSpace(rawValue))
{
return defaultValue;
}
if (!int.TryParse(rawValue, out int parsedValue) || parsedValue <= 0)
{
throw new InvalidOperationException($"Environment variable '{key}' must be a positive integer. Value: {rawValue}");
}
return parsedValue;
}
private static int GetUtf8ByteCount(string payload) => Encoding.UTF8.GetByteCount(payload);
}
public sealed record LargePayloadRequest(string Payload, int RequestedPayloadBytes);
public sealed record LargePayloadSummary(
int RequestedPayloadBytes,
int OrchestrationInputBytes,
int ActivityOutputBytes,
bool ExceededOneMiB,
bool PayloadsMatch);