Skip to content

Commit 369a029

Browse files
GarrettBeattyclaude
andcommitted
Add RunInChildContextAsync
Adds child-context support to the .NET Durable Execution SDK. A child context is a logical sub-workflow with its own deterministic operation-ID space, persisted as a CONTEXT operation so subsequent invocations replay the cached value without re-executing the function. Public surface: - IDurableContext.RunInChildContextAsync<T> (reflection + AOT-safe ICheckpointSerializer<T> overloads, plus a void overload). - ChildContextConfig with SubType (observability label) and ErrorMapping (transform exceptions before they surface to the caller). - ChildContextException for failure surfacing. Used as a building block for upcoming WaitForCallbackAsync. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7ca2099 commit 369a029

9 files changed

Lines changed: 955 additions & 1 deletion

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Configuration for a child context.
5+
/// </summary>
6+
/// <remarks>
7+
/// A child context is a logical sub-workflow with its own deterministic
8+
/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Use
9+
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
10+
/// (and overloads) to run code inside one.
11+
/// </remarks>
12+
public sealed class ChildContextConfig
13+
{
14+
/// <summary>
15+
/// Operation sub-type label for observability (e.g. <c>"WaitForCallback"</c>).
16+
/// Surfaces on the wire <c>OperationUpdate.SubType</c> field.
17+
/// </summary>
18+
public string? SubType { get; set; }
19+
20+
/// <summary>
21+
/// Optional function to transform exceptions thrown by the child context's
22+
/// user function before they surface to the caller. Useful for wrapping
23+
/// low-level errors into domain-specific exceptions.
24+
/// </summary>
25+
/// <remarks>
26+
/// Applied when the user function throws (the mapped exception propagates
27+
/// to the caller of <c>RunInChildContextAsync</c>) and on replay of a
28+
/// <c>FAILED</c> child context (the constructed
29+
/// <see cref="ChildContextException"/> is mapped before being thrown).
30+
/// </remarks>
31+
public Func<Exception, Exception>? ErrorMapping { get; set; }
32+
}

Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,60 @@ public Task WaitAsync(
108108
_state, _terminationManager, _durableExecutionArn, _batcher);
109109
return op.ExecuteAsync(cancellationToken);
110110
}
111+
112+
[RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
113+
[RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
114+
public Task<T> RunInChildContextAsync<T>(
115+
Func<IDurableContext, Task<T>> func,
116+
string? name = null,
117+
ChildContextConfig? config = null,
118+
CancellationToken cancellationToken = default)
119+
=> RunChildContext(func, new ReflectionJsonCheckpointSerializer<T>(), name, config, cancellationToken);
120+
121+
public Task<T> RunInChildContextAsync<T>(
122+
Func<IDurableContext, Task<T>> func,
123+
ICheckpointSerializer<T> serializer,
124+
string? name = null,
125+
ChildContextConfig? config = null,
126+
CancellationToken cancellationToken = default)
127+
=> RunChildContext(func, serializer, name, config, cancellationToken);
128+
129+
public async Task RunInChildContextAsync(
130+
Func<IDurableContext, Task> func,
131+
string? name = null,
132+
ChildContextConfig? config = null,
133+
CancellationToken cancellationToken = default)
134+
{
135+
// Void child contexts don't carry a meaningful payload; wrap with a
136+
// null-only serializer that doesn't touch reflection.
137+
await RunChildContext<object?>(
138+
async (ctx) => { await func(ctx); return null; },
139+
NullCheckpointSerializer.Instance,
140+
name, config, cancellationToken);
141+
}
142+
143+
private Task<T> RunChildContext<T>(
144+
Func<IDurableContext, Task<T>> func,
145+
ICheckpointSerializer<T> serializer,
146+
string? name,
147+
ChildContextConfig? config,
148+
CancellationToken cancellationToken)
149+
{
150+
var operationId = _idGenerator.NextId();
151+
152+
// Capture this DurableContext's collaborators; the child shares state,
153+
// termination, batcher, ARN, and Lambda context — but uses a child
154+
// OperationIdGenerator so its operation IDs are deterministically
155+
// namespaced under the parent op ID.
156+
IDurableContext ChildFactory(string parentOpId) => new DurableContext(
157+
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
158+
_durableExecutionArn, LambdaContext, _batcher);
159+
160+
var op = new ChildContextOperation<T>(
161+
operationId, name, func, config, serializer, ChildFactory,
162+
_state, _terminationManager, _durableExecutionArn, _batcher);
163+
return op.ExecuteAsync(cancellationToken);
164+
}
111165
}
112166

113167
/// <summary>

Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/DurableExecutionException.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,31 @@ public StepException(string message) : base(message) { }
4747
/// <summary>Creates a <see cref="StepException"/> wrapping an inner exception.</summary>
4848
public StepException(string message, Exception innerException) : base(message, innerException) { }
4949
}
50+
51+
/// <summary>
52+
/// Thrown when a child context's user function fails. Surfaces from
53+
/// <c>RunInChildContextAsync</c>; the underlying error is preserved on the
54+
/// <see cref="ErrorType"/>/<see cref="ErrorData"/>/<see cref="OriginalStackTrace"/>
55+
/// fields. Use <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
56+
/// domain-specific exception.
57+
/// </summary>
58+
public class ChildContextException : DurableExecutionException
59+
{
60+
/// <summary>
61+
/// The child context's <see cref="ChildContextConfig.SubType"/>, if any.
62+
/// </summary>
63+
public string? SubType { get; init; }
64+
/// <summary>The fully-qualified type name of the original exception.</summary>
65+
public string? ErrorType { get; init; }
66+
/// <summary>Optional structured error data attached by the user.</summary>
67+
public string? ErrorData { get; init; }
68+
/// <summary>Stack trace of the original exception, captured before serialization.</summary>
69+
public IReadOnlyList<string>? OriginalStackTrace { get; init; }
70+
71+
/// <summary>Creates an empty <see cref="ChildContextException"/>.</summary>
72+
public ChildContextException() { }
73+
/// <summary>Creates a <see cref="ChildContextException"/> with the given message.</summary>
74+
public ChildContextException(string message) : base(message) { }
75+
/// <summary>Creates a <see cref="ChildContextException"/> wrapping an inner exception.</summary>
76+
public ChildContextException(string message, Exception innerException) : base(message, innerException) { }
77+
}

Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,52 @@ Task WaitAsync(
7373
TimeSpan duration,
7474
string? name = null,
7575
CancellationToken cancellationToken = default);
76+
77+
/// <summary>
78+
/// Run a user function inside a logical sub-workflow (a "child context").
79+
/// The child has its own deterministic operation-ID space; its result is
80+
/// checkpointed as a <c>CONTEXT</c> operation so subsequent invocations
81+
/// replay the cached value without re-executing the func.
82+
/// </summary>
83+
/// <remarks>
84+
/// Use child contexts to group related durable operations (e.g. a step plus
85+
/// a wait plus a step) into a single observability/error-handling boundary.
86+
/// On failure, surfaces as <see cref="ChildContextException"/>; supply
87+
/// <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
88+
/// domain-specific exception.
89+
/// The child context's return value is serialized to a checkpoint using
90+
/// reflection-based <c>System.Text.Json</c>. For NativeAOT or trimmed
91+
/// deployments, use the overload that takes an
92+
/// <see cref="ICheckpointSerializer{T}"/>.
93+
/// </remarks>
94+
[RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
95+
[RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
96+
Task<T> RunInChildContextAsync<T>(
97+
Func<IDurableContext, Task<T>> func,
98+
string? name = null,
99+
ChildContextConfig? config = null,
100+
CancellationToken cancellationToken = default);
101+
102+
/// <summary>
103+
/// Run a user function inside a child context with AOT-safe checkpoint
104+
/// serialization. The supplied <paramref name="serializer"/> is used in
105+
/// place of reflection-based JSON.
106+
/// </summary>
107+
Task<T> RunInChildContextAsync<T>(
108+
Func<IDurableContext, Task<T>> func,
109+
ICheckpointSerializer<T> serializer,
110+
string? name = null,
111+
ChildContextConfig? config = null,
112+
CancellationToken cancellationToken = default);
113+
114+
/// <summary>
115+
/// Run a user function inside a child context that returns no value.
116+
/// </summary>
117+
Task RunInChildContextAsync(
118+
Func<IDurableContext, Task> func,
119+
string? name = null,
120+
ChildContextConfig? config = null,
121+
CancellationToken cancellationToken = default);
76122
}
77123

78124
/// <summary>
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
2+
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
3+
4+
namespace Amazon.Lambda.DurableExecution.Internal;
5+
6+
/// <summary>
7+
/// Durable child context operation. Runs a user-supplied function inside a
8+
/// nested <see cref="DurableContext"/> with its own deterministic operation-ID
9+
/// space, persisting the function's result so subsequent invocations replay
10+
/// the cached value without re-executing.
11+
/// </summary>
12+
/// <remarks>
13+
/// Replay branches — example: <c>await ctx.RunInChildContextAsync(child =&gt; ..., name: "phase")</c>
14+
/// <list type="bullet">
15+
/// <item><b>Fresh</b>: no prior state → sync-flush CONTEXT START → run user
16+
/// func → on success emit CONTEXT SUCCEED → on failure emit CONTEXT FAIL
17+
/// and throw <see cref="ChildContextException"/>.</item>
18+
/// <item><b>SUCCEEDED</b>: return cached deserialized result; user func is
19+
/// NOT re-executed.</item>
20+
/// <item><b>FAILED</b>: throw <see cref="ChildContextException"/> with the
21+
/// recorded error; if <see cref="ChildContextConfig.ErrorMapping"/> is
22+
/// set, the mapped exception is thrown instead.</item>
23+
/// <item><b>STARTED</b> / <b>PENDING</b>: re-run the user func without
24+
/// re-checkpointing START. The child's own operations recover from their
25+
/// own checkpoints, so this is replay propagation; if a wait/callback
26+
/// inside the child is still pending, the user func re-suspends.</item>
27+
/// </list>
28+
/// Unlike <see cref="StepOperation{T}"/>, child contexts have no retry strategy:
29+
/// failure is terminal and surfaces immediately via
30+
/// <see cref="ChildContextException"/>.
31+
/// </remarks>
32+
internal sealed class ChildContextOperation<T> : DurableOperation<T>
33+
{
34+
private readonly Func<IDurableContext, Task<T>> _func;
35+
private readonly ChildContextConfig? _config;
36+
private readonly ICheckpointSerializer<T> _serializer;
37+
private readonly Func<string, IDurableContext> _childContextFactory;
38+
39+
public ChildContextOperation(
40+
string operationId,
41+
string? name,
42+
Func<IDurableContext, Task<T>> func,
43+
ChildContextConfig? config,
44+
ICheckpointSerializer<T> serializer,
45+
Func<string, IDurableContext> childContextFactory,
46+
ExecutionState state,
47+
TerminationManager termination,
48+
string durableExecutionArn,
49+
CheckpointBatcher? batcher = null)
50+
: base(operationId, name, state, termination, durableExecutionArn, batcher)
51+
{
52+
_func = func;
53+
_config = config;
54+
_serializer = serializer;
55+
_childContextFactory = childContextFactory;
56+
}
57+
58+
protected override string OperationType => OperationTypes.Context;
59+
60+
protected override async Task<T> StartAsync(CancellationToken cancellationToken)
61+
{
62+
// Sync-flush CONTEXT START before user code so the service has a record
63+
// of the parent context if the inner func suspends (e.g. a Wait inside
64+
// the child terminates the workflow before SUCCEED is reached).
65+
await EnqueueAsync(new SdkOperationUpdate
66+
{
67+
Id = OperationId,
68+
Type = OperationTypes.Context,
69+
Action = "START",
70+
SubType = _config?.SubType,
71+
Name = Name
72+
}, cancellationToken);
73+
74+
return await ExecuteFunc(cancellationToken);
75+
}
76+
77+
protected override Task<T> ReplayAsync(Operation existing, CancellationToken cancellationToken)
78+
{
79+
switch (existing.Status)
80+
{
81+
case OperationStatuses.Succeeded:
82+
// Side-effecting code runs at most once: replay returns the
83+
// cached result without invoking the user func.
84+
return Task.FromResult(DeserializeResult(existing.ContextDetails?.Result));
85+
86+
case OperationStatuses.Failed:
87+
throw MapFailureException(BuildChildContextException(existing));
88+
89+
case OperationStatuses.Started:
90+
case OperationStatuses.Pending:
91+
// Re-run the user func: the child's own operations replay from
92+
// their own checkpoints. Do NOT re-checkpoint START — the
93+
// original is still authoritative. If something inside the
94+
// child is still pending (Wait, callback, retry) the user func
95+
// will re-suspend on its own.
96+
return ExecuteFunc(cancellationToken);
97+
98+
default:
99+
throw new NonDeterministicExecutionException(
100+
$"Child context operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
101+
}
102+
}
103+
104+
private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
105+
{
106+
cancellationToken.ThrowIfCancellationRequested();
107+
108+
var childContext = _childContextFactory(OperationId);
109+
110+
T result;
111+
try
112+
{
113+
result = await _func(childContext);
114+
}
115+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
116+
{
117+
throw;
118+
}
119+
catch (Exception ex)
120+
{
121+
await EnqueueAsync(new SdkOperationUpdate
122+
{
123+
Id = OperationId,
124+
Type = OperationTypes.Context,
125+
Action = "FAIL",
126+
SubType = _config?.SubType,
127+
Name = Name,
128+
Error = ToSdkError(ex)
129+
}, cancellationToken);
130+
131+
throw MapFailureException(new ChildContextException(ex.Message, ex)
132+
{
133+
SubType = _config?.SubType,
134+
ErrorType = ex.GetType().FullName
135+
});
136+
}
137+
138+
await EnqueueAsync(new SdkOperationUpdate
139+
{
140+
Id = OperationId,
141+
Type = OperationTypes.Context,
142+
Action = "SUCCEED",
143+
SubType = _config?.SubType,
144+
Name = Name,
145+
Payload = SerializeResult(result)
146+
}, cancellationToken);
147+
148+
return result;
149+
}
150+
151+
private Exception MapFailureException(ChildContextException ex)
152+
{
153+
var mapper = _config?.ErrorMapping;
154+
if (mapper == null) return ex;
155+
156+
var mapped = mapper(ex);
157+
return mapped ?? ex;
158+
}
159+
160+
private ChildContextException BuildChildContextException(Operation failedOp)
161+
{
162+
var err = failedOp.ContextDetails?.Error;
163+
return new ChildContextException(err?.ErrorMessage ?? "Child context failed")
164+
{
165+
SubType = failedOp.SubType ?? _config?.SubType,
166+
ErrorType = err?.ErrorType,
167+
ErrorData = err?.ErrorData,
168+
OriginalStackTrace = err?.StackTrace
169+
};
170+
}
171+
172+
private T DeserializeResult(string? serialized)
173+
{
174+
if (serialized == null) return default!;
175+
return _serializer.Deserialize(serialized, new SerializationContext(OperationId, DurableExecutionArn));
176+
}
177+
178+
private string SerializeResult(T value)
179+
=> _serializer.Serialize(value, new SerializationContext(OperationId, DurableExecutionArn));
180+
181+
private static SdkErrorObject ToSdkError(Exception ex) => new()
182+
{
183+
ErrorType = ex.GetType().FullName,
184+
ErrorMessage = ex.Message,
185+
StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList()
186+
};
187+
}

Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,15 @@ private static Internal.Operation MapFromSdkOperation(SdkOperation sdkOp)
102102
ExecutionDetails = sdkOp.ExecutionDetails != null ? new Internal.ExecutionDetails
103103
{
104104
InputPayload = sdkOp.ExecutionDetails.InputPayload
105+
} : null,
106+
ContextDetails = sdkOp.ContextDetails != null ? new Internal.ContextDetails
107+
{
108+
Result = sdkOp.ContextDetails.Result,
109+
Error = sdkOp.ContextDetails.Error != null ? new ErrorObject
110+
{
111+
ErrorType = sdkOp.ContextDetails.Error.ErrorType,
112+
ErrorMessage = sdkOp.ContextDetails.Error.ErrorMessage
113+
} : null
105114
} : null
106115
};
107116
}

0 commit comments

Comments
 (0)