Skip to content

Commit fe03624

Browse files
committed
Add durable execution Step + Wait end-to-end
Implements the minimum viable slice of the Amazon.Lambda.DurableExecution SDK: a workflow can run StepAsync and WaitAsync against a real Lambda, with replay-aware checkpointing wired through to the AWS service. Public API surface introduced: - DurableFunction.WrapAsync — entry point that handles the durable execution envelope (input hydration, output construction, status mapping) - IDurableContext.StepAsync / WaitAsync (4 Step overloads, 1 Wait) - StepConfig with retry strategy, semantics, and serializer hooks - IRetryStrategy + ExponentialRetryStrategy + retry decision factories - ICheckpointSerializer + DefaultJsonCheckpointSerializer - [DurableExecution] attribute (recognized by future source generator) - DurableExecutionException base + StepException Internals: - DurableExecutionHandler — Task.WhenAny race between user code and the suspension signal, returning Succeeded/Failed/Pending - ExecutionState — replay-aware operation lookup and pending checkpoint buffer - OperationIdGenerator — deterministic, replay-stable IDs - TerminationManager — TaskCompletionSource-based suspension trigger - LambdaDurableServiceClient — wraps AWSSDK.Lambda's checkpoint and state APIs Tests: - 86 unit tests covering enums, exceptions, models, configs, retry, ID generation, termination, execution state, the handler race, the context (Step + Wait paths), and the WrapAsync entry point - 8 end-to-end integration tests deploying real Lambdas via Docker on the provided.al2023 runtime: StepWaitStep, MultipleSteps, WaitOnly, LongerWait, ReplayDeterminism, RetrySucceeds, RetryExhausts, StepFails Out of scope (follow-up PRs): - Callbacks, InvokeAsync, ParallelAsync, MapAsync, RunInChildContextAsync, WaitForConditionAsync — interface intentionally does not declare them - DurableLogger replay-suppression - Annotations source-generator integration - DurableTestRunner / Amazon.Lambda.DurableExecution.Testing package - dotnet new lambda.DurableFunction blueprint stack-info: PR: #2360, branch: GarrettBeatty/stack/2
1 parent 040265a commit fe03624

66 files changed

Lines changed: 4742 additions & 18 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Libraries/src/Amazon.Lambda.DurableExecution/Amazon.Lambda.DurableExecution.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
1515
<Nullable>enable</Nullable>
1616
<ImplicitUsings>enable</ImplicitUsings>
17+
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
18+
<WarningsAsErrors>IL2026,IL2067,IL2075,IL3050</WarningsAsErrors>
1719
</PropertyGroup>
1820

1921
<ItemGroup>

