Skip to content

Commit 4132f5d

Browse files
GarrettBeattyclaude
andcommitted
Add RunInChildContextAsync
Adds child-context support to the .NET Durable Execution SDK. A child context is a logical sub-workflow with its own deterministic operation-ID space, persisted as a CONTEXT operation so subsequent invocations replay the cached value without re-executing the function. Public surface: - IDurableContext.RunInChildContextAsync<T> (reflection + AOT-safe ICheckpointSerializer<T> overloads, plus a void overload). - ChildContextConfig with SubType (observability label) and ErrorMapping (transform exceptions before they surface to the caller). - ChildContextException for failure surfacing. Used as a building block for upcoming WaitForCallbackAsync. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 097e5a1 commit 4132f5d

19 files changed

Lines changed: 1203 additions & 4 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Configuration for a child context.
5+
/// </summary>
6+
/// <remarks>
7+
/// A child context is a logical sub-workflow with its own deterministic
8+
/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Use
9+
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
10+
/// (and overloads) to run code inside one.
11+
/// </remarks>
12+
public sealed class ChildContextConfig
13+
{
14+
/// <summary>
15+
/// Operation sub-type label for observability (e.g. <c>"WaitForCallback"</c>).
16+
/// Surfaces on the wire <c>OperationUpdate.SubType</c> field.
17+
/// </summary>
18+
public string? SubType { get; set; }
19+
20+
/// <summary>
21+
/// Optional function to transform exceptions thrown by the child context's
22+
/// user function before they surface to the caller. Useful for wrapping
23+
/// low-level errors into domain-specific exceptions.
24+
/// </summary>
25+
/// <remarks>
26+
/// Applied when the user function throws (the mapped exception propagates
27+
/// to the caller of <c>RunInChildContextAsync</c>) and on replay of a
28+
/// <c>FAILED</c> child context (the constructed
29+
/// <see cref="ChildContextException"/> is mapped before being thrown).
30+
/// </remarks>
31+
public Func<Exception, Exception>? ErrorMapping { get; set; }
32+
}

Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private Task<T> RunStep<T>(
7676

7777
var operationId = _idGenerator.NextId();
7878
var op = new StepOperation<T>(
79-
operationId, name, func, config, serializer, Logger,
79+
operationId, name, _idGenerator.ParentId, func, config, serializer, Logger,
8080
_state, _terminationManager, _durableExecutionArn, _batcher);
8181
return op.ExecuteAsync(cancellationToken);
8282
}
@@ -99,7 +99,58 @@ public Task WaitAsync(
9999
var operationId = _idGenerator.NextId();
100100
var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds));
101101
var op = new WaitOperation(
102-
operationId, name, waitSeconds,
102+
operationId, name, _idGenerator.ParentId, waitSeconds,
103+
_state, _terminationManager, _durableExecutionArn, _batcher);
104+
return op.ExecuteAsync(cancellationToken);
105+
}
106+
107+
public Task<T> RunInChildContextAsync<T>(
108+
Func<IDurableContext, Task<T>> func,
109+
string? name = null,
110+
ChildContextConfig? config = null,
111+
CancellationToken cancellationToken = default)
112+
=> RunChildContext(func, name, config, cancellationToken);
113+
114+
public async Task RunInChildContextAsync(
115+
Func<IDurableContext, Task> func,
116+
string? name = null,
117+
ChildContextConfig? config = null,
118+
CancellationToken cancellationToken = default)
119+
{
120+
// Void child contexts don't carry a meaningful payload; the wrapper
121+
// returns null so the registered ILambdaSerializer is never asked to
122+
// serialize a real value.
123+
await RunChildContext<object?>(
124+
async (ctx) => { await func(ctx); return null; },
125+
name, config, cancellationToken);
126+
}
127+
128+
private Task<T> RunChildContext<T>(
129+
Func<IDurableContext, Task<T>> func,
130+
string? name,
131+
ChildContextConfig? config,
132+
CancellationToken cancellationToken)
133+
{
134+
var serializer = LambdaContext.Serializer
135+
?? throw new InvalidOperationException(
136+
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
137+
"In the class library programming model, register one with " +
138+
"[assembly: LambdaSerializer(typeof(...))]. In an executable / custom " +
139+
"runtime, pass it to LambdaBootstrapBuilder.Create(handler, serializer). " +
140+
"In tests, set TestLambdaContext.Serializer.");
141+
142+
var operationId = _idGenerator.NextId();
143+
144+
// Capture this DurableContext's collaborators; the child shares state,
145+
// termination, batcher, ARN, and Lambda context — but uses a child
146+
// OperationIdGenerator so its operation IDs are deterministically
147+
// namespaced under the parent op ID.
148+
IDurableContext ChildFactory(string parentOpId) => new DurableContext(
149+
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
150+
_durableExecutionArn, LambdaContext, _batcher);
151+
152+
var op = new ChildContextOperation<T>(
153+
operationId, name, _idGenerator.ParentId, func, config, serializer, ChildFactory,
103154
_state, _terminationManager, _durableExecutionArn, _batcher);
104155
return op.ExecuteAsync(cancellationToken);
105156
}

Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,31 @@ public StepInterruptedException(string message) : base(message) { }
6767
/// <summary>Creates a <see cref="StepInterruptedException"/> wrapping an inner exception.</summary>
6868
public StepInterruptedException(string message, Exception innerException) : base(message, innerException) { }
6969
}
70+
71+
/// <summary>
72+
/// Thrown when a child context's user function fails. Surfaces from
73+
/// <c>RunInChildContextAsync</c>; the underlying error is preserved on the
74+
/// <see cref="ErrorType"/>/<see cref="ErrorData"/>/<see cref="OriginalStackTrace"/>
75+
/// fields. Use <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
76+
/// domain-specific exception.
77+
/// </summary>
78+
public class ChildContextException : DurableExecutionException
79+
{
80+
/// <summary>
81+
/// The child context's <see cref="ChildContextConfig.SubType"/>, if any.
82+
/// </summary>
83+
public string? SubType { get; init; }
84+
/// <summary>The fully-qualified type name of the original exception.</summary>
85+
public string? ErrorType { get; init; }
86+
/// <summary>Optional structured error data attached by the user.</summary>
87+
public string? ErrorData { get; init; }
88+
/// <summary>Stack trace of the original exception, captured before serialization.</summary>
89+
public IReadOnlyList<string>? OriginalStackTrace { get; init; }
90+
91+
/// <summary>Creates an empty <see cref="ChildContextException"/>.</summary>
92+
public ChildContextException() { }
93+
/// <summary>Creates a <see cref="ChildContextException"/> with the given message.</summary>
94+
public ChildContextException(string message) : base(message) { }
95+
/// <summary>Creates a <see cref="ChildContextException"/> wrapping an inner exception.</summary>
96+
public ChildContextException(string message, Exception innerException) : base(message, innerException) { }
97+
}

Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,37 @@ Task WaitAsync(
6060
TimeSpan duration,
6161
string? name = null,
6262
CancellationToken cancellationToken = default);
63+
64+
/// <summary>
65+
/// Run a user function inside a logical sub-workflow (a "child context").
66+
/// The child has its own deterministic operation-ID space; its result is
67+
/// checkpointed as a <c>CONTEXT</c> operation so subsequent invocations
68+
/// replay the cached value without re-executing the func.
69+
/// </summary>
70+
/// <remarks>
71+
/// Use child contexts to group related durable operations (e.g. a step plus
72+
/// a wait plus a step) into a single observability/error-handling boundary.
73+
/// On failure, surfaces as <see cref="ChildContextException"/>; supply
74+
/// <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
75+
/// domain-specific exception.
76+
/// The child context's return value is serialized to a checkpoint using the
77+
/// <see cref="ILambdaSerializer"/> registered on
78+
/// <see cref="ILambdaContext.Serializer"/>.
79+
/// </remarks>
80+
Task<T> RunInChildContextAsync<T>(
81+
Func<IDurableContext, Task<T>> func,
82+
string? name = null,
83+
ChildContextConfig? config = null,
84+
CancellationToken cancellationToken = default);
85+
86+
/// <summary>
87+
/// Run a user function inside a child context that returns no value.
88+
/// </summary>
89+
Task RunInChildContextAsync(
90+
Func<IDurableContext, Task> func,
91+
string? name = null,
92+
ChildContextConfig? config = null,
93+
CancellationToken cancellationToken = default);
6394
}
6495

