Skip to content

Commit 55db890

Browse files
committed
Add durable execution Step + Wait end-to-end
Implements the minimum viable slice of the Amazon.Lambda.DurableExecution SDK: a workflow can run StepAsync and WaitAsync against a real Lambda, with replay-aware checkpointing wired through to the AWS service. Public API surface introduced: - DurableFunction.WrapAsync — entry point that handles the durable execution envelope (input hydration, output construction, status mapping) - IDurableContext.StepAsync / WaitAsync (4 Step overloads, 1 Wait) - StepConfig with retry strategy, semantics, and serializer hooks - IRetryStrategy + ExponentialRetryStrategy + retry decision factories - ICheckpointSerializer + DefaultJsonCheckpointSerializer - [DurableExecution] attribute (recognized by future source generator) - DurableExecutionException base + StepException Internals: - DurableExecutionHandler — Task.WhenAny race between user code and the suspension signal, returning Succeeded/Failed/Pending - ExecutionState — replay-aware operation lookup and pending checkpoint buffer - OperationIdGenerator — deterministic, replay-stable IDs - TerminationManager — TaskCompletionSource-based suspension trigger - LambdaDurableServiceClient — wraps AWSSDK.Lambda's checkpoint and state APIs Tests: - 86 unit tests covering enums, exceptions, models, configs, retry, ID generation, termination, execution state, the handler race, the context (Step + Wait paths), and the WrapAsync entry point - 8 end-to-end integration tests deploying real Lambdas via Docker on the provided.al2023 runtime: StepWaitStep, MultipleSteps, WaitOnly, LongerWait, ReplayDeterminism, RetrySucceeds, RetryExhausts, StepFails Out of scope (follow-up PRs): - Callbacks, InvokeAsync, ParallelAsync, MapAsync, RunInChildContextAsync, WaitForConditionAsync — interface intentionally does not declare them - DurableLogger replay-suppression - Annotations source-generator integration - DurableTestRunner / Amazon.Lambda.DurableExecution.Testing package - dotnet new lambda.DurableFunction blueprint
1 parent 9d78744 commit 55db890

66 files changed

Lines changed: 5736 additions & 18 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Libraries/src/Amazon.Lambda.DurableExecution/Amazon.Lambda.DurableExecution.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
1515
<Nullable>enable</Nullable>
1616
<ImplicitUsings>enable</ImplicitUsings>
17+
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
18+
<WarningsAsErrors>IL2026,IL2067,IL2075,IL3050</WarningsAsErrors>
1719
</PropertyGroup>
1820

1921
<ItemGroup>

