Skip to content

Commit 097e5a1

Browse files
committed
Retry
stack-info: PR: #2363, branch: GarrettBeatty/stack/3
1 parent a8b59d9 commit 097e5a1

39 files changed

Lines changed: 1756 additions & 120 deletions

Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,23 @@ public StepException(string message) : base(message) { }
4747
/// <summary>Creates a <see cref="StepException"/> wrapping an inner exception.</summary>
4848
public StepException(string message, Exception innerException) : base(message, innerException) { }
4949
}
50+
51+
/// <summary>
52+
/// Thrown when a step under <see cref="StepSemantics.AtMostOncePerRetry"/> is
53+
/// detected to have been interrupted mid-execution on a prior invocation
54+
/// (replay sees a <c>STARTED</c> checkpoint with no terminal record).
55+
/// </summary>
56+
/// <remarks>
57+
/// Surfaces in <see cref="IRetryStrategy.ShouldRetry"/> so user-supplied
58+
/// strategies can distinguish "my code threw" from "a previous attempt
59+
/// crashed before it could record a result".
60+
/// </remarks>
61+
public class StepInterruptedException : StepException
62+
{
63+
/// <summary>Creates an empty <see cref="StepInterruptedException"/>.</summary>
64+
public StepInterruptedException() { }
65+
/// <summary>Creates a <see cref="StepInterruptedException"/> with the given message.</summary>
66+
public StepInterruptedException(string message) : base(message) { }
67+
/// <summary>Creates a <see cref="StepInterruptedException"/> wrapping an inner exception.</summary>
68+
public StepInterruptedException(string message, Exception innerException) : base(message, innerException) { }
69+
}

Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,7 @@ private static async Task<DurableExecutionInvocationOutput> WrapAsyncCore<TInput
155155
///
156156
/// State-hydration errors (<c>GetExecutionStateAsync</c>) propagate as
157157
/// <see cref="DurableExecutionException"/> too, but they are NOT caught here — they
158-
/// flow up to the host so Lambda retries, matching Python's <c>GetExecutionStateError</c>
159-
/// (which extends <c>InvocationError</c>).
158+
/// flow up to the host so Lambda retries.
160159
///
161160
/// User-code SDK errors (e.g. an SDK call inside a Step body) are caught by
162161
/// <c>StepRunner</c> and surfaced as <c>StepException</c> for the workflow's normal
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
namespace Amazon.Lambda.DurableExecution;
2+
3+
/// <summary>
4+
/// Determines whether a failed step should be retried and with what delay.
5+
/// </summary>
6+
public interface IRetryStrategy
7+
{
8+
/// <summary>
9+
/// Evaluates whether the given exception warrants a retry.
10+
/// </summary>
11+
/// <param name="exception">The exception that caused the step to fail.</param>
12+
/// <param name="attemptNumber">The 1-based attempt number that just failed.</param>
13+
/// <returns>A decision indicating whether to retry and the delay before the next attempt.</returns>
14+
RetryDecision ShouldRetry(Exception exception, int attemptNumber);
15+
}
16+
17+
/// <summary>
18+
/// The outcome of a retry evaluation.
19+
/// </summary>
20+
public readonly struct RetryDecision
21+
{
22+
/// <summary>Whether the step should be retried.</summary>
23+
public bool ShouldRetry { get; }
24+
25+
/// <summary>The delay before the next retry attempt.</summary>
26+
public TimeSpan Delay { get; }
27+
28+
private RetryDecision(bool shouldRetry, TimeSpan delay)
29+
{
30+
ShouldRetry = shouldRetry;
31+
Delay = delay;
32+
}
33+
34+
/// <summary>Indicates the step should not be retried.</summary>
35+
public static RetryDecision DoNotRetry() => new(false, TimeSpan.Zero);
36+
37+
/// <summary>Indicates the step should be retried after the specified delay.</summary>
38+
public static RetryDecision RetryAfter(TimeSpan delay) => new(true, delay);
39+
}

Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,12 @@ namespace Amazon.Lambda.DurableExecution.Internal;
1111
/// call awaits the flush of its containing batch (sync semantics).
1212
/// </summary>
1313
/// <remarks>
14-
/// TODO: when Map / Parallel / ChildContext / WaitForCondition land — or when
15-
/// AtLeastOncePerRetry step START gets a non-blocking variant — they will need
16-
/// a fire-and-forget overload like
17-
/// <c>Task EnqueueAsync(SdkOperationUpdate update, bool sync)</c> where
18-
/// <c>sync=false</c> returns as soon as the item is queued. Java's
19-
/// <c>sendOperationUpdate</c> vs <c>sendOperationUpdateAsync</c> is the model.
20-
/// Today every call site is sync, so the API stays minimal.
14+
/// Fire-and-forget semantics are achieved by simply not awaiting the returned
15+
/// Task. Errors still surface deterministically via <c>_terminalError</c>: the
16+
/// next sync <see cref="EnqueueAsync"/> or <see cref="DrainAsync"/> rethrows.
17+
/// Callers using fire-and-forget should observe the discarded Task's exception
18+
/// (see <c>StepOperation.FireAndForget</c>) so it doesn't trip the runtime's
19+
/// <c>UnobservedTaskException</c> event.
2120
/// </remarks>
2221
internal sealed class CheckpointBatcher : IAsyncDisposable
2322
{

Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcherConfig.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,10 @@ internal sealed class CheckpointBatcherConfig
2424
/// <remarks>
2525
/// TODO: not enforced today. The worker only checks <see cref="MaxBatchOperations"/>;
2626
/// a single oversized item (or a batch whose serialized size exceeds 750 KB)
27-
/// will be sent to the service and rejected there. Java/JS/Python all
28-
/// pre-flight this on the in-flight batch and split before the next add.
29-
/// Wire this in alongside the async-flush operations (Map / Parallel /
30-
/// child-context) since those are the scenarios that can actually fill a
31-
/// batch — today every batch is 1 item with <see cref="FlushInterval"/>
32-
/// = Zero, so the gap is latent.
27+
/// will be sent to the service and rejected there. Wire this in alongside
28+
/// the async-flush operations (Map / Parallel / child-context) since those
29+
/// are the scenarios that can actually fill a batch — today every batch is
30+
/// 1 item with <see cref="FlushInterval"/> = Zero, so the gap is latent.
3331
/// </remarks>
3432
internal int MaxBatchBytes { get; init; } = 750 * 1024;
3533
}

Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ namespace Amazon.Lambda.DurableExecution.Internal;
66
/// <see cref="CheckpointBatcher"/>; this type is the inbound side only.
77
/// </summary>
88
/// <remarks>
9-
/// Replay tracking mirrors the Python / Java / JavaScript reference SDKs:
109
/// <list type="bullet">
1110
/// <item>At construction the workflow is "replaying" if and only if any completed
1211
/// (terminal-status) user-replayable op is present. The service always sends one <c>EXECUTION</c>-type op
@@ -41,11 +40,15 @@ public void LoadFromCheckpoint(InitialExecutionState? initialState)
4140
AddOperations(initialState.Operations);
4241
}
4342

