Skip to content

Commit 4d97473

Browse files
GarrettBeattyclaude
andcommitted
Add RunInChildContextAsync
Adds child-context support to the .NET Durable Execution SDK. A child context is a logical sub-workflow with its own deterministic operation-ID space, persisted as a CONTEXT operation so subsequent invocations replay the cached value without re-executing the function. Public surface: - IDurableContext.RunInChildContextAsync<T> (reflection + AOT-safe ICheckpointSerializer<T> overloads, plus a void overload). - ChildContextConfig with SubType (observability label) and ErrorMapping (transform exceptions before they surface to the caller). - ChildContextException for failure surfacing. Used as a building block for upcoming WaitForCallbackAsync. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> update docs (#2372) Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5ca0d3c commit 4d97473

25 files changed

Lines changed: 1578 additions & 74 deletions

Docs/durable-execution-design.md

Lines changed: 72 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1197,10 +1197,9 @@ public class StepConfig
11971197
public StepSemantics Semantics { get; set; } = StepSemantics.AtLeastOncePerRetry;
11981198

11991199
// Note: there is no Serializer property here. Step result serialization
1200-
// is delegated to the ILambdaSerializer registered on ILambdaContext.Serializer
1201-
// (assembly attribute or LambdaBootstrapBuilder.Create). To swap the
1202-
// step-checkpoint format for a single step, the planned route is the
1203-
// StepAsync(..., ICheckpointSerializer<T>, ...) overload (post-v1).
1200+
// is delegated to the ILambdaSerializer registered on
1201+
// ILambdaContext.Serializer (assembly attribute or
1202+
// LambdaBootstrapBuilder.Create).
12041203
}
12051204

12061205
public enum StepSemantics
@@ -1231,10 +1230,9 @@ public class CallbackConfig
12311230
/// </summary>
12321231
public TimeSpan HeartbeatTimeout { get; set; } = TimeSpan.Zero;
12331232

1234-
/// <summary>
1235-
/// Custom serializer for callback result.
1236-
/// </summary>
1237-
public ICheckpointSerializer? Serializer { get; set; }
1233+
// Note: there is no Serializer property here. Callback result
1234+
// serialization flows through the ILambdaSerializer registered on
1235+
// ILambdaContext.Serializer, the same as StepAsync.
12381236
}
12391237

12401238
/// <summary>
@@ -1259,14 +1257,14 @@ public class InvokeConfig
12591257
public TimeSpan Timeout { get; set; } = TimeSpan.Zero;
12601258

12611259
/// <summary>
1262-
/// Custom serializer for the payload.
1260+
/// Optional tenant identifier propagated to the chained invocation.
1261+
/// Matches the tenantId field on Python/JS/Java InvokeConfig.
12631262
/// </summary>
1264-
public ICheckpointSerializer? PayloadSerializer { get; set; }
1263+
public string? TenantId { get; set; }
12651264

1266-
/// <summary>
1267-
/// Custom serializer for the result.
1268-
/// </summary>
1269-
public ICheckpointSerializer? ResultSerializer { get; set; }
1265+
// Note: there are no payload/result serializer properties here. Both
1266+
// flow through the ILambdaSerializer registered on
1267+
// ILambdaContext.Serializer, the same as StepAsync.
12701268
}
12711269

