Skip to content

Commit 8058cdf

Browse files
GarrettBeattyclaude
andcommitted
Add RunInChildContextAsync
Adds child-context support to the .NET Durable Execution SDK. A child context is a logical sub-workflow with its own deterministic operation-ID space, persisted as a CONTEXT operation so subsequent invocations replay the cached value without re-executing the function. Public surface: - IDurableContext.RunInChildContextAsync<T> (reflection + AOT-safe ICheckpointSerializer<T> overloads, plus a void overload). - ChildContextConfig with SubType (observability label) and ErrorMapping (transform exceptions before they surface to the caller). - ChildContextException for failure surfacing. Used as a building block for upcoming WaitForCallbackAsync. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 097e5a1 commit 8058cdf

8 files changed

Lines changed: 872 additions & 0 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Configuration for a child context.
5+
/// </summary>
6+
/// <remarks>
7+
/// A child context is a logical sub-workflow with its own deterministic
8+
/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Use
9+
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
10+
/// (and overloads) to run code inside one.
11+
/// </remarks>
12+
public sealed class ChildContextConfig
13+
{
14+
/// <summary>
15+
/// Operation sub-type label for observability (e.g. <c>"WaitForCallback"</c>).
16+
/// Surfaces on the wire <c>OperationUpdate.SubType</c> field.
17+
/// </summary>
18+
public string? SubType { get; set; }
19+
20+
/// <summary>
21+
/// Optional function to transform exceptions thrown by the child context's
22+
/// user function before they surface to the caller. Useful for wrapping
23+
/// low-level errors into domain-specific exceptions.
24+
/// </summary>
25+
/// <remarks>
26+
/// Applied when the user function throws (the mapped exception propagates
27+
/// to the caller of <c>RunInChildContextAsync</c>) and on replay of a
28+
/// <c>FAILED</c> child context (the constructed
29+
/// <see cref="ChildContextException"/> is mapped before being thrown).
30+
/// </remarks>
31+
public Func<Exception, Exception>? ErrorMapping { get; set; }
32+
}

Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,55 @@ public Task WaitAsync(
103103
_state, _terminationManager, _durableExecutionArn, _batcher);
104104
return op.ExecuteAsync(cancellationToken);
105105
}
106+
107+
public Task<T> RunInChildContextAsync<T>(
108+
Func<IDurableContext, Task<T>> func,
109+
string? name = null,
110+
ChildContextConfig? config = null,
111+
CancellationToken cancellationToken = default)
112+
=> RunChildContext(func, name, config, cancellationToken);
113+
114+
public async Task RunInChildContextAsync(
115+
Func<IDurableContext, Task> func,
116+
string? name = null,
117+
ChildContextConfig? config = null,
118+
CancellationToken cancellationToken = default)
119+
{
120+
// Void child contexts don't carry a meaningful payload; the wrapper
121+
// returns null so the registered ILambdaSerializer is never asked to
122+
// serialize a real value.
123+
await RunChildContext<object?>(
124+
async (ctx) => { await func(ctx); return null; },
125+
name, config, cancellationToken);
126+
}
127+
128+
private Task<T> RunChildContext<T>(
129+
Func<IDurableContext, Task<T>> func,
130+
string? name,
131+
ChildContextConfig? config,
132+
CancellationToken cancellationToken)
133+
{
134+
var serializer = LambdaContext.Serializer
135+
?? throw new InvalidOperationException(
136+
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
137+
"Register a serializer via LambdaBootstrapBuilder.Create(handler, serializer) " +
138+
"(or in tests, set TestLambdaContext.Serializer).");
139+
140+
var operationId = _idGenerator.NextId();
141+
142+
// Capture this DurableContext's collaborators; the child shares state,
143+
// termination, batcher, ARN, and Lambda context — but uses a child
144+
// OperationIdGenerator so its operation IDs are deterministically
145+
// namespaced under the parent op ID.
146+
IDurableContext ChildFactory(string parentOpId) => new DurableContext(
147+
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
148+
_durableExecutionArn, LambdaContext, _batcher);
149+
150+
var op = new ChildContextOperation<T>(
151+
operationId, name, func, config, serializer, ChildFactory,
152+
_state, _terminationManager, _durableExecutionArn, _batcher);
153+
return op.ExecuteAsync(cancellationToken);
154+
}
106155
}
107156

