-
Notifications
You must be signed in to change notification settings - Fork 503
Expand file tree
/
Copy pathDurableContext.cs
More file actions
181 lines (159 loc) · 7.31 KB
/
Copy pathDurableContext.cs
File metadata and controls
181 lines (159 loc) · 7.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
using Amazon.Lambda.Core;
using Amazon.Lambda.DurableExecution.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace Amazon.Lambda.DurableExecution;
/// <summary>
/// Implementation of <see cref="IDurableContext"/>. Constructs and dispatches
/// per-operation classes (<see cref="StepOperation{T}"/>, <see cref="WaitOperation"/>);
/// the replay logic lives in those classes.
/// </summary>
internal sealed class DurableContext : IDurableContext
{
private readonly ExecutionState _state;
private readonly TerminationManager _terminationManager;
private readonly OperationIdGenerator _idGenerator;
private readonly string _durableExecutionArn;
private readonly CheckpointBatcher? _batcher;
public DurableContext(
ExecutionState state,
TerminationManager terminationManager,
OperationIdGenerator idGenerator,
string durableExecutionArn,
ILambdaContext lambdaContext,
CheckpointBatcher? batcher = null)
{
_state = state;
_terminationManager = terminationManager;
_idGenerator = idGenerator;
_durableExecutionArn = durableExecutionArn;
_batcher = batcher;
LambdaContext = lambdaContext;
}
// Replay-safe logger ships in a follow-up PR; see IDurableContext.Logger doc.
public ILogger Logger => NullLogger.Instance;
public IExecutionContext ExecutionContext => new DurableExecutionContext(_durableExecutionArn);
public ILambdaContext LambdaContext { get; }
public Task<T> StepAsync<T>(
Func<IStepContext, Task<T>> func,
string? name = null,
StepConfig? config = null,
CancellationToken cancellationToken = default)
=> RunStep(func, name, config, cancellationToken);
public async Task StepAsync(
Func<IStepContext, Task> func,
string? name = null,
StepConfig? config = null,
CancellationToken cancellationToken = default)
{
// Void steps don't carry a meaningful payload — wrap with an object?-typed
// step that always returns null. The serializer isn't actually invoked
// with a non-null value, so any registered ILambdaSerializer suffices.
await RunStep<object?>(
async (ctx) => { await func(ctx); return null; },
name, config, cancellationToken);
}
private Task<T> RunStep<T>(
Func<IStepContext, Task<T>> func,
string? name,
StepConfig? config,
CancellationToken cancellationToken)
{
var serializer = LambdaContext.Serializer
?? throw new InvalidOperationException(
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
"In the class library programming model, register one with " +
"[assembly: LambdaSerializer(typeof(...))]. In an executable / custom " +
"runtime, pass it to LambdaBootstrapBuilder.Create(handler, serializer). " +
"In tests, set TestLambdaContext.Serializer.");
var operationId = _idGenerator.NextId();
var op = new StepOperation<T>(
operationId, name, _idGenerator.ParentId, func, config, serializer, Logger,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}
public Task WaitAsync(
TimeSpan duration,
string? name = null,
CancellationToken cancellationToken = default)
{
// Service timer granularity is 1 second; sub-second waits would round to 0.
// WaitOptions.WaitSeconds is integer in [1, 31_622_400] (1 second to ~1 year).
if (duration < TimeSpan.FromSeconds(1))
throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at least 1 second.");
if (duration > TimeSpan.FromSeconds(31_622_400))
throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at most 31,622,400 seconds (~1 year).");
cancellationToken.ThrowIfCancellationRequested();
var operationId = _idGenerator.NextId();
var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds));
var op = new WaitOperation(
operationId, name, _idGenerator.ParentId, waitSeconds,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}
public Task<T> RunInChildContextAsync<T>(
Func<IDurableContext, Task<T>> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default)
=> RunChildContext(func, name, config, cancellationToken);
public async Task RunInChildContextAsync(
Func<IDurableContext, Task> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default)
{
// Void child contexts don't carry a meaningful payload; the wrapper
// returns null so the registered ILambdaSerializer is never asked to
// serialize a real value.
await RunChildContext<object?>(
async (ctx) => { await func(ctx); return null; },
name, config, cancellationToken);
}
private Task<T> RunChildContext<T>(
Func<IDurableContext, Task<T>> func,
string? name,
ChildContextConfig? config,
CancellationToken cancellationToken)
{
var serializer = LambdaContext.Serializer
?? throw new InvalidOperationException(
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
"In the class library programming model, register one with " +
"[assembly: LambdaSerializer(typeof(...))]. In an executable / custom " +
"runtime, pass it to LambdaBootstrapBuilder.Create(handler, serializer). " +
"In tests, set TestLambdaContext.Serializer.");
var operationId = _idGenerator.NextId();
// Capture this DurableContext's collaborators; the child shares state,
// termination, batcher, ARN, and Lambda context — but uses a child
// OperationIdGenerator so its operation IDs are deterministically
// namespaced under the parent op ID.
IDurableContext ChildFactory(string parentOpId) => new DurableContext(
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
_durableExecutionArn, LambdaContext, _batcher);
var op = new ChildContextOperation<T>(
operationId, name, _idGenerator.ParentId, func, config, serializer, ChildFactory,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}
}
internal sealed class DurableExecutionContext : IExecutionContext
{
public DurableExecutionContext(string durableExecutionArn)
{
DurableExecutionArn = durableExecutionArn;
}
public string DurableExecutionArn { get; }
}
internal sealed class StepContext : IStepContext
{
public StepContext(string operationId, int attemptNumber, ILogger logger)
{
OperationId = operationId;
AttemptNumber = attemptNumber;
Logger = logger;
}
public ILogger Logger { get; }
public int AttemptNumber { get; }
public string OperationId { get; }
}