Skip to content

Commit 5cdab8d

Browse files
GarrettBeatty and claude committed
Track replay state per operation rather than via a global flag
Match the Python / Java / JavaScript reference SDKs' replay-mode model: the workflow is "replaying" iff it has not yet revisited every checkpointed completed user-replayable operation. A single global flag flipped on the first fresh op (the prior model) misclassified workflow-body code that runs before the first step and would not generalize to Map/Parallel/Callback later.

ExecutionState changes:
- Replace `Mode`/`ExecutionMode`/`EnterExecutionMode()` with `IsReplaying` + `TrackReplay(operationId)`.
- Initial replay decision: any non-EXECUTION op present means we're replaying. The service always sends an EXECUTION-type op carrying the input payload — that's bookkeeping, not user history, so it does not count toward replay (matches Python execution.py:258, Java ExecutionManager:81, JS execution-context.ts:62).
- TrackReplay flips IsReplaying to false once every checkpointed terminal-status non-EXECUTION op has been visited. Terminal set matches Python's: SUCCEEDED, FAILED, CANCELLED, STOPPED.

Operation changes:
- DurableOperation.ExecuteAsync calls TrackReplay(OperationId) at the top, so every operation participates in visit accounting without each subclass needing to remember.
- StepOperation/WaitOperation drop their manual EnterExecutionMode calls.

Tests:
- ExecutionStateTests rewritten around IsReplaying/TrackReplay, including pinning regressions: only-EXECUTION-op ⇒ NotReplaying, all-visited ⇒ flips out of replay, PENDING ops do not block transition, idempotency.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9e5113d commit 5cdab8d

5 files changed

Lines changed: 195 additions & 90 deletions

File tree