108157
internal sealed class DurableExecutionContext : IExecutionContext

Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,31 @@ public StepInterruptedException(string message) : base(message) { }
6767
/// <summary>Creates a <see cref="StepInterruptedException"/> wrapping an inner exception.</summary>
6868
public StepInterruptedException(string message, Exception innerException) : base(message, innerException) { }
6969
}
70+
71+
/// <summary>
72+
/// Thrown when a child context's user function fails. Surfaces from
73+
/// <c>RunInChildContextAsync</c>; the underlying error is preserved on the
74+
/// <see cref="ErrorType"/>/<see cref="ErrorData"/>/<see cref="OriginalStackTrace"/>
75+
/// fields. Use <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
76+
/// domain-specific exception.
77+
/// </summary>
78+
public class ChildContextException : DurableExecutionException
79+
{
80+
/// <summary>
81+
/// The child context's <see cref="ChildContextConfig.SubType"/>, if any.
82+
/// </summary>
83+
public string? SubType { get; init; }
84+
/// <summary>The fully-qualified type name of the original exception.</summary>
85+
public string? ErrorType { get; init; }
86+
/// <summary>Optional structured error data attached by the user.</summary>
87+
public string? ErrorData { get; init; }
88+
/// <summary>Stack trace of the original exception, captured before serialization.</summary>
89+
public IReadOnlyList<string>? OriginalStackTrace { get; init; }
90+
91+
/// <summary>Creates an empty <see cref="ChildContextException"/>.</summary>
92+
public ChildContextException() { }
93+
/// <summary>Creates a <see cref="ChildContextException"/> with the given message.</summary>
94+
public ChildContextException(string message) : base(message) { }
95+
/// <summary>Creates a <see cref="ChildContextException"/> wrapping an inner exception.</summary>
96+
public ChildContextException(string message, Exception innerException) : base(message, innerException) { }
97+
}

Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,37 @@ Task WaitAsync(
6060
TimeSpan duration,
6161
string? name = null,
6262
CancellationToken cancellationToken = default);
63+
64+
/// <summary>
65+
/// Run a user function inside a logical sub-workflow (a "child context").
66+
/// The child has its own deterministic operation-ID space; its result is
67+
/// checkpointed as a <c>CONTEXT</c> operation so subsequent invocations
68+
/// replay the cached value without re-executing the func.
69+
/// </summary>
70+
/// <remarks>
71+
/// Use child contexts to group related durable operations (e.g. a step plus
72+
/// a wait plus a step) into a single observability/error-handling boundary.
73+
/// On failure, surfaces as <see cref="ChildContextException"/>; supply
74+
/// <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
75+
/// domain-specific exception.
76+
/// The child context's return value is serialized to a checkpoint using the
77+
/// <see cref="ILambdaSerializer"/> registered on
78+
/// <see cref="ILambdaContext.Serializer"/>.
79+
/// </remarks>
80+
Task<T> RunInChildContextAsync<T>(
81+
Func<IDurableContext, Task<T>> func,
82+
string? name = null,
83+
ChildContextConfig? config = null,
84+
CancellationToken cancellationToken = default);
85+
86+
/// <summary>
87+
/// Run a user function inside a child context that returns no value.
88+
/// </summary>
89+
Task RunInChildContextAsync(
90+
Func<IDurableContext, Task> func,
91+
string? name = null,
92+
ChildContextConfig? config = null,
93+
CancellationToken cancellationToken = default);
6394
}
6495

