Commit 3be25fa

Add durable execution Step + Wait end-to-end
Implements the minimum viable slice of the Amazon.Lambda.DurableExecution SDK: a workflow can run StepAsync and WaitAsync against a real Lambda, with replay-aware checkpointing wired through to the AWS service.

Public API surface introduced:
- DurableFunction.WrapAsync: entry point that handles the durable execution envelope (input hydration, output construction, status mapping)
- IDurableContext.StepAsync / WaitAsync (4 Step overloads, 1 Wait)
- StepConfig with serializer hook (retry deferred to follow-up PR)
- ICheckpointSerializer interface
- [DurableExecution] attribute (recognized by future source generator)
- DurableExecutionException base + StepException

Internals:
- DurableExecutionHandler: Task.WhenAny race between user code and the suspension signal, returning Succeeded/Failed/Pending
- ExecutionState: replay-aware operation lookup and pending checkpoint buffer
- OperationIdGenerator: deterministic, replay-stable IDs
- TerminationManager: TaskCompletionSource-based suspension trigger
- LambdaDurableServiceClient: wraps AWSSDK.Lambda's checkpoint and state APIs

Tests:
- 86 unit tests covering enums, exceptions, models, configs, ID generation, termination, execution state, the handler race, the context (Step + Wait paths), and the WrapAsync entry point
- 8 end-to-end integration tests deploying real Lambdas via Docker on the provided.al2023 runtime: StepWaitStep, MultipleSteps, WaitOnly, LongerWait, ReplayDeterminism, RetrySucceeds, RetryExhausts, StepFails

Out of scope (follow-up PRs):
- IRetryStrategy, ExponentialRetryStrategy, retry decision factories
- DefaultJsonCheckpointSerializer
- DurableLogger replay-suppression (currently returns NullLogger)
- Callbacks, InvokeAsync, ParallelAsync, MapAsync, RunInChildContextAsync, WaitForConditionAsync (the interface intentionally does not declare them)
- Annotations source-generator integration
- DurableTestRunner / Amazon.Lambda.DurableExecution.Testing package
- dotnet new lambda.DurableFunction blueprint
stack-info: PR: #2360, branch: GarrettBeatty/stack/2 remove
1 parent 1fb7f7f commit 3be25fa

66 files changed

Lines changed: 5218 additions & 26 deletions

Docs/durable-execution-design.md

Lines changed: 157 additions & 7 deletions
@@ -250,6 +250,43 @@ private async Task MyWorkflow(OrderEvent input, IDurableContext context)
250250
}
251251
```
252252

253+
For **NativeAOT** deployments, pass a `JsonSerializerContext` so the SDK can serialize/deserialize your input and output types without reflection:
254+
255+
```csharp
256+
[JsonSerializable(typeof(OrderEvent))]
257+
[JsonSerializable(typeof(OrderResult))]
258+
internal partial class MyJsonContext : JsonSerializerContext { }
259+
260+
public class Function
261+
{
262+
public Task<DurableExecutionInvocationOutput> FunctionHandler(
263+
DurableExecutionInvocationInput invocationInput, ILambdaContext context)
264+
=> DurableFunction.WrapAsync<OrderEvent, OrderResult>(
265+
MyWorkflow, invocationInput, context, MyJsonContext.Default);
266+
267+
private async Task<OrderResult> MyWorkflow(OrderEvent input, IDurableContext context)
268+
{
269+
// ...
270+
}
271+
}
272+
```
273+
274+
To inject a custom `IAmazonLambda` client (e.g., for VPC endpoints or unit testing), use the overload that accepts one:
275+
276+
```csharp
277+
public class Function
278+
{
279+
private readonly IAmazonLambda _lambdaClient;
280+
281+
public Function(IAmazonLambda lambdaClient) => _lambdaClient = lambdaClient;
282+
283+
public Task<DurableExecutionInvocationOutput> FunctionHandler(
284+
DurableExecutionInvocationInput invocationInput, ILambdaContext context)
285+
=> DurableFunction.WrapAsync<OrderEvent, OrderResult>(
286+
MyWorkflow, invocationInput, context, _lambdaClient);
287+
}
288+
```
289+
253290
You'd also need to manually configure the CloudFormation template with `DurableConfig` and managed policies:
254291

255292
```json
@@ -877,14 +914,15 @@ When `RunAsync` starts, it kicks off two tasks in parallel: user code and a term
877914

878915
The `TerminationManager` is a thin wrapper around `TaskCompletionSource<TerminationResult>`:
879916
- `TerminationTask` -- a Task that hangs forever until `Terminate()` is called
880-
- `Terminate(reason)` -- resolves the TCS, causing the race to pick termination
917+
- `Terminate(reason)` -- resolves the TCS and cancels `CancellationToken`, causing the race to pick termination
918+
- `CancellationToken` -- cancelled when `Terminate()` fires; exposed via `IDurableContext.CancellationToken` so user code can cooperatively stop after suspension
881919

882920
When user code hits a pending wait or callback:
883921
1. It checkpoints the operation state
884922
2. Calls `terminationManager.Terminate(WaitScheduled)`
885923
3. Awaits a new never-completing `TaskCompletionSource` (blocks itself permanently)
886924
4. `Task.WhenAny` sees the termination task resolved and picks it as the winner
887-
5. `RunAsync` returns PENDING; Lambda terminates; the abandoned user task is GC'd
925+
5. `RunAsync` returns PENDING; the `CancellationToken` signals abandoned user code to stop; Lambda terminates
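
The five steps above can be sketched in isolation. This is a minimal illustration, not the SDK's implementation: the local `Terminate` function, the string reason, and the status strings are hypothetical stand-ins for `TerminationManager`, `TerminationResult`, and the handler's real return type.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins for the SDK internals; names are illustrative only.
var termination = new TaskCompletionSource<string>(
    TaskCreationOptions.RunContinuationsAsynchronously);
using var cts = new CancellationTokenSource();

// Plays the role of Terminate(reason): resolve the TCS, then cancel the token.
void Terminate(string reason)
{
    if (termination.TrySetResult(reason))
    {
        cts.Cancel();
    }
}

// Plays the role of user code that reaches a pending wait.
async Task<string> UserWorkflowAsync()
{
    // 1. The checkpoint for the wait operation would be written here.
    // 2. Signal suspension.
    Terminate("WaitScheduled");
    // 3. Block on a task that never completes.
    await new TaskCompletionSource<string>().Task;
    return "never reached in this sketch";
}

Task<string> userTask = UserWorkflowAsync();

// 4. Whichever task completes first decides the outcome.
Task<string> winner = await Task.WhenAny(userTask, termination.Task);

// 5. Termination won, so the invocation reports PENDING.
string status = winner == termination.Task ? "PENDING" : "SUCCEEDED";
Console.WriteLine(
    $"{status} (reason: {await termination.Task}, cancelled: {cts.IsCancellationRequested})");
// prints "PENDING (reason: WaitScheduled, cancelled: True)"
```

`TrySetResult` makes the sketch's `Terminate` idempotent, so a second suspension signal is ignored rather than throwing. Whether the real `TerminationManager` behaves the same way is not stated in this diff.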
888926

889927
### Lifecycle and cleanup
890928

@@ -906,21 +944,95 @@ Static helper for the non-Annotations handler path. Wraps a workflow function, h
906944
/// </summary>
907945
public static class DurableFunction
908946
{
947+
// ── Reflection-based overloads (JIT only) ──────────────────────────
948+
909949
/// <summary>
910950
/// Wrap a workflow that takes typed input and returns typed output.
951+
/// Reflection-based JSON — not AOT-safe.
911952
/// </summary>
953+
[RequiresUnreferencedCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
954+
[RequiresDynamicCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
912955
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput, TOutput>(
913956
Func<TInput, IDurableContext, Task<TOutput>> workflow,
914957
DurableExecutionInvocationInput invocationInput,
915958
ILambdaContext lambdaContext);
916959

917960
/// <summary>
918-
/// Wrap a workflow that takes typed input and returns no value.
961+
/// Wrap a workflow (typed input + output) with explicit Lambda client.
962+
/// Reflection-based JSON — not AOT-safe.
963+
/// </summary>
964+
[RequiresUnreferencedCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
965+
[RequiresDynamicCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
966+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput, TOutput>(
967+
Func<TInput, IDurableContext, Task<TOutput>> workflow,
968+
DurableExecutionInvocationInput invocationInput,
969+
ILambdaContext lambdaContext,
970+
IAmazonLambda lambdaClient);
971+
972+
/// <summary>
973+
/// Wrap a void workflow (typed input, no output).
974+
/// Reflection-based JSON — not AOT-safe.
919975
/// </summary>
976+
[RequiresUnreferencedCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
977+
[RequiresDynamicCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
920978
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput>(
921979
Func<TInput, IDurableContext, Task> workflow,
922980
DurableExecutionInvocationInput invocationInput,
923981
ILambdaContext lambdaContext);
982+
983+
/// <summary>
984+
/// Wrap a void workflow with explicit Lambda client.
985+
/// Reflection-based JSON — not AOT-safe.
986+
/// </summary>
987+
[RequiresUnreferencedCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
988+
[RequiresDynamicCode("Uses reflection-based JSON. Use the JsonSerializerContext overload for AOT.")]
989+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput>(
990+
Func<TInput, IDurableContext, Task> workflow,
991+
DurableExecutionInvocationInput invocationInput,
992+
ILambdaContext lambdaContext,
993+
IAmazonLambda lambdaClient);
994+
995+
// ── AOT-safe overloads (caller supplies JsonSerializerContext) ──────
996+
997+
/// <summary>
998+
/// Wrap a workflow (typed input + output). AOT-safe — requires
999+
/// [JsonSerializable(typeof(TInput))] and [JsonSerializable(typeof(TOutput))]
1000+
/// on the supplied jsonContext.
1001+
/// </summary>
1002+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput, TOutput>(
1003+
Func<TInput, IDurableContext, Task<TOutput>> workflow,
1004+
DurableExecutionInvocationInput invocationInput,
1005+
ILambdaContext lambdaContext,
1006+
JsonSerializerContext jsonContext);
1007+
1008+
/// <summary>
1009+
/// Wrap a workflow (typed input + output) with explicit Lambda client. AOT-safe.
1010+
/// </summary>
1011+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput, TOutput>(
1012+
Func<TInput, IDurableContext, Task<TOutput>> workflow,
1013+
DurableExecutionInvocationInput invocationInput,
1014+
ILambdaContext lambdaContext,
1015+
IAmazonLambda lambdaClient,
1016+
JsonSerializerContext jsonContext);
1017+
1018+
/// <summary>
1019+
/// Wrap a void workflow (typed input, no output). AOT-safe.
1020+
/// </summary>
1021+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput>(
1022+
Func<TInput, IDurableContext, Task> workflow,
1023+
DurableExecutionInvocationInput invocationInput,
1024+
ILambdaContext lambdaContext,
1025+
JsonSerializerContext jsonContext);
1026+
1027+
/// <summary>
1028+
/// Wrap a void workflow with explicit Lambda client. AOT-safe.
1029+
/// </summary>
1030+
public static Task<DurableExecutionInvocationOutput> WrapAsync<TInput>(
1031+
Func<TInput, IDurableContext, Task> workflow,
1032+
DurableExecutionInvocationInput invocationInput,
1033+
ILambdaContext lambdaContext,
1034+
IAmazonLambda lambdaClient,
1035+
JsonSerializerContext jsonContext);
9241036
}
9251037
```
9261038

@@ -968,6 +1080,24 @@ public interface IDurableContext
9681080
StepConfig? config = null,
9691081
CancellationToken cancellationToken = default);
9701082

1083+
/// <summary>
1084+
/// Execute a step (simplified overload without IStepContext).
1085+
/// </summary>
1086+
Task<T> StepAsync<T>(
1087+
Func<Task<T>> func,
1088+
string? name = null,
1089+
StepConfig? config = null,
1090+
CancellationToken cancellationToken = default);
1091+
1092+
/// <summary>
1093+
/// Execute a step that returns no value (simplified overload).
1094+
/// </summary>
1095+
Task StepAsync(
1096+
Func<Task> func,
1097+
string? name = null,
1098+
StepConfig? config = null,
1099+
CancellationToken cancellationToken = default);
1100+
9711101
/// <summary>
9721102
/// Suspend execution for the specified duration.
9731103
/// Throws ArgumentOutOfRangeException if duration is less than 1 second.
@@ -1087,7 +1217,11 @@ public record DurableBranch<T>(string Name, Func<IDurableContext, Task<T>> Func)
10871217

10881218
#### CancellationToken behavior
10891219

1090-
All methods accept a `CancellationToken` that follows standard .NET semantics: cancellation throws `OperationCanceledException` and the execution fails. Cancellation does **not** trigger suspension — those are separate concepts. The durable execution service handles timeout scenarios automatically: if Lambda terminates mid-execution, the next invocation simply replays from the last checkpoint. For advanced users who want to suspend gracefully before timeout, check `context.LambdaContext.RemainingTime` and return early.
1220+
All methods accept a `CancellationToken` that follows standard .NET semantics: cancellation throws `OperationCanceledException` and the execution fails. Cancellation does **not** trigger suspension — those are separate concepts.
1221+
1222+
`IDurableContext.CancellationToken` is a context-level token that is automatically cancelled when the execution suspends (wait, retry, or callback). This allows in-flight user code to cooperatively stop after suspension rather than continuing to consume resources on the Lambda container until freeze. User code that performs long-running work (HTTP calls, file I/O) should pass this token to those operations.
1223+
1224+
The durable execution service handles timeout scenarios automatically: if Lambda terminates mid-execution, the next invocation simply replays from the last checkpoint. For advanced users who want to suspend gracefully before timeout, check `context.LambdaContext.RemainingTime` and return early.
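
As a rough sketch of the cooperative-stop behavior described for the context-level token: here a local `CancellationTokenSource` named `suspension` stands in for whatever the SDK uses to drive `IDurableContext.CancellationToken`, and `SlowWorkAsync` is a hypothetical placeholder for an HTTP call or file I/O. Neither name comes from the SDK.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the source behind IDurableContext.CancellationToken (assumption:
// in the SDK the runtime owns this source and user code only sees the token).
using var suspension = new CancellationTokenSource();
CancellationToken contextToken = suspension.Token;

// Hypothetical long-running work. Passing the token lets it stop promptly;
// HttpClient and Stream APIs accept a CancellationToken in the same way.
async Task<string> SlowWorkAsync(CancellationToken token)
{
    try
    {
        await Task.Delay(TimeSpan.FromMinutes(5), token);
        return "completed";
    }
    catch (OperationCanceledException)
    {
        // Caught here only so the sketch can report the outcome.
        return "stopped";
    }
}

Task<string> work = SlowWorkAsync(contextToken);

// Simulates the execution suspending (wait, retry, or callback).
suspension.Cancel();

string outcome = await work;
Console.WriteLine(outcome); // prints "stopped"
```

Without the token, the five-minute delay would keep running on the Lambda container until it is frozen, which is the resource use the context-level token is meant to avoid.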
10911225

10921226
### Configuration Types
10931227

@@ -1579,13 +1713,29 @@ Both approaches produce a self-contained executable that the Lambda custom runti
15791713

15801714
### NativeAOT compatibility
15811715

1582-
The SDK is AOT-friendly but does not require AOT. The default JSON serialization uses reflection (standard `System.Text.Json` behavior), which works in JIT mode. For NativeAOT deployments, provide a `JsonSerializerContext` via the `ICheckpointSerializer<T>` interface — this avoids all runtime reflection and is fully trim-safe. The SDK itself avoids `Activator.CreateInstance`, `Type.GetType()`, and other reflection patterns, and uses `[DynamicallyAccessedMembers]` trimming annotations where needed.
1716+
The SDK is AOT-friendly but does not require AOT. The default JSON serialization uses reflection (standard `System.Text.Json` behavior), which works in JIT mode. For NativeAOT deployments, AOT safety is addressed at two levels:
1717+
1718+
1. **Entry point (`DurableFunction.WrapAsync`)** — pass a `JsonSerializerContext` that includes type info for your `TInput` and `TOutput` types. The reflection-based overloads are annotated with `[RequiresUnreferencedCode]` / `[RequiresDynamicCode]` so the trimmer will warn if you use them in AOT builds.
1719+
1720+
2. **Step checkpoints (`StepConfig.Serializer`)** — for custom or complex types checkpointed by `StepAsync`, provide an `ICheckpointSerializer` via `StepConfig` (e.g., `JsonCheckpointSerializer<T>` backed by a source-generated `JsonTypeInfo<T>`).
1721+
1722+
The SDK itself avoids `Activator.CreateInstance`, `Type.GetType()`, and other reflection patterns, and uses `[DynamicallyAccessedMembers]` trimming annotations where needed.
15831723

15841724
```csharp
15851725
// Default: works with reflection (JIT mode)
15861726
var result = await context.StepAsync<Order>(async () => await GetOrder());
15871727

1588-
// AOT mode: user provides serialization context
1728+
// AOT mode — entry point: pass JsonSerializerContext to WrapAsync
1729+
[JsonSerializable(typeof(OrderEvent))]
1730+
[JsonSerializable(typeof(OrderResult))]
1731+
internal partial class MyJsonContext : JsonSerializerContext { }
1732+
1733+
public Task<DurableExecutionInvocationOutput> FunctionHandler(
1734+
DurableExecutionInvocationInput invocationInput, ILambdaContext context)
1735+
=> DurableFunction.WrapAsync<OrderEvent, OrderResult>(
1736+
MyWorkflow, invocationInput, context, MyJsonContext.Default);
1737+
1738+
// AOT mode — step checkpoint: provide a serializer via StepConfig
15891739
var result = await context.StepAsync(
15901740
async () => await GetOrder(),
15911741
config: new StepConfig { Serializer = new JsonCheckpointSerializer<Order>(MyJsonContext.Default.Order) });
@@ -1701,7 +1851,7 @@ public class Functions
17011851
}
17021852
```
17031853

1704-
When no `LambdaClientFactory` is specified, the generated code creates a default `AmazonLambdaClient`. For the manual handler path, pass the client directly to `DurableExecutionHandler.RunAsync`.
1854+
When no `LambdaClientFactory` is specified, the generated code creates a default `AmazonLambdaClient`. For the manual handler path (`DurableFunction.WrapAsync`), pass the client directly via the `IAmazonLambda lambdaClient` overload.
17051855

17061856
> **Dependency boundaries:** `Amazon.Lambda.Annotations` has **no dependency** on the AWS SDK or on `Amazon.Lambda.DurableExecution`. The Annotations source generator references durable execution types by fully-qualified name strings only — it never takes a compile-time dependency on the durable package. The `[DurableExecution]` attribute is defined in `Amazon.Lambda.DurableExecution`, and the generated code resolves against the user's project references. There is only one source generator (Annotations) — no coordination between multiple generators is needed.
17071857

Libraries/src/Amazon.Lambda.DurableExecution/Amazon.Lambda.DurableExecution.csproj

Lines changed: 2 additions & 0 deletions
@@ -14,6 +14,8 @@
1414
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
1515
<Nullable>enable</Nullable>
1616
<ImplicitUsings>enable</ImplicitUsings>
17+
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
18+
<WarningsAsErrors>IL2026,IL2067,IL2075,IL3050</WarningsAsErrors>
1719
</PropertyGroup>
1820

1921
<ItemGroup>

Libraries/src/Amazon.Lambda.DurableExecution/AssemblyMarker.cs

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Configuration for step execution.
5+
/// </summary>
6+
public class StepConfig
7+
{
8+
/// <summary>
9+
/// Custom serializer for the step result. Default is System.Text.Json.
10+
/// </summary>
11+
public ICheckpointSerializer? Serializer { get; set; }
12+
13+
// TODO: Retry support is deferred to a follow-up PR. When added, this is
14+
// where RetryStrategy and Semantics (AtLeastOncePerRetry / AtMostOncePerRetry)
15+
// will live. The follow-up needs to use service-mediated retries (checkpoint
16+
// a RETRY operation + suspend the Lambda) rather than an in-process Task.Delay
17+
// loop, to match the Python and JS reference SDKs and to avoid billing Lambda
18+
// compute time during retry backoff.
19+
}
