Skip to content

Commit 9e5113d

Browse files
committed
Add durable execution Step + Wait end-to-end
Implements the minimum viable slice of the Amazon.Lambda.DurableExecution SDK: a workflow can run StepAsync and WaitAsync against a real Lambda, with replay-aware checkpointing wired through to the AWS service. Public API surface introduced: - DurableFunction.WrapAsync — entry point that handles the durable execution envelope (input hydration, output construction, status mapping) - IDurableContext.StepAsync / WaitAsync (4 Step overloads, 1 Wait) - StepConfig with serializer hook (retry deferred to follow-up PR) - ICheckpointSerializer interface - [DurableExecution] attribute (recognized by future source generator) - DurableExecutionException base + StepException Internals: - DurableExecutionHandler — Task.WhenAny race between user code and the suspension signal, returning Succeeded/Failed/Pending - ExecutionState — replay-aware operation lookup and pending checkpoint buffer - OperationIdGenerator — deterministic, replay-stable IDs - TerminationManager — TaskCompletionSource-based suspension trigger - LambdaDurableServiceClient — wraps AWSSDK.Lambda's checkpoint and state APIs Tests: - 86 unit tests covering enums, exceptions, models, configs, ID generation, termination, execution state, the handler race, the context (Step + Wait paths), and the WrapAsync entry point - 8 end-to-end integration tests deploying real Lambdas via Docker on the provided.al2023 runtime: StepWaitStep, MultipleSteps, WaitOnly, LongerWait, ReplayDeterminism, RetrySucceeds, RetryExhausts, StepFails Out of scope (follow-up PRs): - IRetryStrategy, ExponentialRetryStrategy, retry decision factories - DefaultJsonCheckpointSerializer - DurableLogger replay-suppression (currently returns NullLogger) - Callbacks, InvokeAsync, ParallelAsync, MapAsync, RunInChildContextAsync, WaitForConditionAsync — interface intentionally does not declare them - Annotations source-generator integration - DurableTestRunner / Amazon.Lambda.DurableExecution.Testing package - dotnet new lambda.DurableFunction blueprint stack-info: PR: #2360, branch: GarrettBeatty/stack/2 remove update update update update
1 parent 1fb7f7f commit 9e5113d

74 files changed

Lines changed: 6428 additions & 61 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Docs/durable-execution-design.md

Lines changed: 196 additions & 42 deletions
Large diffs are not rendered by default.

Libraries/src/Amazon.Lambda.DurableExecution/Amazon.Lambda.DurableExecution.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
1515
<Nullable>enable</Nullable>
1616
<ImplicitUsings>enable</ImplicitUsings>
17+
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
18+
<WarningsAsErrors>IL2026,IL2067,IL2075,IL3050</WarningsAsErrors>
1719
</PropertyGroup>
1820

1921
<ItemGroup>