6596
/// <summary>
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
using System.IO;
2+
using System.Text;
3+
using Amazon.Lambda.Core;
4+
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
5+
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
6+
7+
namespace Amazon.Lambda.DurableExecution.Internal;
8+
9+
/// <summary>
10+
/// Durable child context operation. Runs a user-supplied function inside a
11+
/// nested <see cref="DurableContext"/> with its own deterministic operation-ID
12+
/// space, persisting the function's result so subsequent invocations replay
13+
/// the cached value without re-executing.
14+
/// </summary>
15+
/// <remarks>
16+
/// Replay branches — example: <c>await ctx.RunInChildContextAsync(child =&gt; ..., name: "phase")</c>
17+
/// <list type="bullet">
18+
/// <item><b>Fresh</b>: no prior state → sync-flush CONTEXT START → run user
19+
/// func → on success emit CONTEXT SUCCEED → on failure emit CONTEXT FAIL
20+
/// and throw <see cref="ChildContextException"/>.</item>
21+
/// <item><b>SUCCEEDED</b>: return cached deserialized result; user func is
22+
/// NOT re-executed.</item>
23+
/// <item><b>FAILED</b>: throw <see cref="ChildContextException"/> with the
24+
/// recorded error; if <see cref="ChildContextConfig.ErrorMapping"/> is
25+
/// set, the mapped exception is thrown instead.</item>
26+
/// <item><b>STARTED</b> / <b>PENDING</b>: re-run the user func without
27+
/// re-checkpointing START. The child's own operations recover from their
28+
/// own checkpoints, so this is replay propagation; if a wait/callback
29+
/// inside the child is still pending, the user func re-suspends.</item>
30+
/// </list>
31+
/// Unlike <see cref="StepOperation{T}"/>, child contexts have no retry strategy:
32+
/// failure is terminal and surfaces immediately via
33+
/// <see cref="ChildContextException"/>.
34+
/// </remarks>
35+
internal sealed class ChildContextOperation<T> : DurableOperation<T>
36+
{
37+
private readonly Func<IDurableContext, Task<T>> _func;
38+
private readonly ChildContextConfig? _config;
39+
private readonly ILambdaSerializer _serializer;
40+
private readonly Func<string, IDurableContext> _childContextFactory;
41+
42+
public ChildContextOperation(
43+
string operationId,
44+
string? name,
45+
string? parentId,
46+
Func<IDurableContext, Task<T>> func,
47+
ChildContextConfig? config,
48+
ILambdaSerializer serializer,
49+
Func<string, IDurableContext> childContextFactory,
50+
ExecutionState state,
51+
TerminationManager termination,
52+
string durableExecutionArn,
53+
CheckpointBatcher? batcher = null)
54+
: base(operationId, name, parentId, state, termination, durableExecutionArn, batcher)
55+
{
56+
_func = func;
57+
_config = config;
58+
_serializer = serializer;
59+
_childContextFactory = childContextFactory;
60+
}
61+
62+
protected override string OperationType => OperationTypes.Context;
63+
64+
protected override async Task<T> StartAsync(CancellationToken cancellationToken)
65+
{
66+
// Sync-flush CONTEXT START before user code so the service has a record
67+
// of the parent context if the inner func suspends (e.g. a Wait inside
68+
// the child terminates the workflow before SUCCEED is reached).
69+
await EnqueueAsync(new SdkOperationUpdate
70+
{
71+
Id = OperationId,
72+
ParentId = ParentId,
73+
Type = OperationTypes.Context,
74+
Action = "START",
75+
SubType = _config?.SubType,
76+
Name = Name
77+
}, cancellationToken);
78+
79+
return await ExecuteFunc(cancellationToken);
80+
}
81+
82+
protected override Task<T> ReplayAsync(Operation existing, CancellationToken cancellationToken)
83+
{
84+
switch (existing.Status)
85+
{
86+
case OperationStatuses.Succeeded:
87+
// Side-effecting code runs at most once: replay returns the
88+
// cached result without invoking the user func.
89+
return Task.FromResult(DeserializeResult(existing.ContextDetails?.Result));
90+
91+
case OperationStatuses.Failed:
92+
throw MapFailureException(BuildChildContextException(existing));
93+
94+
case OperationStatuses.Started:
95+
case OperationStatuses.Pending:
96+
// Re-run the user func: the child's own operations replay from
97+
// their own checkpoints. Do NOT re-checkpoint START — the
98+
// original is still authoritative. If something inside the
99+
// child is still pending (Wait, callback, retry) the user func
100+
// will re-suspend on its own.
101+
return ExecuteFunc(cancellationToken);
102+
103+
default:
104+
throw new NonDeterministicExecutionException(
105+
$"Child context operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
106+
}
107+
}
108+
109+
private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
110+
{
111+
cancellationToken.ThrowIfCancellationRequested();
112+
113+
var childContext = _childContextFactory(OperationId);
114+
115+
T result;
116+
try
117+
{
118+
result = await _func(childContext);
119+
}
120+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
121+
{
122+
throw;
123+
}
124+
catch (Exception ex)
125+
{
126+
await EnqueueAsync(new SdkOperationUpdate
127+
{
128+
Id = OperationId,
129+
ParentId = ParentId,
130+
Type = OperationTypes.Context,
131+
Action = "FAIL",
132+
SubType = _config?.SubType,
133+
Name = Name,
134+
Error = ToSdkError(ex)
135+
}, cancellationToken);
136+
137+
throw MapFailureException(new ChildContextException(ex.Message, ex)
138+
{
139+
SubType = _config?.SubType,
140+
ErrorType = ex.GetType().FullName
141+
});
142+
}
143+
144+
await EnqueueAsync(new SdkOperationUpdate
145+
{
146+
Id = OperationId,
147+
ParentId = ParentId,
148+
Type = OperationTypes.Context,
149+
Action = "SUCCEED",
150+
SubType = _config?.SubType,
151+
Name = Name,
152+
Payload = SerializeResult(result)
153+
}, cancellationToken);
154+
155+
return result;
156+
}
157+
158+
private Exception MapFailureException(ChildContextException ex)
159+
{
160+
var mapper = _config?.ErrorMapping;
161+
if (mapper == null) return ex;
162+
163+
var mapped = mapper(ex);
164+
return mapped ?? ex;
165+
}
166+
167+
private ChildContextException BuildChildContextException(Operation failedOp)
168+
{
169+
var err = failedOp.ContextDetails?.Error;
170+
return new ChildContextException(err?.ErrorMessage ?? "Child context failed")
171+
{
172+
SubType = failedOp.SubType ?? _config?.SubType,
173+
ErrorType = err?.ErrorType,
174+
ErrorData = err?.ErrorData,
175+
OriginalStackTrace = err?.StackTrace
176+
};
177+
}
178+
179+
private T DeserializeResult(string? serialized)
180+
{
181+
if (serialized == null) return default!;
182+
var bytes = Encoding.UTF8.GetBytes(serialized);
183+
using var ms = new MemoryStream(bytes);
184+
return _serializer.Deserialize<T>(ms);
185+
}
186+
187+
private string SerializeResult(T value)
188+
{
189+
using var ms = new MemoryStream();
190+
_serializer.Serialize(value, ms);
191+
return Encoding.UTF8.GetString(ms.ToArray());
192+
}
193+
194+
private static SdkErrorObject ToSdkError(Exception ex) => new()
195+
{
196+
ErrorType = ex.GetType().FullName,
197+
ErrorMessage = ex.Message,
198+
StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList()
199+
};
200+
}

Libraries/src/Amazon.Lambda.DurableExecution/Internal/DurableOperation.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,22 @@ internal abstract class DurableOperation<TResult>
1414
protected readonly TerminationManager Termination;
1515
protected readonly string OperationId;
1616
protected readonly string? Name;
17+
protected readonly string? ParentId;
1718
protected readonly string DurableExecutionArn;
1819
protected readonly CheckpointBatcher? Batcher;
1920

2021
protected DurableOperation(
2122
string operationId,
2223
string? name,
24+
string? parentId,
2325
ExecutionState state,
2426
TerminationManager termination,
2527
string durableExecutionArn,
2628
CheckpointBatcher? batcher = null)
2729
{
2830
OperationId = operationId;
2931
Name = name;
32+
ParentId = parentId;
3033
State = state;
3134
Termination = termination;
3235
DurableExecutionArn = durableExecutionArn;

0 commit comments

Comments
 (0)