44-
// Only user-replayable ops put us into replay mode. The service-side
45-
// EXECUTION op (input payload bookkeeping) is always present and must
46-
// not count — see Python execution.py:258 / Java ExecutionManager:81 /
47-
// JS execution-context.ts:62 for the same rule.
48-
(_isReplaying, _remainingReplayOps) = ScanReplayable();
43+
// We're "replaying" when there are completed ops (SUCCEEDED, FAILED,
44+
// CANCELLED, STOPPED) we need to re-derive before resuming live work.
45+
// The service-side EXECUTION op (input payload bookkeeping) is always
46+
// present and doesn't count. If the only ops are in-progress
47+
// (READY/PENDING/STARTED), there's nothing to re-derive — the next
48+
// user call IS the next thing to run — so IsReplaying starts false.
49+
var (_, terminalCount) = ScanReplayable();
50+
_remainingReplayOps = terminalCount;
51+
_isReplaying = terminalCount > 0;
4952
}
5053

5154
public void AddOperations(IEnumerable<Operation> operations)
@@ -91,8 +94,11 @@ public void TrackReplay(string operationId)
9194

9295
public void ValidateReplayConsistency(string operationId, string expectedType, string? expectedName)
9396
{
94-
if (!_isReplaying) return;
95-
97+
// Independent of IsReplaying: as long as a checkpoint record exists
98+
// for this id, its type/name must match what user code is asking for.
99+
// If the only checkpointed ops are in-progress (PENDING/READY/STARTED),
100+
// IsReplaying is false but the records still exist and code drift can
101+
// still produce a mismatch.
96102
if (!_operations.TryGetValue(operationId, out var op)) return;
97103

98104
if (op.Type != null && op.Type != expectedType)

Libraries/src/Amazon.Lambda.DurableExecution/Internal/OperationIdGenerator.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ namespace Amazon.Lambda.DurableExecution.Internal;
88
/// <summary>
99
/// Generates deterministic operation IDs for durable operations. Each call
1010
/// increments an internal counter and SHA-256 hashes <c>"&lt;parentId&gt;-&lt;counter&gt;"</c>
11-
/// (or just <c>"&lt;counter&gt;"</c> at the root). Hashing matches the wire format
12-
/// used by the Java/JS/Python SDKs so the same workflow position produces a
13-
/// stable, opaque ID across replays — and the human-readable step name is
14-
/// carried separately on <c>OperationUpdate.Name</c>, so renaming a step does
15-
/// not break replay correlation.
11+
/// (or just <c>"&lt;counter&gt;"</c> at the root). The same workflow position
12+
/// produces a stable, opaque ID across replays — and the human-readable step
13+
/// name is carried separately on <c>OperationUpdate.Name</c>, so renaming a
14+
/// step does not break replay correlation.
1615
/// </summary>
1716
internal sealed class OperationIdGenerator
1817
{
@@ -46,7 +45,7 @@ public OperationIdGenerator(string? parentId)
4645

4746
/// <summary>
4847
/// Generates the next operation ID. The counter is pre-incremented so the
49-
/// first ID is <c>hash("1")</c>, matching the reference SDKs.
48+
/// first ID is <c>hash("1")</c>.
5049
/// </summary>
5150
/// <remarks>
5251
/// Uses <see cref="Interlocked.Increment(ref int)"/> so concurrent callers
@@ -55,7 +54,7 @@ public OperationIdGenerator(string? parentId)
5554
/// <c>MapAsync</c> branches that fan out before awaiting) cannot collide
5655
/// on the same ID. Determinism still requires that calls happen in a
5756
/// deterministic order — atomicity prevents duplicate IDs but not
58-
/// reordering between replays. Matches Java's <c>AtomicInteger.incrementAndGet</c>.
57+
/// reordering between replays.
5958
/// </remarks>
6059
public string NextId()
6160
{

0 commit comments

Comments
 (0)