Libraries/src/Amazon.Lambda.DurableExecution/AssemblyMarker.cs

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Configuration for step execution.
5+
/// </summary>
6+
public sealed class StepConfig
7+
{
8+
// TODO: Retry support is deferred to a follow-up PR. When added, this is
9+
// where RetryStrategy and Semantics (AtLeastOncePerRetry / AtMostOncePerRetry)
10+
// will live. The follow-up needs to use service-mediated retries (checkpoint
11+
// a RETRY operation + suspend the Lambda) rather than an in-process Task.Delay
12+
// loop, to avoid billing Lambda compute time during retry backoff.
13+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
using System.Diagnostics.CodeAnalysis;
2+
using Amazon.Lambda.Core;
3+
using Amazon.Lambda.DurableExecution.Internal;
4+
using Microsoft.Extensions.Logging;
5+
using Microsoft.Extensions.Logging.Abstractions;
6+
7+
namespace Amazon.Lambda.DurableExecution;
8+
9+
/// <summary>
10+
/// Implementation of <see cref="IDurableContext"/>. Constructs and dispatches
11+
/// per-operation classes (<see cref="StepOperation{T}"/>, <see cref="WaitOperation"/>);
12+
/// the replay logic lives in those classes.
13+
/// </summary>
14+
internal sealed class DurableContext : IDurableContext
15+
{
16+
private readonly ExecutionState _state;
17+
private readonly TerminationManager _terminationManager;
18+
private readonly OperationIdGenerator _idGenerator;
19+
private readonly string _durableExecutionArn;
20+
private readonly CheckpointBatcher? _batcher;
21+
22+
public DurableContext(
23+
ExecutionState state,
24+
TerminationManager terminationManager,
25+
OperationIdGenerator idGenerator,
26+
string durableExecutionArn,
27+
ILambdaContext lambdaContext,
28+
CheckpointBatcher? batcher = null)
29+
{
30+
_state = state;
31+
_terminationManager = terminationManager;
32+
_idGenerator = idGenerator;
33+
_durableExecutionArn = durableExecutionArn;
34+
_batcher = batcher;
35+
LambdaContext = lambdaContext;
36+
}
37+
38+
// Replay-safe logger ships in a follow-up PR; see IDurableContext.Logger doc.
39+
public ILogger Logger => NullLogger.Instance;
40+
public IExecutionContext ExecutionContext => new DurableExecutionContext(_durableExecutionArn);
41+
public ILambdaContext LambdaContext { get; }
42+
43+
[RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
44+
[RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
45+
public Task<T> StepAsync<T>(
46+
Func<IStepContext, Task<T>> func,
47+
string? name = null,
48+
StepConfig? config = null,
49+
CancellationToken cancellationToken = default)
50+
=> RunStep(func, new ReflectionJsonCheckpointSerializer<T>(), name, config, cancellationToken);
51+
52+
public async Task StepAsync(
53+
Func<IStepContext, Task> func,
54+
string? name = null,
55+
StepConfig? config = null,
56+
CancellationToken cancellationToken = default)
57+
{
58+
// Void steps don't carry a meaningful payload; we wrap with a null-only
59+
// serializer that doesn't touch reflection.
60+
await RunStep<object?>(
61+
async (ctx) => { await func(ctx); return null; },
62+
NullCheckpointSerializer.Instance,
63+
name, config, cancellationToken);
64+
}
65+
66+
public Task<T> StepAsync<T>(
67+
Func<IStepContext, Task<T>> func,
68+
ICheckpointSerializer<T> serializer,
69+
string? name = null,
70+
StepConfig? config = null,
71+
CancellationToken cancellationToken = default)
72+
=> RunStep(func, serializer, name, config, cancellationToken);
73+
74+
75+
private Task<T> RunStep<T>(
76+
Func<IStepContext, Task<T>> func,
77+
ICheckpointSerializer<T> serializer,
78+
string? name,
79+
StepConfig? config,
80+
CancellationToken cancellationToken)
81+
{
82+
var operationId = _idGenerator.NextId();
83+
var op = new StepOperation<T>(
84+
operationId, name, func, config, serializer, Logger,
85+
_state, _terminationManager, _durableExecutionArn, _batcher);
86+
return op.ExecuteAsync(cancellationToken);
87+
}
88+
89+
public Task WaitAsync(
90+
TimeSpan duration,
91+
string? name = null,
92+
CancellationToken cancellationToken = default)
93+
{
94+
// Service timer granularity is 1 second; sub-second waits would round to 0.
95+
// WaitOptions.WaitSeconds is integer in [1, 31_622_400] (1 second to ~1 year).
96+
if (duration < TimeSpan.FromSeconds(1))
97+
throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at least 1 second.");
98+
99+
if (duration > TimeSpan.FromSeconds(31_622_400))
100+
throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at most 31,622,400 seconds (~1 year).");
101+
102+
cancellationToken.ThrowIfCancellationRequested();
103+
104+
var operationId = _idGenerator.NextId();
105+
var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds));
106+
var op = new WaitOperation(
107+
operationId, name, waitSeconds,
108+
_state, _terminationManager, _durableExecutionArn, _batcher);
109+
return op.ExecuteAsync(cancellationToken);
110+
}
111+
}
112+
113+
/// <summary>
114+
/// Trim-safe serializer used by the void <c>StepAsync</c> overloads, which never
115+
/// carry a meaningful payload. Always serializes to <c>"null"</c> and discards
116+
/// on deserialize.
117+
/// </summary>
118+
internal sealed class NullCheckpointSerializer : ICheckpointSerializer<object?>
119+
{
120+
public static NullCheckpointSerializer Instance { get; } = new();
121+
public string Serialize(object? value, SerializationContext context) => "null";
122+
public object? Deserialize(string data, SerializationContext context) => null;
123+
}
124+
125+
internal sealed class DurableExecutionContext : IExecutionContext
126+
{
127+
public DurableExecutionContext(string durableExecutionArn)
128+
{
129+
DurableExecutionArn = durableExecutionArn;
130+
}
131+
132+
public string DurableExecutionArn { get; }
133+
}
134+
135+
internal sealed class StepContext : IStepContext
136+
{
137+
public StepContext(string operationId, int attemptNumber, ILogger logger)
138+
{
139+
OperationId = operationId;
140+
AttemptNumber = attemptNumber;
141+
Logger = logger;
142+
}
143+
144+
public ILogger Logger { get; }
145+
public int AttemptNumber { get; }
146+
public string OperationId { get; }
147+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
using Amazon.Lambda.DurableExecution.Internal;
2+
3+
namespace Amazon.Lambda.DurableExecution;
4+
5+
/// <summary>
6+
/// The result of running a durable execution handler.
7+
/// </summary>
8+
internal sealed class HandlerResult<TResult>
9+
{
10+
public required InvocationStatus Status { get; init; }
11+
public TResult? Result { get; init; }
12+
public string? Message { get; init; }
13+
public Exception? Exception { get; init; }
14+
}
15+
16+
/// <summary>
17+
/// Core orchestration engine for durable execution. Races user code against
18+
/// a termination signal using Task.WhenAny. When user code completes, returns
19+
/// SUCCEEDED/FAILED. When termination wins (wait, callback, invoke), returns PENDING.
20+
/// </summary>
21+
internal static class DurableExecutionHandler
22+
{
23+
/// <summary>
24+
/// Runs the user's workflow function within the durable execution engine.
25+
/// </summary>
26+
/// <remarks>
27+
/// <para>
28+
/// Suspension flow — example: <c>await ctx.WaitAsync(TimeSpan.FromSeconds(5))</c>:
29+
/// </para>
30+
/// <code>
31+
/// user code DurableContext TerminationMgr RunAsync
32+
/// ───────── ────────────── ────────────── ────────
33+
/// WaitAsync(5s) ─────► queue WAIT START
34+
/// checkpoint
35+
/// Terminate() ──────► TerminationTask
36+
/// completes
37+
/// ◄────── new TCS().Task
38+
/// (never completes)
39+
/// await blocks
40+
/// forever WhenAny:
41+
/// ── termination wins
42+
/// ── userTask abandoned
43+
/// ── return Pending
44+
/// </code>
45+
/// <para>
46+
/// Key insight: <c>WaitAsync</c> never returns a completed Task — it hands back
47+
/// a TaskCompletionSource that is never resolved. The user's <c>await</c> blocks
48+
/// indefinitely. The escape signal is <c>terminationManager.Terminate()</c>,
49+
/// which <c>Task.WhenAny</c> picks up. We return Pending; the dangling user
50+
/// Task is GC'd. The service flushes checkpoints, fires the wait timer, then
51+
/// re-invokes Lambda — on replay, <c>WaitAsync</c> sees the matching SUCCEED
52+
/// checkpoint and returns <c>Task.CompletedTask</c> normally.
53+
/// </para>
54+
/// <para>
55+
/// The same pattern applies to retries (<c>RetryScheduled</c>), callbacks
56+
/// (<c>CallbackPending</c>), and chained invokes (<c>InvokePending</c>).
57+
/// </para>
58+
/// </remarks>
59+
/// <typeparam name="TResult">The workflow return type.</typeparam>
60+
/// <param name="executionState">Hydrated execution state from prior invocations.</param>
61+
/// <param name="terminationManager">Manages the suspension signal.</param>
62+
/// <param name="userHandler">The user's workflow function receiving a DurableContext.</param>
63+
/// <returns>The handler result indicating SUCCEEDED, FAILED, or PENDING.</returns>
64+
internal static async Task<HandlerResult<TResult>> RunAsync<TResult>(
65+
ExecutionState executionState,
66+
TerminationManager terminationManager,
67+
Func<Task<TResult>> userHandler)
68+
{
69+
// Run user code on a threadpool thread so it executes independently of
70+
// the termination signal. When TerminationManager fires (e.g., WaitAsync),
71+
// we need the WhenAny race below to resolve immediately without waiting
72+
// for the user task to reach an await point.
73+
var userTask = Task.Run(userHandler);
74+
75+
// Race: user code completing vs. termination signal (wait/callback/retry).
76+
// If termination wins, we return PENDING and the abandoned userTask is never awaited.
77+
var winner = await Task.WhenAny(userTask, terminationManager.TerminationTask);
78+
79+
if (winner == terminationManager.TerminationTask)
80+
{
81+
var terminationResult = await terminationManager.TerminationTask;
82+
83+
if (terminationResult.Exception != null)
84+
{
85+
return new HandlerResult<TResult>
86+
{
87+
Status = InvocationStatus.Failed,
88+
Message = terminationResult.Exception.Message,
89+
Exception = terminationResult.Exception
90+
};
91+
}
92+
93+
return new HandlerResult<TResult>
94+
{
95+
Status = InvocationStatus.Pending,
96+
Message = terminationResult.Message
97+
};
98+
}
99+
100+
try
101+
{
102+
var result = await userTask;
103+
return new HandlerResult<TResult>
104+
{
105+
Status = InvocationStatus.Succeeded,
106+
Result = result
107+
};
108+
}
109+
catch (Exception ex)
110+
{
111+
return new HandlerResult<TResult>
112+
{
113+
Status = InvocationStatus.Failed,
114+
Message = ex.Message,
115+
Exception = ex
116+
};
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)