12721270
/// <summary>
@@ -1381,10 +1379,9 @@ public class CompletionConfig
13811379
/// </summary>
13821380
public class ChildContextConfig
13831381
{
1384-
/// <summary>
1385-
/// Custom serializer for the child context's return value.
1386-
/// </summary>
1387-
public ICheckpointSerializer? Serializer { get; set; }
1382+
// Note: there is no Serializer property here. The child context's
1383+
// return value is serialized via the ILambdaSerializer registered on
1384+
// ILambdaContext.Serializer, the same as StepAsync.
13881385
13891386
/// <summary>
13901387
/// Operation sub-type label for observability (e.g., in test runner output).
@@ -1425,34 +1422,54 @@ public class WaitForConditionConfig<TState>
14251422
public interface IBatchResult<T>
14261423
{
14271424
/// <summary>
1428-
/// All items (succeeded and failed).
1425+
/// All items, in original index order.
14291426
/// </summary>
14301427
IReadOnlyList<IBatchItem<T>> All { get; }
14311428

14321429
/// <summary>
1433-
/// Only successful items.
1430+
/// Items whose Status is Succeeded.
14341431
/// </summary>
14351432
IReadOnlyList<IBatchItem<T>> Succeeded { get; }
14361433

14371434
/// <summary>
1438-
/// Only failed items.
1435+
/// Items whose Status is Failed.
14391436
/// </summary>
14401437
IReadOnlyList<IBatchItem<T>> Failed { get; }
14411438

14421439
/// <summary>
1443-
/// Get all successful results. Throws if any failed.
1440+
/// Items still in flight when the batch resolved (CompletionConfig short-circuit).
1441+
/// </summary>
1442+
IReadOnlyList<IBatchItem<T>> Started { get; }
1443+
1444+
/// <summary>
1445+
/// Get all successful results in original index order. Throws if any failed.
14441446
/// </summary>
14451447
IReadOnlyList<T> GetResults();
14461448

14471449
/// <summary>
1448-
/// Throw an exception if any item failed.
1450+
/// Get all errors from failed items.
1451+
/// </summary>
1452+
IReadOnlyList<DurableExecutionException> GetErrors();
1453+
1454+
/// <summary>
1455+
/// Throw a single aggregated exception if any item failed.
14491456
/// </summary>
14501457
void ThrowIfError();
14511458

14521459
/// <summary>
1453-
/// Why the operation completed.
1460+
/// True if any item is in the Failed state.
1461+
/// </summary>
1462+
bool HasFailure { get; }
1463+
1464+
/// <summary>
1465+
/// Why the batch resolved.
14541466
/// </summary>
14551467
CompletionReason CompletionReason { get; }
1468+
1469+
int SuccessCount { get; }
1470+
int FailureCount { get; }
1471+
int StartedCount { get; }
1472+
int TotalCount { get; }
14561473
}
14571474

14581475
public interface IBatchItem<T>
@@ -1463,7 +1480,29 @@ public interface IBatchItem<T>
14631480
DurableExecutionException? Error { get; }
14641481
}
14651482

1466-
public enum BatchItemStatus { Succeeded, Failed, Cancelled }
1483+
/// <summary>
1484+
/// Status of an individual item in a batch result.
1485+
/// Mirrors the wire-state observed at the time the batch resolved — items still
1486+
/// running when a CompletionConfig short-circuits remain in <see cref="Started"/>.
1487+
/// </summary>
1488+
public enum BatchItemStatus
1489+
{
1490+
/// <summary>
1491+
/// The branch ran to completion and produced a result.
1492+
/// </summary>
1493+
Succeeded,
1494+
1495+
/// <summary>
1496+
/// The branch ran to completion and threw.
1497+
/// </summary>
1498+
Failed,
1499+
1500+
/// <summary>
1501+
/// The branch was still in flight when the batch's CompletionConfig
1502+
/// resolved (e.g., FirstSuccessful returned before this branch finished).
1503+
/// </summary>
1504+
Started
1505+
}
14671506
public enum CompletionReason { AllCompleted, MinSuccessfulReached, FailureToleranceExceeded }
14681507

14691508
/// <summary>
@@ -1616,32 +1655,17 @@ public class BadResult
16161655

16171656
### Custom Serialization
16181657

1619-
Implement `ICheckpointSerializer<T>` for custom serialization:
1658+
There is no per-call serializer override on any durable-execution API. Every checkpoint — step results, callback results, invoke payloads/results, child-context results — is serialized via the `ILambdaSerializer` registered on `ILambdaContext.Serializer`. To customize, register a different `ILambdaSerializer` for the function:
16201659

16211660
```csharp
1622-
public interface ICheckpointSerializer<T>
1623-
{
1624-
string Serialize(T value, SerializationContext context);
1625-
T Deserialize(string data, SerializationContext context);
1626-
}
1661+
// Class library mode — register via the assembly attribute.
1662+
[assembly: LambdaSerializer(typeof(MyCustomSerializer))]
16271663

1628-
public record SerializationContext(string OperationId, string DurableExecutionArn);
1664+
// Executable / custom runtime — pass to LambdaBootstrapBuilder.Create.
1665+
using var bootstrap = LambdaBootstrapBuilder.Create(handler, new MyCustomSerializer()).Build();
16291666
```
16301667