Libraries/src/Amazon.Lambda.DurableExecution/Internal/DurableOperation.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ public Task<TResult> ExecuteAsync(CancellationToken cancellationToken)
4444
{
4545
State.ValidateReplayConsistency(OperationId, OperationType, Name);
4646

47+
// Record that the workflow has reached this op. If every completed
48+
// checkpointed op has now been visited, the state flips out of replay.
49+
State.TrackReplay(OperationId);
50+
4751
var existing = State.GetOperation(OperationId);
4852
return existing == null
4953
? StartAsync(cancellationToken)
Lines changed: 73 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,50 @@
11
namespace Amazon.Lambda.DurableExecution.Internal;
22

33
/// <summary>
4-
/// Replay state of the current invocation.
5-
/// </summary>
6-
internal enum ExecutionMode
7-
{
8-
/// <summary>Re-deriving prior operations from checkpointed state.</summary>
9-
Replay,
10-
/// <summary>Executing fresh code that hasn't been checkpointed before.</summary>
11-
Execution
12-
}
13-
14-
/// <summary>
15-
/// In-memory store of the operations replayed from <see cref="InitialExecutionState"/>.
16-
/// Read-only after load (apart from <see cref="EnterExecutionMode"/>); outbound
17-
/// checkpoints are owned by <see cref="CheckpointBatcher"/>.
4+
/// In-memory store of the operations replayed from <see cref="InitialExecutionState"/>
5+
/// plus replay-mode tracking. Outbound checkpoints are owned by
6+
/// <see cref="CheckpointBatcher"/>; this type is the inbound side only.
187
/// </summary>
8+
/// <remarks>
9+
/// Replay tracking mirrors the Python / Java / JavaScript reference SDKs:
10+
/// <list type="bullet">
11+
/// <item>Once <see cref="LoadFromCheckpoint"/> has run, the workflow is "replaying" iff any user-replayable
12+
/// op is present. The service always sends one <c>EXECUTION</c>-type op
13+
/// carrying the input payload — that's bookkeeping, not user history,
14+
/// so it doesn't count.</item>
15+
/// <item><see cref="TrackReplay"/> is called by every <c>DurableOperation.ExecuteAsync</c>
16+
/// at the top of the call. Once every checkpointed completed
17+
/// non-<c>EXECUTION</c> op has been visited, the workflow has caught up
18+
/// to the replay frontier and <see cref="IsReplaying"/> flips to <c>false</c>
19+
/// for the rest of the invocation.</item>
20+
/// </list>
21+
/// </remarks>
1922
internal sealed class ExecutionState
2023
{
2124
private readonly Dictionary<string, Operation> _operations = new();
22-
23-
public ExecutionMode Mode { get; private set; } = ExecutionMode.Replay;
25+
private readonly HashSet<string> _visitedOperations = new();
26+
private bool _isReplaying;
2427

2528
public int CheckpointedOperationCount => _operations.Count;
2629

30+
/// <summary>
31+
/// True when the workflow is re-deriving prior operations from checkpointed
32+
/// state. False when running fresh (not-yet-checkpointed) code.
33+
/// </summary>
34+
public bool IsReplaying => _isReplaying;
35+
2736
public void LoadFromCheckpoint(InitialExecutionState? initialState)
2837
{
29-
if (initialState?.Operations == null)
38+
if (initialState?.Operations != null)
3039
{
31-
Mode = ExecutionMode.Execution;
32-
return;
40+
AddOperations(initialState.Operations);
3341
}
3442

35-
AddOperations(initialState.Operations);
36-
37-
if (_operations.Count == 0)
38-
{
39-
Mode = ExecutionMode.Execution;
40-
}
43+
// Only user-replayable ops put us into replay mode. The service-side
44+
// EXECUTION op (input payload bookkeeping) is always present and must
45+
// not count — see Python execution.py:258 / Java ExecutionManager:81 /
46+
// JS execution-context.ts:62 for the same rule.
47+
_isReplaying = HasReplayableOperations();
4148
}
4249

4350
public void AddOperations(IEnumerable<Operation> operations)
@@ -60,9 +67,36 @@ public void AddOperations(IEnumerable<Operation> operations)
6067
return op;
6168
}
6269

70+
public bool HasOperation(string operationId) => _operations.ContainsKey(operationId);
71+
72+
/// <summary>
73+
/// Records that the workflow has reached <paramref name="operationId"/>.
74+
/// Once every checkpointed completed non-<c>EXECUTION</c> op has been
75+
/// visited the workflow has caught up to the replay frontier and
76+
/// <see cref="IsReplaying"/> flips to false. Idempotent: calling more than
77+
/// once with the same id has no additional effect.
78+
/// </summary>
79+
public void TrackReplay(string operationId)
80+
{
81+
if (!_isReplaying) return;
82+
83+
_visitedOperations.Add(operationId);
84+
85+
// Have we visited every completed non-EXECUTION op? If so, anything
86+
// emitted from here on is fresh execution.
87+
foreach (var op in _operations.Values)
88+
{
89+
if (op.Type == OperationTypes.Execution) continue;
90+
if (!IsTerminalStatus(op.Status)) continue;
91+
if (!_visitedOperations.Contains(op.Id!)) return;
92+
}
93+
94+
_isReplaying = false;
95+
}
96+
6397
public void ValidateReplayConsistency(string operationId, string expectedType, string? expectedName)
6498
{
65-
if (Mode != ExecutionMode.Replay) return;
99+
if (!_isReplaying) return;
66100

67101
if (!_operations.TryGetValue(operationId, out var op)) return;
68102

@@ -83,11 +117,18 @@ public void ValidateReplayConsistency(string operationId, string expectedType, s
83117
}
84118
}
85119

86-
public bool HasOperation(string operationId) => _operations.ContainsKey(operationId);
120+
private bool HasReplayableOperations()
121+
{
122+
foreach (var op in _operations.Values)
123+
{
124+
if (op.Type != OperationTypes.Execution) return true;
125+
}
126+
return false;
127+
}
87128

88-
/// <summary>
89-
/// Transitions to <see cref="ExecutionMode.Execution"/>. Called by an operation
90-
/// that's about to run fresh (not-yet-checkpointed) code. Idempotent.
91-
/// </summary>
92-
public void EnterExecutionMode() => Mode = ExecutionMode.Execution;
129+
private static bool IsTerminalStatus(string? status) =>
130+
status == OperationStatuses.Succeeded
131+
|| status == OperationStatuses.Failed
132+
|| status == OperationStatuses.Cancelled
133+
|| status == OperationStatuses.Stopped;
93134
}

Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,7 @@ public StepOperation(
5050
protected override string OperationType => OperationTypes.Step;
5151

5252
protected override Task<T> StartAsync(CancellationToken cancellationToken)
53-
{
54-
State.EnterExecutionMode();
55-
return ExecuteFunc(cancellationToken);
56-
}
53+
=> ExecuteFunc(cancellationToken);
5754

5855
protected override Task<T> ReplayAsync(Operation existing, CancellationToken cancellationToken)
5956
{
@@ -73,7 +70,6 @@ protected override Task<T> ReplayAsync(Operation existing, CancellationToken can
7370
// STARTED/READY/PENDING from a prior invocation — no retry logic
7471
// in this commit, so fall through and execute fresh. (Future work
7572
// on retries will replace this default with explicit arms.)
76-
State.EnterExecutionMode();
7773
return ExecuteFunc(cancellationToken);
7874
}
7975
}

Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitOperation.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ public WaitOperation(
4141

4242
protected override async Task<object?> StartAsync(CancellationToken cancellationToken)
4343
{
44-
State.EnterExecutionMode();
45-
4644
// Sync-flush WAIT START before suspending — the service can't schedule
4745
// a timer for a checkpoint it hasn't received.
4846
await EnqueueAsync(new SdkOperationUpdate

0 commit comments

Comments
 (0)