Libraries/src/Amazon.Lambda.DurableExecution/AssemblyMarker.cs

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Configuration for step execution.
5+
/// </summary>
6+
public class StepConfig
7+
{
8+
/// <summary>
9+
/// Custom serializer for the step result. Default is System.Text.Json.
10+
/// </summary>
11+
public ICheckpointSerializer? Serializer { get; set; }
12+
}
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
using System.Diagnostics.CodeAnalysis;
2+
using System.Text.Json;
3+
using Amazon.Lambda.Core;
4+
using Amazon.Lambda.DurableExecution.Internal;
5+
using Microsoft.Extensions.Logging;
6+
using Microsoft.Extensions.Logging.Abstractions;
7+
8+
namespace Amazon.Lambda.DurableExecution;
9+
10+
/// <summary>
11+
/// Core implementation of IDurableContext providing replay-aware step
12+
/// execution and waits.
13+
/// </summary>
14+
internal sealed class DurableContext : IDurableContext
15+
{
16+
private readonly ExecutionState _state;
17+
private readonly TerminationManager _terminationManager;
18+
private readonly OperationIdGenerator _idGenerator;
19+
private readonly string _durableExecutionArn;
20+
21+
public DurableContext(
22+
ExecutionState state,
23+
TerminationManager terminationManager,
24+
OperationIdGenerator idGenerator,
25+
string durableExecutionArn,
26+
ILambdaContext lambdaContext)
27+
{
28+
_state = state;
29+
_terminationManager = terminationManager;
30+
_idGenerator = idGenerator;
31+
_durableExecutionArn = durableExecutionArn;
32+
LambdaContext = lambdaContext;
33+
}
34+
35+
public ILogger Logger => NullLogger.Instance;
36+
public IExecutionContext ExecutionContext => new DurableExecutionContext(_durableExecutionArn);
37+
public ILambdaContext LambdaContext { get; }
38+
39+
#region StepAsync
40+
41+
public async Task<T> StepAsync<T>(
42+
Func<IStepContext, Task<T>> func,
43+
string? name = null,
44+
StepConfig? config = null,
45+
CancellationToken cancellationToken = default)
46+
{
47+
var operationId = _idGenerator.NextId(name);
48+
49+
var existingRecord = _state.GetOperation(operationId, OperationAction.Succeed);
50+
if (existingRecord != null)
51+
{
52+
_state.MarkVisited(operationId, wasReplayed: true);
53+
return DeserializeResult<T>(existingRecord.Result, config?.Serializer, operationId);
54+
}
55+
56+
var failedRecord = _state.GetOperation(operationId, OperationAction.Fail);
57+
if (failedRecord != null)
58+
{
59+
_state.MarkVisited(operationId, wasReplayed: true);
60+
throw CreateStepException(failedRecord);
61+
}
62+
63+
_state.MarkVisited(operationId, wasReplayed: false);
64+
65+
cancellationToken.ThrowIfCancellationRequested();
66+
67+
try
68+
{
69+
var stepContext = new StepContext(operationId, attemptNumber: 1, Logger);
70+
var result = await func(stepContext);
71+
72+
var serializedResult = SerializeResult(result, config?.Serializer, operationId);
73+
_state.AddPendingCheckpoint(new OperationRecord
74+
{
75+
OperationId = operationId,
76+
Type = OperationType.Step,
77+
Action = OperationAction.Succeed,
78+
Status = OperationStatus.Succeeded,
79+
Name = name,
80+
Result = serializedResult,
81+
Timestamp = DateTimeOffset.UtcNow.ToString("O")
82+
});
83+
84+
return result;
85+
}
86+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
87+
{
88+
throw;
89+
}
90+
catch (Exception ex)
91+
{
92+
_state.AddPendingCheckpoint(new OperationRecord
93+
{
94+
OperationId = operationId,
95+
Type = OperationType.Step,
96+
Action = OperationAction.Fail,
97+
Status = OperationStatus.Failed,
98+
Name = name,
99+
Error = ErrorObject.FromException(ex),
100+
Timestamp = DateTimeOffset.UtcNow.ToString("O")
101+
});
102+
103+
throw new StepException(ex.Message, ex)
104+
{
105+
ErrorType = ex.GetType().FullName,
106+
ErrorData = ex.Message
107+
};
108+
}
109+
}
110+
111+
public async Task StepAsync(
112+
Func<IStepContext, Task> func,
113+
string? name = null,
114+
StepConfig? config = null,
115+
CancellationToken cancellationToken = default)
116+
{
117+
await StepAsync<object?>(
118+
async (ctx) => { await func(ctx); return null; },
119+
name, config, cancellationToken);
120+
}
121+
122+
public async Task<T> StepAsync<T>(
123+
Func<Task<T>> func,
124+
string? name = null,
125+
StepConfig? config = null,
126+
CancellationToken cancellationToken = default)
127+
{
128+
return await StepAsync<T>(
129+
async (_) => await func(),
130+
name, config, cancellationToken);
131+
}
132+
133+
public async Task StepAsync(
134+
Func<Task> func,
135+
string? name = null,
136+
StepConfig? config = null,
137+
CancellationToken cancellationToken = default)
138+
{
139+
await StepAsync<object?>(
140+
async (_) => { await func(); return null; },
141+
name, config, cancellationToken);
142+
}
143+
144+
#endregion
145+
146+
#region WaitAsync
147+
148+
public Task WaitAsync(
149+
TimeSpan duration,
150+
string? name = null,
151+
CancellationToken cancellationToken = default)
152+
{
153+
if (duration < TimeSpan.FromSeconds(1))
154+
throw new ArgumentOutOfRangeException(nameof(duration), "Wait duration must be at least 1 second.");
155+
156+
cancellationToken.ThrowIfCancellationRequested();
157+
158+
var operationId = _idGenerator.NextId(name);
159+
var expiresAt = DateTimeOffset.UtcNow.Add(duration);
160+
161+
var existingRecord = _state.GetOperation(operationId, OperationAction.Start);
162+
if (existingRecord?.ExpiresAt != null)
163+
{
164+
var expiration = DateTimeOffset.Parse(existingRecord.ExpiresAt);
165+
if (DateTimeOffset.UtcNow >= expiration)
166+
{
167+
_state.MarkVisited(operationId, wasReplayed: true);
168+
return Task.CompletedTask;
169+
}
170+
}
171+
172+
var succeedRecord = _state.GetOperation(operationId, OperationAction.Succeed);
173+
if (succeedRecord != null)
174+
{
175+
_state.MarkVisited(operationId, wasReplayed: true);
176+
return Task.CompletedTask;
177+
}
178+
179+
_state.MarkVisited(operationId, wasReplayed: false);
180+
181+
_state.AddPendingCheckpoint(new OperationRecord
182+
{
183+
OperationId = operationId,
184+
Type = OperationType.Wait,
185+
Action = OperationAction.Start,
186+
Status = OperationStatus.Pending,
187+
Name = name,
188+
ExpiresAt = expiresAt.ToString("O"),
189+
Timestamp = DateTimeOffset.UtcNow.ToString("O")
190+
});
191+
192+
_terminationManager.Terminate(TerminationReason.WaitScheduled, $"wait:{name ?? operationId}");
193+
194+
return new TaskCompletionSource<object?>().Task;
195+
}
196+
197+
#endregion
198+
199+
#region Helpers
200+
201+
[UnconditionalSuppressMessage("Trimming", "IL2026",
202+
Justification = "Default reflection-based path used in JIT mode. AOT users supply a trim-safe ICheckpointSerializer<T> via StepConfig.Serializer to avoid this code path.")]
203+
private T DeserializeResult<T>(string? serialized, ICheckpointSerializer? customSerializer, string operationId)
204+
{
205+
if (serialized == null)
206+
return default!;
207+
208+
if (customSerializer is ICheckpointSerializer<T> typed)
209+
return typed.Deserialize(serialized, new SerializationContext(operationId, _durableExecutionArn));
210+
211+
return JsonSerializer.Deserialize<T>(serialized)!;
212+
}
213+
214+
[UnconditionalSuppressMessage("Trimming", "IL2026",
215+
Justification = "Default reflection-based path used in JIT mode. AOT users supply a trim-safe ICheckpointSerializer<T> via StepConfig.Serializer to avoid this code path.")]
216+
private string SerializeResult<T>(T value, ICheckpointSerializer? customSerializer, string operationId)
217+
{
218+
if (value == null)
219+
return "null";
220+
221+
if (customSerializer is ICheckpointSerializer<T> typed)
222+
return typed.Serialize(value, new SerializationContext(operationId, _durableExecutionArn));
223+
224+
return JsonSerializer.Serialize(value);
225+
}
226+
227+
private static StepException CreateStepException(OperationRecord failedRecord)
228+
{
229+
return new StepException(failedRecord.Error?.ErrorMessage ?? "Step failed")
230+
{
231+
ErrorType = failedRecord.Error?.ErrorType,
232+
ErrorData = failedRecord.Error?.ErrorData,
233+
OriginalStackTrace = failedRecord.Error?.StackTrace
234+
};
235+
}
236+
237+
#endregion
238+
}
239+
240+
internal sealed class DurableExecutionContext : IExecutionContext
241+
{
242+
public DurableExecutionContext(string durableExecutionArn)
243+
{
244+
DurableExecutionArn = durableExecutionArn;
245+
}
246+
247+
public string DurableExecutionArn { get; }
248+
}
249+
250+
internal sealed class StepContext : IStepContext
251+
{
252+
public StepContext(string operationId, int attemptNumber, ILogger logger)
253+
{
254+
OperationId = operationId;
255+
AttemptNumber = attemptNumber;
256+
Logger = logger;
257+
}
258+
259+
public ILogger Logger { get; }
260+
public int AttemptNumber { get; }
261+
public string OperationId { get; }
262+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Marks a Lambda function as a durable execution handler. When used with
5+
/// [LambdaFunction] from Amazon.Lambda.Annotations, the source generator
6+
/// produces the handler wrapper and CloudFormation DurableConfig automatically.
7+
/// </summary>
8+
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
9+
public sealed class DurableExecutionAttribute : Attribute
10+
{
11+
/// <summary>
12+
/// Maximum execution timeout in seconds. Default is service-determined.
13+
/// </summary>
14+
public int ExecutionTimeout { get; set; }
15+
16+
/// <summary>
17+
/// Number of days to retain execution history. Default is service-determined.
18+
/// </summary>
19+
public int RetentionPeriodInDays { get; set; }
20+
21+
/// <summary>
22+
/// Name of a field or property on the class that provides a custom IAmazonLambda client.
23+
/// When not specified, the generated code creates a default AmazonLambdaClient.
24+
/// </summary>
25+
public string? LambdaClientFactory { get; set; }
26+
}

0 commit comments

Comments
 (0)