1631-
Usage — pass the serializer to the per-step `StepAsync` overload directly. This is
1632-
the only way to override the registered `ILambdaSerializer` for a single step's
1633-
checkpoint; it's intentional that there's no `StepConfig.Serializer` knob, so you
1634-
have one obvious place to opt in (and the type is `ICheckpointSerializer<T>`, not
1635-
a non-generic marker, so the compiler catches a mismatched `T`):
1636-
1637-
```csharp
1638-
var result = await context.StepAsync(
1639-
async () => await GetLargeData(),
1640-
new CompressedJsonSerializer<LargeData>(),
1641-
name: "get_data");
1642-
```
1643-
1644-
> **Status:** the `ICheckpointSerializer<T>` overload is a planned post-v1 addition. Today, all step checkpoints flow through the `ILambdaSerializer` registered on `ILambdaContext.Serializer` — see [NativeAOT compatibility](#nativeaot-compatibility) for how that's wired.
1668+
The customization applies uniformly to the whole function — there is no way today to swap the format for a single step or a single result type. See [NativeAOT compatibility](#nativeaot-compatibility) for how the registration flows in JIT vs. AOT.
16451669

16461670
### Class library vs. executable output
16471671

@@ -1722,27 +1746,7 @@ The SDK handles overflow transparently:
17221746

17231747
**Lambda response exceeding 6 MB:** If the final orchestration result exceeds the response payload limit, the SDK checkpoints the result before returning the `DurableExecutionInvocationOutput`. The service reads the result from the checkpoint rather than from the response body.
17241748

1725-
**Guidance for very large results:** For results that are inherently large (multi-MB payloads), use a custom `ICheckpointSerializer<T>` that offloads to external storage (S3, DynamoDB) and returns a reference. This keeps checkpoint sizes small and avoids pagination overhead:
1726-
1727-
```csharp
1728-
public class S3BackedSerializer<T> : ICheckpointSerializer<T>
1729-
{
1730-
public string Serialize(T value, SerializationContext context)
1731-
{
1732-
var key = $"results/{context.DurableExecutionArn}/{context.OperationId}";
1733-
// Upload to S3, return the key as the checkpoint value
1734-
_s3Client.PutObject(new PutObjectRequest { BucketName = _bucket, Key = key, ... });
1735-
return key;
1736-
}
1737-
1738-
public T Deserialize(string data, SerializationContext context)
1739-
{
1740-
// Download from S3 using the stored key
1741-
var response = _s3Client.GetObject(new GetObjectRequest { BucketName = _bucket, Key = data });
1742-
return JsonSerializer.Deserialize<T>(response.ResponseStream);
1743-
}
1744-
}
1745-
```
1749+
**Guidance for very large results:** For results that are inherently large (multi-MB payloads), do the offload yourself inside the step — write the payload to external storage (S3, DynamoDB) and return a reference (e.g. an S3 key) from the step. The reference is what the SDK serializes and checkpoints, so the checkpoint stays small and pagination is avoided. Subsequent steps fetch the payload from external storage on demand.
17461750

17471751
---
17481752

@@ -1946,7 +1950,7 @@ This is post-v1 work. For the initial release, developers test durable functions
19461950
- **Lambda runtime:** Requires the managed .NET 8 runtime or a custom runtime (`provided.al2023`) for NativeAOT deployments.
19471951
- **Durable execution service:** The function must be configured with `DurableConfig` (handled automatically by the `[DurableExecution]` source generator).
19481952
- **Qualified function identifiers:** `InvokeAsync` requires a version number, alias, or `$LATEST` — unqualified ARNs are not supported for durable invocations.
1949-
- **Serializable results:** All step return types must be JSON-serializable (or use a custom `ICheckpointSerializer<T>`).
1953+
- **Serializable results:** All step return types must be serializable by the `ILambdaSerializer` registered on `ILambdaContext.Serializer` (default: `System.Text.Json`).
19501954

19511955
---
19521956

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Configuration for a child context.
5+
/// </summary>
6+
/// <remarks>
7+
/// A child context is a logical sub-workflow with its own deterministic
8+
/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Use
9+
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
10+
/// (and overloads) to run code inside one.
11+
/// </remarks>
12+
public sealed class ChildContextConfig
13+
{
14+
/// <summary>
15+
/// Operation sub-type label for observability (e.g. <c>"WaitForCallback"</c>).
16+
/// Surfaces on the wire <c>OperationUpdate.SubType</c> field.
17+
/// </summary>
18+
public string? SubType { get; set; }
19+
20+
/// <summary>
21+
/// Optional function to transform exceptions thrown by the child context's
22+
/// user function before they surface to the caller. Useful for wrapping
23+
/// low-level errors into domain-specific exceptions.
24+
/// </summary>
25+
/// <remarks>
26+
/// Applied when the user function throws (the mapped exception propagates
27+
/// to the caller of <c>RunInChildContextAsync</c>) and on replay of a
28+
/// <c>FAILED</c> child context (the constructed
29+
/// <see cref="ChildContextException"/> is mapped before being thrown).
30+
/// </remarks>
31+
public Func<Exception, Exception>? ErrorMapping { get; set; }
32+
}

Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private Task<T> RunStep<T>(
7676

7777
var operationId = _idGenerator.NextId();
7878
var op = new StepOperation<T>(
79-
operationId, name, func, config, serializer, Logger,
79+
operationId, name, _idGenerator.ParentId, func, config, serializer, Logger,
8080
_state, _terminationManager, _durableExecutionArn, _batcher);
8181
return op.ExecuteAsync(cancellationToken);
8282
}
@@ -99,7 +99,58 @@ public Task WaitAsync(
9999
var operationId = _idGenerator.NextId();
100100
var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds));
101101
var op = new WaitOperation(
102-
operationId, name, waitSeconds,
102+
operationId, name, _idGenerator.ParentId, waitSeconds,
103+
_state, _terminationManager, _durableExecutionArn, _batcher);
104+
return op.ExecuteAsync(cancellationToken);
105+
}
106+
107+
public Task<T> RunInChildContextAsync<T>(
108+
Func<IDurableContext, Task<T>> func,
109+
string? name = null,
110+
ChildContextConfig? config = null,
111+
CancellationToken cancellationToken = default)
112+
=> RunChildContext(func, name, config, cancellationToken);
113+
114+
public async Task RunInChildContextAsync(
115+
Func<IDurableContext, Task> func,
116+
string? name = null,
117+
ChildContextConfig? config = null,
118+
CancellationToken cancellationToken = default)
119+
{
120+
// Void child contexts don't carry a meaningful payload; the wrapper
121+
// returns null so the registered ILambdaSerializer is never asked to
122+
// serialize a real value.
123+
await RunChildContext<object?>(
124+
async (ctx) => { await func(ctx); return null; },
125+
name, config, cancellationToken);
126+
}
127+
128+
private Task<T> RunChildContext<T>(
129+
Func<IDurableContext, Task<T>> func,
130+
string? name,
131+
ChildContextConfig? config,
132+
CancellationToken cancellationToken)
133+
{
134+
var serializer = LambdaContext.Serializer
135+
?? throw new InvalidOperationException(
136+
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
137+
"In the class library programming model, register one with " +
138+
"[assembly: LambdaSerializer(typeof(...))]. In an executable / custom " +
139+
"runtime, pass it to LambdaBootstrapBuilder.Create(handler, serializer). " +
140+
"In tests, set TestLambdaContext.Serializer.");
141+
142+
var operationId = _idGenerator.NextId();
143+
144+
// Capture this DurableContext's collaborators; the child shares state,
145+
// termination, batcher, ARN, and Lambda context — but uses a child
146+
// OperationIdGenerator so its operation IDs are deterministically
147+
// namespaced under the parent op ID.
148+
IDurableContext ChildFactory(string parentOpId) => new DurableContext(
149+
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
150+
_durableExecutionArn, LambdaContext, _batcher);
151+
152+
var op = new ChildContextOperation<T>(
153+
operationId, name, _idGenerator.ParentId, func, config, serializer, ChildFactory,
103154
_state, _terminationManager, _durableExecutionArn, _batcher);
104155
return op.ExecuteAsync(cancellationToken);
105156
}

Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,31 @@ public StepInterruptedException(string message) : base(message) { }
6767
/// <summary>Creates a <see cref="StepInterruptedException"/> wrapping an inner exception.</summary>
6868
public StepInterruptedException(string message, Exception innerException) : base(message, innerException) { }
6969
}
70+
71+
/// <summary>
72+
/// Thrown when a child context's user function fails. Surfaces from
73+
/// <c>RunInChildContextAsync</c>; the underlying error is preserved on the
74+
/// <see cref="ErrorType"/>/<see cref="ErrorData"/>/<see cref="OriginalStackTrace"/>
75+
/// fields. Use <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
76+
/// domain-specific exception.
77+
/// </summary>
78+
public class ChildContextException : DurableExecutionException
79+
{
80+
/// <summary>
81+
/// The child context's <see cref="ChildContextConfig.SubType"/>, if any.
82+
/// </summary>
83+
public string? SubType { get; init; }
84+
/// <summary>The fully-qualified type name of the original exception.</summary>
85+
public string? ErrorType { get; init; }
86+
/// <summary>Optional structured error data attached by the user.</summary>
87+
public string? ErrorData { get; init; }
88+
/// <summary>Stack trace of the original exception, captured before serialization.</summary>
89+
public IReadOnlyList<string>? OriginalStackTrace { get; init; }
90+
91+
/// <summary>Creates an empty <see cref="ChildContextException"/>.</summary>
92+
public ChildContextException() { }
93+
/// <summary>Creates a <see cref="ChildContextException"/> with the given message.</summary>
94+
public ChildContextException(string message) : base(message) { }
95+
/// <summary>Creates a <see cref="ChildContextException"/> wrapping an inner exception.</summary>
96+
public ChildContextException(string message, Exception innerException) : base(message, innerException) { }
97+
}

0 commit comments

Comments
 (0)