Libraries/src/Amazon.Lambda.DurableExecution/AssemblyMarker.cs

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Configuration for step execution.
5+
/// </summary>
6+
public class StepConfig
7+
{
8+
/// <summary>
9+
/// Custom serializer for the step result. Default is System.Text.Json.
10+
/// </summary>
11+
public ICheckpointSerializer? Serializer { get; set; }
12+
13+
// TODO: Retry support is deferred to a follow-up PR. When added, this is
14+
// where RetryStrategy and Semantics (AtLeastOncePerRetry / AtMostOncePerRetry)
15+
// will live. The follow-up needs to use service-mediated retries (checkpoint
16+
// a RETRY operation + suspend the Lambda) rather than an in-process Task.Delay
17+
// loop, to match the Python and JS reference SDKs and to avoid billing Lambda
18+
// compute time during retry backoff.
19+
}
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
using System.Diagnostics.CodeAnalysis;
2+
using System.Text.Json;
3+
using Amazon.Lambda.Core;
4+
using Amazon.Lambda.DurableExecution.Internal;
5+
using Microsoft.Extensions.Logging;
6+
using Microsoft.Extensions.Logging.Abstractions;
7+
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
8+
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
9+
using SdkWaitOptions = Amazon.Lambda.Model.WaitOptions;
10+
11+
namespace Amazon.Lambda.DurableExecution;
12+
13+
/// <summary>
14+
/// Core implementation of IDurableContext providing replay-aware step
15+
/// execution and waits.
16+
/// </summary>
17+
internal sealed class DurableContext : IDurableContext
18+
{
19+
private readonly ExecutionState _state;
20+
private readonly TerminationManager _terminationManager;
21+
private readonly OperationIdGenerator _idGenerator;
22+
private readonly string _durableExecutionArn;
23+
24+
public DurableContext(
25+
ExecutionState state,
26+
TerminationManager terminationManager,
27+
OperationIdGenerator idGenerator,
28+
string durableExecutionArn,
29+
ILambdaContext lambdaContext)
30+
{
31+
_state = state;
32+
_terminationManager = terminationManager;
33+
_idGenerator = idGenerator;
34+
_durableExecutionArn = durableExecutionArn;
35+
LambdaContext = lambdaContext;
36+
}
37+
38+
public ILogger Logger => NullLogger.Instance;
39+
public IExecutionContext ExecutionContext => new DurableExecutionContext(_durableExecutionArn);
40+
public ILambdaContext LambdaContext { get; }
41+
42+
#region StepAsync
43+
44+
public async Task<T> StepAsync<T>(
45+
Func<IStepContext, Task<T>> func,
46+
string? name = null,
47+
StepConfig? config = null,
48+
CancellationToken cancellationToken = default)
49+
{
50+
var operationId = _idGenerator.NextId(name);
51+
52+
var existingOp = _state.GetOperation(operationId, OperationAction.Succeed);
53+
if (existingOp != null)
54+
{
55+
_state.MarkVisited(operationId, wasReplayed: true);
56+
return DeserializeResult<T>(existingOp.StepDetails?.Result, config?.Serializer, operationId);
57+
}
58+
59+
var failedOp = _state.GetOperation(operationId, OperationAction.Fail);
60+
if (failedOp != null)
61+
{
62+
_state.MarkVisited(operationId, wasReplayed: true);
63+
throw CreateStepException(failedOp);
64+
}
65+
66+
_state.MarkVisited(operationId, wasReplayed: false);
67+
68+
cancellationToken.ThrowIfCancellationRequested();
69+
70+
try
71+
{
72+
var stepContext = new StepContext(operationId, attemptNumber: 1, Logger);
73+
var result = await func(stepContext);
74+
75+
var serializedResult = SerializeResult(result, config?.Serializer, operationId);
76+
_state.AddPendingCheckpoint(new SdkOperationUpdate
77+
{
78+
Id = operationId,
79+
Type = OperationTypes.Step,
80+
Action = "SUCCEED",
81+
SubType = "Step",
82+
Name = name,
83+
Payload = serializedResult
84+
});
85+
86+
return result;
87+
}
88+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
89+
{
90+
throw;
91+
}
92+
catch (Exception ex)
93+
{
94+
_state.AddPendingCheckpoint(new SdkOperationUpdate
95+
{
96+
Id = operationId,
97+
Type = OperationTypes.Step,
98+
Action = "FAIL",
99+
SubType = "Step",
100+
Name = name,
101+
Error = ToSdkError(ex)
102+
});
103+
104+
throw new StepException(ex.Message, ex)
105+
{
106+
ErrorType = ex.GetType().FullName,
107+
ErrorData = ex.Message
108+
};
109+
}
110+
}
111+
112+
public async Task StepAsync(
113+
Func<IStepContext, Task> func,
114+
string? name = null,
115+
StepConfig? config = null,
116+
CancellationToken cancellationToken = default)
117+
{
118+
await StepAsync<object?>(
119+
async (ctx) => { await func(ctx); return null; },
120+
name, config, cancellationToken);
121+
}
122+
123+
public async Task<T> StepAsync<T>(
124+
Func<Task<T>> func,
125+
string? name = null,
126+
StepConfig? config = null,
127+
CancellationToken cancellationToken = default)
128+
{
129+
return await StepAsync<T>(
130+
async (_) => await func(),
131+
name, config, cancellationToken);
132+
}
133+
134+
public async Task StepAsync(
135+
Func<Task> func,
136+
string? name = null,
137+
StepConfig? config = null,
138+
CancellationToken cancellationToken = default)
139+
{
140+
await StepAsync<object?>(
141+
async (_) => { await func(); return null; },
142+
name, config, cancellationToken);
143+
}
144+
145+
#endregion
146+
147+
#region WaitAsync
148+
149+
public Task WaitAsync(
150+
TimeSpan duration,
151+
string? name = null,
152+
CancellationToken cancellationToken = default)
153+
{
154+
if (duration < TimeSpan.FromSeconds(1))
155+
throw new ArgumentOutOfRangeException(nameof(duration), "Wait duration must be at least 1 second.");
156+
157+
cancellationToken.ThrowIfCancellationRequested();
158+
159+
var operationId = _idGenerator.NextId(name);
160+
var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds));
161+
162+
var startedOp = _state.GetOperation(operationId, OperationAction.Start);
163+
if (startedOp?.WaitDetails?.ScheduledEndTimestamp is { } expiresAtMs)
164+
{
165+
if (DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() >= expiresAtMs)
166+
{
167+
_state.MarkVisited(operationId, wasReplayed: true);
168+
return Task.CompletedTask;
169+
}
170+
}
171+
172+
var succeededOp = _state.GetOperation(operationId, OperationAction.Succeed);
173+
if (succeededOp != null)
174+
{
175+
_state.MarkVisited(operationId, wasReplayed: true);
176+
return Task.CompletedTask;
177+
}
178+
179+
_state.MarkVisited(operationId, wasReplayed: false);
180+
181+
_state.AddPendingCheckpoint(new SdkOperationUpdate
182+
{
183+
Id = operationId,
184+
Type = OperationTypes.Wait,
185+
Action = "START",
186+
SubType = "Wait",
187+
Name = name,
188+
WaitOptions = new SdkWaitOptions { WaitSeconds = waitSeconds }
189+
});
190+
191+
_terminationManager.Terminate(TerminationReason.WaitScheduled, $"wait:{name ?? operationId}");
192+
193+
return new TaskCompletionSource<object?>().Task;
194+
}
195+
196+
#endregion
197+
198+
#region Helpers
199+
200+
[UnconditionalSuppressMessage("Trimming", "IL2026",
201+
Justification = "Default reflection-based path used in JIT mode. AOT users supply a trim-safe ICheckpointSerializer<T> via StepConfig.Serializer to avoid this code path.")]
202+
[UnconditionalSuppressMessage("AOT", "IL3050",
203+
Justification = "Default reflection-based path used in JIT mode. AOT users supply a trim-safe ICheckpointSerializer<T> via StepConfig.Serializer to avoid this code path.")]
204+
private T DeserializeResult<T>(string? serialized, ICheckpointSerializer? customSerializer, string operationId)
205+
{
206+
if (serialized == null)
207+
return default!;
208+
209+
if (customSerializer is ICheckpointSerializer<T> typed)
210+
return typed.Deserialize(serialized, new SerializationContext(operationId, _durableExecutionArn));
211+
212+
return JsonSerializer.Deserialize<T>(serialized)!;
213+
}
214+
215+
[UnconditionalSuppressMessage("Trimming", "IL2026",
216+
Justification = "Default reflection-based path used in JIT mode. AOT users supply a trim-safe ICheckpointSerializer<T> via StepConfig.Serializer to avoid this code path.")]
217+
[UnconditionalSuppressMessage("AOT", "IL3050",
218+
Justification = "Default reflection-based path used in JIT mode. AOT users supply a trim-safe ICheckpointSerializer<T> via StepConfig.Serializer to avoid this code path.")]
219+
private string SerializeResult<T>(T value, ICheckpointSerializer? customSerializer, string operationId)
220+
{
221+
if (value == null)
222+
return "null";
223+
224+
if (customSerializer is ICheckpointSerializer<T> typed)
225+
return typed.Serialize(value, new SerializationContext(operationId, _durableExecutionArn));
226+
227+
return JsonSerializer.Serialize(value);
228+
}
229+
230+
private static StepException CreateStepException(Operation failedOp)
231+
{
232+
var err = failedOp.StepDetails?.Error;
233+
return new StepException(err?.ErrorMessage ?? "Step failed")
234+
{
235+
ErrorType = err?.ErrorType,
236+
ErrorData = err?.ErrorData,
237+
OriginalStackTrace = err?.StackTrace
238+
};
239+
}
240+
241+
private static SdkErrorObject ToSdkError(Exception ex) => new()
242+
{
243+
ErrorType = ex.GetType().FullName,
244+
ErrorMessage = ex.Message,
245+
StackTrace = ex.StackTrace?.Split('\n', StringSplitOptions.RemoveEmptyEntries).ToList()
246+
};
247+
248+
#endregion
249+
}
250+
251+
internal sealed class DurableExecutionContext : IExecutionContext
252+
{
253+
public DurableExecutionContext(string durableExecutionArn)
254+
{
255+
DurableExecutionArn = durableExecutionArn;
256+
}
257+
258+
public string DurableExecutionArn { get; }
259+
}
260+
261+
internal sealed class StepContext : IStepContext
262+
{
263+
public StepContext(string operationId, int attemptNumber, ILogger logger)
264+
{
265+
OperationId = operationId;
266+
AttemptNumber = attemptNumber;
267+
Logger = logger;
268+
}
269+
270+
public ILogger Logger { get; }
271+
public int AttemptNumber { get; }
272+
public string OperationId { get; }
273+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Marks a Lambda function as a durable execution handler. When used with
5+
/// [LambdaFunction] from Amazon.Lambda.Annotations, the source generator
6+
/// produces the handler wrapper and CloudFormation DurableConfig automatically.
7+
/// </summary>
8+
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
9+
public sealed class DurableExecutionAttribute : Attribute
10+
{
11+
/// <summary>
12+
/// Maximum execution timeout in seconds. Default is service-determined.
13+
/// </summary>
14+
public int ExecutionTimeout { get; set; }
15+
16+
/// <summary>
17+
/// Number of days to retain execution history. Default is service-determined.
18+
/// </summary>
19+
public int RetentionPeriodInDays { get; set; }
20+
21+
/// <summary>
22+
/// Name of a field or property on the class that provides a custom IAmazonLambda client.
23+
/// When not specified, the generated code creates a default AmazonLambdaClient.
24+
/// </summary>
25+
public string? LambdaClientFactory { get; set; }
26+
}

0 commit comments

Comments
 (0)