Skip to content

Commit f821fe9

Browse files
committed
Add durable execution Step + Wait end-to-end
Implements the minimum viable slice of the Amazon.Lambda.DurableExecution SDK: a workflow can run StepAsync and WaitAsync against a real Lambda, with replay-aware checkpointing wired through to the AWS service. Public API surface introduced: - DurableFunction.WrapAsync — entry point that handles the durable execution envelope (input hydration, output construction, status mapping) - IDurableContext.StepAsync / WaitAsync (4 Step overloads, 1 Wait) - StepConfig with serializer hook (retry deferred to follow-up PR) - ICheckpointSerializer interface - [DurableExecution] attribute (recognized by future source generator) - DurableExecutionException base + StepException Internals: - DurableExecutionHandler — Task.WhenAny race between user code and the suspension signal, returning Succeeded/Failed/Pending - ExecutionState — replay-aware operation lookup and pending checkpoint buffer - OperationIdGenerator — deterministic, replay-stable IDs - TerminationManager — TaskCompletionSource-based suspension trigger - LambdaDurableServiceClient — wraps AWSSDK.Lambda's checkpoint and state APIs Tests: - 86 unit tests covering enums, exceptions, models, configs, ID generation, termination, execution state, the handler race, the context (Step + Wait paths), and the WrapAsync entry point - 8 end-to-end integration tests deploying real Lambdas via Docker on the provided.al2023 runtime: StepWaitStep, MultipleSteps, WaitOnly, LongerWait, ReplayDeterminism, RetrySucceeds, RetryExhausts, StepFails Out of scope (follow-up PRs): - IRetryStrategy, ExponentialRetryStrategy, retry decision factories - DefaultJsonCheckpointSerializer - DurableLogger replay-suppression (currently returns NullLogger) - Callbacks, InvokeAsync, ParallelAsync, MapAsync, RunInChildContextAsync, WaitForConditionAsync — interface intentionally does not declare them - Annotations source-generator integration - DurableTestRunner / Amazon.Lambda.DurableExecution.Testing package - dotnet new lambda.DurableFunction blueprint stack-info: PR: #2360, branch: GarrettBeatty/stack/2
1 parent a90329b commit f821fe9

67 files changed

Lines changed: 4930 additions & 22 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Docs/durable-execution-design.md