6596
/// <summary>
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
using System.IO;
2+
using System.Text;
3+
using Amazon.Lambda.Core;
4+
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
5+
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
6+
7+
namespace Amazon.Lambda.DurableExecution.Internal;
8+
9+
/// <summary>
10+
/// Durable child context operation. Runs a user-supplied function inside a
11+
/// nested <see cref="DurableContext"/> with its own deterministic operation-ID
12+
/// space, persisting the function's result so subsequent invocations replay
13+
/// the cached value without re-executing.
14+
/// </summary>
15+
/// <remarks>
16+
/// Replay branches — example: <c>await ctx.RunInChildContextAsync(child =&gt; ..., name: "phase")</c>
17+
/// <list type="bullet">
18+
/// <item><b>Fresh</b>: no prior state → sync-flush CONTEXT START → run user
19+
/// func → on success emit CONTEXT SUCCEED → on failure emit CONTEXT FAIL
20+
/// and throw <see cref="ChildContextException"/>.</item>
21+
/// <item><b>SUCCEEDED</b>: return cached deserialized result; user func is
22+
/// NOT re-executed.</item>
23+
/// <item><b>FAILED</b>: throw <see cref="ChildContextException"/> with the
24+
/// recorded error; if <see cref="ChildContextConfig.ErrorMapping"/> is
25+
/// set, the mapped exception is thrown instead.</item>
26+
/// <item><b>STARTED</b> / <b>PENDING</b>: re-run the user func without
27+
/// re-checkpointing START. The child's own operations recover from their
28+
/// own checkpoints, so this is replay propagation; if a wait/callback
29+
/// inside the child is still pending, the user func re-suspends.</item>
30+
/// </list>
31+
/// Unlike <see cref="StepOperation{T}"/>, child contexts have no retry strategy:
32+
/// failure is terminal and surfaces immediately via
33+
/// <see cref="ChildContextException"/>.
34+
/// </remarks>
35+
internal sealed class ChildContextOperation<T> : DurableOperation<T>
36+
{
37+
private readonly Func<IDurableContext, Task<T>> _func;
38+
private readonly ChildContextConfig? _config;
39+
private readonly ILambdaSerializer _serializer;
40+
private readonly Func<string, IDurableContext> _childContextFactory;
41+
42+
public ChildContextOperation(
43+
string operationId,
44+
string? name,
45+
Func<IDurableContext, Task<T>> func,
46+
ChildContextConfig? config,
47+
ILambdaSerializer serializer,
48+
Func<string, IDurableContext> childContextFactory,
49+
ExecutionState state,
50+
TerminationManager termination,
51+
string durableExecutionArn,
52+
CheckpointBatcher? batcher = null)
53+
: base(operationId, name, state, termination, durableExecutionArn, batcher)
54+
{
55+
_func = func;
56+
_config = config;
57+
_serializer = serializer;
58+
_childContextFactory = childContextFactory;
59+
}
60+
61+
protected override string OperationType => OperationTypes.Context;
62+
63+
protected override async Task<T> StartAsync(CancellationToken cancellationToken)
64+
{
65+
// Sync-flush CONTEXT START before user code so the service has a record
66+
// of the parent context if the inner func suspends (e.g. a Wait inside
67+
// the child terminates the workflow before SUCCEED is reached).
68+
await EnqueueAsync(new SdkOperationUpdate
69+
{
70+
Id = OperationId,
71+
Type = OperationTypes.Context,
72+
Action = "START",
73+
SubType = _config?.SubType,
74+
Name = Name
75+
}, cancellationToken);
76+
77+
return await ExecuteFunc(cancellationToken);
78+
}
79+
80+
protected override Task<T> ReplayAsync(Operation existing, CancellationToken cancellationToken)
81+
{
82+
switch (existing.Status)
83+
{
84+
case OperationStatuses.Succeeded:
85+
// Side-effecting code runs at most once: replay returns the
86+
// cached result without invoking the user func.
87+
return Task.FromResult(DeserializeResult(existing.ContextDetails?.Result));
88+
89+
case OperationStatuses.Failed:
90+
throw MapFailureException(BuildChildContextException(existing));
91+
92+
case OperationStatuses.Started:
93+
case OperationStatuses.Pending:
94+
// Re-run the user func: the child's own operations replay from
95+
// their own checkpoints. Do NOT re-checkpoint START — the
96+
// original is still authoritative. If something inside the
97+
// child is still pending (Wait, callback, retry) the user func
98+
// will re-suspend on its own.
99+
return ExecuteFunc(cancellationToken);
100+
101+
default:
102+
throw new NonDeterministicExecutionException(
103+
$"Child context operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
104+
}
105+
}
106+
107+
private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
108+
{
109+
cancellationToken.ThrowIfCancellationRequested();
110+
111+
var childContext = _childContextFactory(OperationId);
112+
113+
T result;
114+
try
115+
{
116+
result = await _func(childContext);
117+
}
118+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
119+
{
120+
throw;
121+
}
122+
catch (Exception ex)
123+
{
124+
await EnqueueAsync(new SdkOperationUpdate
125+
{
126+
Id = OperationId,
127+
Type = OperationTypes.Context,
128+
Action = "FAIL",
129+
SubType = _config?.SubType,
130+
Name = Name,
131+
Error = ToSdkError(ex)
132+
}, cancellationToken);
133+
134+
throw MapFailureException(new ChildContextException(ex.Message, ex)
135+
{
136+
SubType = _config?.SubType,
137+
ErrorType = ex.GetType().FullName
138+
});
139+
}
140+
141+
await EnqueueAsync(new SdkOperationUpdate
142+
{
143+
Id = OperationId,
144+
Type = OperationTypes.Context,
145+
Action = "SUCCEED",
146+
SubType = _config?.SubType,
147+
Name = Name,
148+
Payload = SerializeResult(result)
149+
}, cancellationToken);
150+
151+
return result;
152+
}
153+
154+
private Exception MapFailureException(ChildContextException ex)
155+
{
156+
var mapper = _config?.ErrorMapping;
157+
if (mapper == null) return ex;
158+
159+
var mapped = mapper(ex);
160+
return mapped ?? ex;
161+
}
162+
163+
private ChildContextException BuildChildContextException(Operation failedOp)
164+
{
165+
var err = failedOp.ContextDetails?.Error;
166+
return new ChildContextException(err?.ErrorMessage ?? "Child context failed")
167+
{
168+
SubType = failedOp.SubType ?? _config?.SubType,
169+
ErrorType = err?.ErrorType,
170+
ErrorData = err?.ErrorData,
171+
OriginalStackTrace = err?.StackTrace
172+
};
173+
}
174+
175+
private T DeserializeResult(string? serialized)
176+
{
177+
if (serialized == null) return default!;
178+
var bytes = Encoding.UTF8.GetBytes(serialized);
179+
using var ms = new MemoryStream(bytes);
180+
return _serializer.Deserialize<T>(ms);
181+
}
182+
183+
private string SerializeResult(T value)
184+
{
185+
using var ms = new MemoryStream();
186+
_serializer.Serialize(value, ms);
187+
return Encoding.UTF8.GetString(ms.ToArray());
188+
}
189+
190+
private static SdkErrorObject ToSdkError(Exception ex) => new()
191+
{
192+
ErrorType = ex.GetType().FullName,
193+
ErrorMessage = ex.Message,
194+
StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList()
195+
};
196+
}

Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,15 @@ private static Operation MapFromSdkOperation(SdkOperation sdkOp)
132132
ExecutionDetails = sdkOp.ExecutionDetails != null ? new ExecutionDetails
133133
{
134134
InputPayload = sdkOp.ExecutionDetails.InputPayload
135+
} : null,
136+
ContextDetails = sdkOp.ContextDetails != null ? new Internal.ContextDetails
137+
{
138+
Result = sdkOp.ContextDetails.Result,
139+
Error = sdkOp.ContextDetails.Error != null ? new ErrorObject
140+
{
141+
ErrorType = sdkOp.ContextDetails.Error.ErrorType,
142+
ErrorMessage = sdkOp.ContextDetails.Error.ErrorMessage
143+
} : null
135144
} : null
136145
};
137146
}

0 commit comments

Comments
 (0)