Lines changed: 149 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,43 @@ private async Task MyWorkflow(OrderEvent input, IDurableContext context)
250250
}
251251
```
252252

253+
For **NativeAOT** deployments, pass a `JsonSerializerContext` so the SDK can serialize/deserialize your input and output types without reflection:
254+
255+
```csharp
256+
[JsonSerializable(typeof(OrderEvent))]
257+
[JsonSerializable(typeof(OrderResult))]
258+
internal partial class MyJsonContext : JsonSerializerContext { }
259+
260+
public class Function
261+
{
262+
public Task<DurableExecutionInvocationOutput> FunctionHandler(
263+
DurableExecutionInvocationInput invocationInput, ILambdaContext context)
264+
=> DurableFunction.WrapAsync<OrderEvent, OrderResult>(
265+
MyWorkflow, invocationInput, context, MyJsonContext.Default);
266+
267+
private async Task<OrderResult> MyWorkflow(OrderEvent input, IDurableContext context)
268+
{
269+
// ...
270+
}
271+
}
272+
```
273+
274+
To inject a custom `IAmazonLambda` client (e.g., for VPC endpoints or unit testing), use the overload that accepts one:
275+
276+
```csharp
277+
public class Function
278+
{
279+
private readonly IAmazonLambda _lambdaClient;
280+
281+
public Function(IAmazonLambda lambdaClient) => _lambdaClient = lambdaClient;
282+
283+
public Task<DurableExecutionInvocationOutput> FunctionHandler(
284+
DurableExecutionInvocationInput invocationInput, ILambdaContext context)
285+
=> DurableFunction.WrapAsync<OrderEvent, OrderResult>(
286+
MyWorkflow, invocationInput, context, _lambdaClient);
287+
}
288+
```
289+
253290
You'd also need to manually configure the CloudFormation template with `DurableConfig` and managed policies:
254291

255292
```json
@@ -906,21 +943,95 @@ Static helper for the non-Annotations handler path. Wraps a workflow function, h
906943
/// </summary>
907944
public static class DurableFunction
908945
{
946+
// ── Reflection-based overloads (JIT only) ──────────────────────────
947+
909948
/// <summary>
910949
/// Wrap a workflow that takes typed input and returns typed output.
950+
/// Reflection-based JSON — not AOT-safe.
911951
/// </summary>
952+
[RequiresUnreferencedCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
953+
[RequiresDynamicCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
912954
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput, TOutput>(
913955
Func<TInput, IDurableContext, Task<TOutput>> workflow,
914956
DurableExecutionInvocationInput invocationInput,
915957
ILambdaContext lambdaContext);
916958

917959
/// <summary>
918-
/// Wrap a workflow that takes typed input and returns no value.
960+
/// Wrap a workflow (typed input + output) with explicit Lambda client.
961+
/// Reflection-based JSON — not AOT-safe.
919962
/// </summary>
963+
[RequiresUnreferencedCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
964+
[RequiresDynamicCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
965+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput, TOutput>(
966+
Func<TInput, IDurableContext, Task<TOutput>> workflow,
967+
DurableExecutionInvocationInput invocationInput,
968+
ILambdaContext lambdaContext,
969+
IAmazonLambda lambdaClient);
970+
971+
/// <summary>
972+
/// Wrap a void workflow (typed input, no output).
973+
/// Reflection-based JSON — not AOT-safe.
974+
/// </summary>
975+
[RequiresUnreferencedCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
976+
[RequiresDynamicCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
920977
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput>(
921978
Func<TInput, IDurableContext, Task> workflow,
922979
DurableExecutionInvocationInput invocationInput,
923980
ILambdaContext lambdaContext);
981+
982+
/// <summary>
983+
/// Wrap a void workflow with explicit Lambda client.
984+
/// Reflection-based JSON — not AOT-safe.
985+
/// </summary>
986+
[RequiresUnreferencedCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
987+
[RequiresDynamicCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
988+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput>(
989+
Func<TInput, IDurableContext, Task> workflow,
990+
DurableExecutionInvocationInput invocationInput,
991+
ILambdaContext lambdaContext,
992+
IAmazonLambda lambdaClient);
993+
994+
// ── AOT-safe overloads (caller supplies JsonSerializerContext) ──────
995+
996+
/// <summary>
997+
/// Wrap a workflow (typed input + output). AOT-safe — requires
998+
/// [JsonSerializable(typeof(TInput))] and [JsonSerializable(typeof(TOutput))]
999+
/// on the supplied jsonContext.
1000+
/// </summary>
1001+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput, TOutput>(
1002+
Func<TInput, IDurableContext, Task<TOutput>> workflow,
1003+
DurableExecutionInvocationInput invocationInput,
1004+
ILambdaContext lambdaContext,
1005+
JsonSerializerContext jsonContext);
1006+
1007+
/// <summary>
1008+
/// Wrap a workflow (typed input + output) with explicit Lambda client. AOT-safe.
1009+
/// </summary>
1010+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput, TOutput>(
1011+
Func<TInput, IDurableContext, Task<TOutput>> workflow,
1012+
DurableExecutionInvocationInput invocationInput,
1013+
ILambdaContext lambdaContext,
1014+
IAmazonLambda lambdaClient,
1015+
JsonSerializerContext jsonContext);
1016+
1017+
/// <summary>
1018+
/// Wrap a void workflow (typed input, no output). AOT-safe.
1019+
/// </summary>
1020+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput>(
1021+
Func<TInput, IDurableContext, Task> workflow,
1022+
DurableExecutionInvocationInput invocationInput,
1023+
ILambdaContext lambdaContext,
1024+
JsonSerializerContext jsonContext);
1025+
1026+
/// <summary>
1027+
/// Wrap a void workflow with explicit Lambda client. AOT-safe.
1028+
/// </summary>
1029+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput>(
1030+
Func<TInput, IDurableContext, Task> workflow,
1031+
DurableExecutionInvocationInput invocationInput,
1032+
ILambdaContext lambdaContext,
1033+
IAmazonLambda lambdaClient,
1034+
JsonSerializerContext jsonContext);
9241035
}
9251036
```
9261037

@@ -968,6 +1079,24 @@ public interface IDurableContext
9681079
StepConfig? config = null,
9691080
CancellationToken cancellationToken = default);
9701081

1082+
/// <summary>
1083+
/// Execute a step (simplified overload without IStepContext).
1084+
/// </summary>
1085+
Task<T> StepAsync<T>(
1086+
Func<Task<T>> func,
1087+
string? name = null,
1088+
StepConfig? config = null,
1089+
CancellationToken cancellationToken = default);
1090+
1091+
/// <summary>
1092+
/// Execute a step that returns no value (simplified overload).
1093+
/// </summary>
1094+
Task StepAsync(
1095+
Func<Task> func,
1096+
string? name = null,
1097+
StepConfig? config = null,
1098+
CancellationToken cancellationToken = default);
1099+
9711100
/// <summary>
9721101
/// Suspend execution for the specified duration.
9731102
/// Throws ArgumentOutOfRangeException if duration is less than 1 second.
@@ -1579,13 +1708,29 @@ Both approaches produce a self-contained executable that the Lambda custom runti
15791708

15801709
### NativeAOT compatibility
15811710

1582-
The SDK is AOT-friendly but does not require AOT. The default JSON serialization uses reflection (standard `System.Text.Json` behavior), which works in JIT mode. For NativeAOT deployments, provide a `JsonSerializerContext` via the `ICheckpointSerializer<T>` interface — this avoids all runtime reflection and is fully trim-safe. The SDK itself avoids `Activator.CreateInstance`, `Type.GetType()`, and other reflection patterns, and uses `[DynamicallyAccessedMembers]` trimming annotations where needed.
1711+
The SDK is AOT-friendly but does not require AOT. The default JSON serialization uses reflection (standard `System.Text.Json` behavior), which works in JIT mode. For NativeAOT deployments, AOT safety is addressed at two levels:
1712+
1713+
1. **Entry point (`DurableFunction.WrapAsync`)** — pass a `JsonSerializerContext` that includes type info for your `TInput` and `TOutput` types. The reflection-based overloads are annotated with `[RequiresUnreferencedCode]` / `[RequiresDynamicCode]` so the trimmer will warn if you use them in AOT builds.
1714+
1715+
2. **Step checkpoints (`StepConfig.Serializer`)** — for custom or complex types checkpointed by `StepAsync`, provide an `ICheckpointSerializer` via `StepConfig` (e.g., `JsonCheckpointSerializer<T>` backed by a source-generated `JsonTypeInfo<T>`).
1716+
1717+
The SDK itself avoids `Activator.CreateInstance`, `Type.GetType()`, and other reflection patterns, and uses `[DynamicallyAccessedMembers]` trimming annotations where needed.
15831718

15841719
```csharp
15851720
// Default: works with reflection (JIT mode)
15861721
var result = await context.StepAsync<Order>(async () => await GetOrder());
15871722

1588-
// AOT mode: user provides serialization context
1723+
// AOT mode — entry point: pass JsonSerializerContext to WrapAsync
1724+
[JsonSerializable(typeof(OrderEvent))]
1725+
[JsonSerializable(typeof(OrderResult))]
1726+
internal partial class MyJsonContext : JsonSerializerContext { }
1727+
1728+
public Task<DurableExecutionInvocationOutput> FunctionHandler(
1729+
DurableExecutionInvocationInput invocationInput, ILambdaContext context)
1730+
=> DurableFunction.WrapAsync<OrderEvent, OrderResult>(
1731+
MyWorkflow, invocationInput, context, MyJsonContext.Default);
1732+
1733+
// AOT mode — step checkpoint: provide a serializer via StepConfig
15891734
var result = await context.StepAsync(
15901735
async () => await GetOrder(),
15911736
config: new StepConfig { Serializer = new JsonCheckpointSerializer<Order>(MyJsonContext.Default.Order) });
@@ -1701,7 +1846,7 @@ public class Functions
17011846
}
17021847
```
17031848

1704-
When no `LambdaClientFactory` is specified, the generated code creates a default `AmazonLambdaClient`. For the manual handler path, pass the client directly to `DurableExecutionHandler.RunAsync`.
1849+
When no `LambdaClientFactory` is specified, the generated code creates a default `AmazonLambdaClient`. For the manual handler path (`DurableFunction.WrapAsync`), pass the client directly via the `IAmazonLambda lambdaClient` overload.
17051850

17061851
> **Dependency boundaries:** `Amazon.Lambda.Annotations` has **no dependency** on the AWS SDK or on `Amazon.Lambda.DurableExecution`. The Annotations source generator references durable execution types by fully-qualified name strings only — it never takes a compile-time dependency on the durable package. The `[DurableExecution]` attribute is defined in `Amazon.Lambda.DurableExecution`, and the generated code resolves against the user's project references. There is only one source generator (Annotations) — no coordination between multiple generators is needed.
17071852

Libraries/src/Amazon.Lambda.DurableExecution/Amazon.Lambda.DurableExecution.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
1515
<Nullable>enable</Nullable>
1616
<ImplicitUsings>enable</ImplicitUsings>
17+
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
18+
<WarningsAsErrors>IL2026,IL2067,IL2075,IL3050</WarningsAsErrors>
1719
</PropertyGroup>
1820

1921
<ItemGroup>

Libraries/src/Amazon.Lambda.DurableExecution/AssemblyMarker.cs

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Configuration for step execution.
5+
/// </summary>
6+
public class StepConfig
7+
{
8+
/// <summary>
9+
/// Custom serializer for the step result. Default is System.Text.Json.
10+
/// </summary>
11+
public ICheckpointSerializer? Serializer { get; set; }
12+
13+
// TODO: Retry support is deferred to a follow-up PR. When added, this is
14+
// where RetryStrategy and Semantics (AtLeastOncePerRetry / AtMostOncePerRetry)
15+
// will live. The follow-up needs to use service-mediated retries (checkpoint
16+
// a RETRY operation + suspend the Lambda) rather than an in-process Task.Delay
17+
// loop, to match the Python and JS reference SDKs and to avoid billing Lambda
18+
// compute time during retry backoff.
19+
}

0 commit comments

Comments
 (0)