Skip to content

Commit 12fc784

Browse files
committed
feat(sdk): Add UpdatedOperationIds support for replay status tracking
1 parent 118865a commit 12fc784

8 files changed

Lines changed: 229 additions & 2 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,50 @@ void waitAndResume_producesSpansAcrossInvocations() {
158158
assertEquals(2, invocationSpans.size(), "Should have 2 invocation spans (one per run)");
159159
}
160160

161+
@Test
162+
void waitCompletedDuringSuspension_producesOperationSpan() {
163+
var runner = LocalDurableTestRunner.create(
164+
String.class,
165+
(input, ctx) -> {
166+
ctx.step("before-wait", String.class, stepCtx -> "pre");
167+
ctx.wait("pause", Duration.ofMinutes(1));
168+
ctx.step("after-wait", String.class, stepCtx -> "post");
169+
return "done";
170+
},
171+
otelConfig);
172+
173+
// First invocation: step completes, wait starts, suspend
174+
var result1 = runner.run("input");
175+
assertEquals(ExecutionStatus.PENDING, result1.getStatus());
176+
177+
// The wait operation span should already be exported: it is ended with PENDING status at invocation end
178+
var spansAfterFirst = spanExporter.getFinishedSpanItems();
179+
var waitSpansAfterFirst = spansAfterFirst.stream()
180+
.filter(s -> s.getName().equals("durable.wait:pause"))
181+
.toList();
182+
assertEquals(1, waitSpansAfterFirst.size(), "Wait span should exist after first invocation (ended as PENDING)");
183+
184+
// Advance time (wait completes externally) and resume
185+
runner.advanceTime();
186+
var result2 = runner.run("input");
187+
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
188+
189+
// After the second invocation, the wait should have a second operation span
190+
// (fired by onOperationEnd via updatedOperationIds) showing it completed
191+
var allSpans = spanExporter.getFinishedSpanItems();
192+
var waitSpansTotal = allSpans.stream()
193+
.filter(s -> s.getName().equals("durable.wait:pause"))
194+
.toList();
195+
assertEquals(
196+
2,
197+
waitSpansTotal.size(),
198+
"Wait should have 2 spans: one PENDING from first invocation, one completed from second. Got: "
199+
+ allSpans.stream()
200+
.map(SpanData::getName)
201+
.filter(n -> n.contains("pause"))
202+
.toList());
203+
}
204+
161205
@Test
162206
void childContext_producesNestedSpans() {
163207
var runner = LocalDurableTestRunner.create(

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/PluginIntegrationTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,66 @@ void plugin_operationEnd_notFiredOnReplay() {
175175
assertEquals(1, step1EndCount, "step1 onOperationEnd should fire only once (not on replay)");
176176
}
177177

178+
@Test
179+
void plugin_operationEnd_firedForOperationCompletedDuringSuspension() {
180+
var plugin = new RecordingPlugin();
181+
var config = DurableConfig.builder().withPlugins(plugin).build();
182+
183+
var runner = LocalDurableTestRunner.create(
184+
String.class,
185+
(input, context) -> {
186+
context.step("step1", String.class, stepCtx -> "done");
187+
context.wait("pause", Duration.ofMinutes(1));
188+
context.step("step2", String.class, stepCtx -> "final");
189+
return "complete";
190+
},
191+
config);
192+
193+
// First invocation: step1 completes, then suspends at wait
194+
var result1 = runner.run("input");
195+
assertEquals(ExecutionStatus.PENDING, result1.getStatus());
196+
197+
// Advance time (wait completes externally while Lambda was frozen)
198+
runner.advanceTime();
199+
200+
// Clear plugin state to only track second invocation
201+
plugin.operationEnds.clear();
202+
plugin.operationStarts.clear();
203+
204+
var result2 = runner.run("input");
205+
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
206+
207+
// onOperationEnd for "pause" should fire during the second invocation
208+
// because the wait completed during suspension (it's in updatedOperationIds)
209+
long pauseEndCount = plugin.operationEnds.stream()
210+
.filter(info -> "pause".equals(info.name()))
211+
.count();
212+
assertEquals(1, pauseEndCount, "wait operation onOperationEnd should fire when it completes during suspension");
213+
}
214+
215+
@Test
216+
void plugin_operationEnd_firedOnceForStepCompletingInCurrentInvocation() {
217+
var plugin = new RecordingPlugin();
218+
var config = DurableConfig.builder().withPlugins(plugin).build();
219+
220+
var runner = LocalDurableTestRunner.create(
221+
String.class,
222+
(input, context) -> {
223+
context.step("step1", String.class, stepCtx -> "done");
224+
return "complete";
225+
},
226+
config);
227+
228+
// Single invocation: step1 completes within this invocation
229+
var result = runner.runUntilComplete("input");
230+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
231+
232+
long step1EndCount = plugin.operationEnds.stream()
233+
.filter(info -> "step1".equals(info.name()))
234+
.count();
235+
assertEquals(1, step1EndCount, "step1 onOperationEnd should fire exactly once");
236+
}
237+
178238
// ─── User function hooks ─────────────────────────────────────────────
179239

180240
@Test

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,10 +346,15 @@ private DurableExecutionInput createDurableInput(I input) {
346346
var allOps = new ArrayList<>(List.of(executionOp));
347347
allOps.addAll(existingOps);
348348

349+
// Compute updatedOperationIds: all operations that were updated since the last invocation.
350+
// In the test runner, this is all operations that have been modified by advanceTime/complete calls.
351+
var updatedOperationIds = storage.getUpdatedOperationIdsSinceLastInvocation();
352+
349353
return new DurableExecutionInput(
350354
executionArn,
351355
UUID.randomUUID().toString(),
352-
CheckpointUpdatedExecutionState.builder().operations(allOps).build());
356+
CheckpointUpdatedExecutionState.builder().operations(allOps).build(),
357+
updatedOperationIds);
353358
}
354359

355360
private Context mockLambdaContext() {

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
import java.util.Collections;
66
import java.util.HashMap;
7+
import java.util.HashSet;
78
import java.util.LinkedHashMap;
89
import java.util.List;
910
import java.util.Map;
11+
import java.util.Set;
1012
import java.util.UUID;
1113
import java.util.concurrent.CopyOnWriteArrayList;
1214
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,11 +37,15 @@ public class LocalMemoryExecutionClient implements DurableExecutionClient {
3537
private final EventProcessor eventProcessor = new EventProcessor();
3638
private final List<OperationUpdate> operationUpdates = new CopyOnWriteArrayList<>();
3739
private final Map<String, Operation> updatedOperations = new HashMap<>();
40+
private final Set<String> operationIdsUpdatedSinceLastInvocation = Collections.synchronizedSet(new HashSet<>());
41+
private volatile boolean withinCheckpoint = false;
3842

3943
@Override
4044
public CheckpointDurableExecutionResponse checkpoint(String arn, String token, List<OperationUpdate> updates) {
45+
withinCheckpoint = true;
4146
operationUpdates.addAll(updates);
4247
updates.forEach(this::applyUpdate);
48+
withinCheckpoint = false;
4349

4450
var newToken = UUID.randomUUID().toString();
4551

@@ -116,6 +122,16 @@ public List<Operation> getAllOperations() {
116122
return existingOperations.values().stream().toList();
117123
}
118124

125+
/**
126+
* Returns a snapshot of the IDs of operations updated outside of a checkpoint call since this method was last called, then clears the tracking set.
127+
* This simulates the backend's {@code UpdatedOperationIds} field behavior.
128+
*/
129+
public List<String> getUpdatedOperationIdsSinceLastInvocation() {
130+
var ids = List.copyOf(operationIdsUpdatedSinceLastInvocation);
131+
operationIdsUpdatedSinceLastInvocation.clear();
132+
return ids;
133+
}
134+
119135
/** Build TestResult from current state. */
120136
public <O> TestResult<O> toTestResult(DurableExecutionOutput output, TypeToken<O> resultType, SerDes serDes) {
121137
var testOperations = existingOperations.values().stream()
@@ -238,5 +254,11 @@ private void updateOperation(OperationUpdate update, Operation op) {
238254
synchronized (updatedOperations) {
239255
updatedOperations.put(op.id(), op);
240256
}
257+
// Only track operations updated outside of a checkpoint call (i.e., between invocations)
258+
// for the updatedOperationIds field. Checkpoint updates happen during an invocation and
259+
// are already visible to the SDK via the checkpoint response (channel 2).
260+
if (!withinCheckpoint) {
261+
operationIdsUpdatedSinceLastInvocation.add(op.id());
262+
}
241263
}
242264
}

sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class ExecutionManager implements AutoCloseable {
5757
private final String durableExecutionArn;
5858
private final AtomicReference<ExecutionMode> executionMode;
5959
private final DurableConfig durableConfig;
60+
private final Set<String> updatedOperationIds;
6061

6162
// ===== Thread Coordination =====
6263
private final Map<String, BaseDurableOperation> registeredOperations = new ConcurrentHashMap<>();
@@ -71,6 +72,10 @@ public ExecutionManager(DurableExecutionInput input, DurableConfig config) {
7172
durableConfig = config;
7273
this.durableExecutionArn = input.durableExecutionArn();
7374

75+
// Store the set of operation IDs updated since the last successful invocation
76+
this.updatedOperationIds =
77+
input.updatedOperationIds() != null ? Set.copyOf(input.updatedOperationIds()) : Collections.emptySet();
78+
7479
// Create checkpoint batcher for internal coordination
7580
this.checkpointManager =
7681
new CheckpointManager(config, durableExecutionArn, input.checkpointToken(), this::onCheckpointComplete);
@@ -109,6 +114,18 @@ public boolean isReplaying() {
109114
return executionMode.get() == ExecutionMode.REPLAY;
110115
}
111116

117+
/**
118+
* Returns {@code true} if the given operation was updated since the last successful invocation. Replay uses this
119+
* to decide whether to fire {@code onOperationEnd} for an already-completed operation, which lets plugins such as
120+
* the OTel plugin emit a span only for operations that transitioned during suspension.
121+
*
122+
* @param operationId the operation ID to check
123+
* @return true if the operation was updated since the last successful invocation
124+
*/
125+
public boolean isOperationUpdatedSinceLastInvocation(String operationId) {
126+
return updatedOperationIds.contains(operationId);
127+
}
128+
112129
/** Registers an operation so it can receive checkpoint completion notifications. */
113130
public void registerOperation(BaseDurableOperation operation) {
114131
registeredOperations.put(operation.getOperationId(), operation);

sdk/src/main/java/software/amazon/lambda/durable/model/DurableExecutionInput.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.model;
44

5+
import java.util.List;
56
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
67

78
/**
@@ -10,6 +11,21 @@
1011
* @param durableExecutionArn ARN identifying this durable execution
1112
* @param checkpointToken token used to authenticate checkpoint API calls
1213
* @param initialExecutionState snapshot of operations already completed in previous invocations
14+
* @param updatedOperationIds IDs of operations that changed since the previous successful invocation; empty list if
15+
* nothing changed
1316
*/
1417
public record DurableExecutionInput(
15-
String durableExecutionArn, String checkpointToken, CheckpointUpdatedExecutionState initialExecutionState) {}
18+
String durableExecutionArn,
19+
String checkpointToken,
20+
CheckpointUpdatedExecutionState initialExecutionState,
21+
List<String> updatedOperationIds) {
22+
23+
/**
24+
* Constructor that defaults updatedOperationIds to an empty list. Used by tests that don't need to supply updated
25+
* operation IDs.
26+
*/
27+
public DurableExecutionInput(
28+
String durableExecutionArn, String checkpointToken, CheckpointUpdatedExecutionState initialExecutionState) {
29+
this(durableExecutionArn, checkpointToken, initialExecutionState, List.of());
30+
}
31+
}

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,14 @@ public void execute() {
136136
}
137137
// Fire onOperationStart plugin hook (including replay)
138138
fireOnOperationStart(existing);
139+
140+
// Fire onOperationEnd for operations that completed during suspension (between invocations).
141+
// This enables the OTel plugin to emit spans for operations that transitioned while Lambda was frozen.
142+
if (replayCompletedOperation.get()
143+
&& executionManager.isOperationUpdatedSinceLastInvocation(getOperationId())) {
144+
fireOnOperationEnd(existing, null);
145+
}
146+
139147
replay(existing);
140148
} else {
141149
if (durableContext.isReplaying()) {

sdk/src/test/java/software/amazon/lambda/durable/execution/ExecutionManagerTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,59 @@ void emptyInitialState() {
139139
assertNotNull(executionManager.getExecutionOperation());
140140
assertEquals(EXECUTION_OP_ID, executionManager.getExecutionOperation().id());
141141
}
142+
143+
// ─── UpdatedOperationIds tests ───────────────────────────────────────
144+
145+
@Test
146+
void isOperationUpdatedSinceLastInvocation_returnsFalse_whenEmptyList() {
147+
// Default 3-arg constructor uses empty list
148+
var manager = createManager(List.of(executionOp(), stepOp("1", OperationStatus.SUCCEEDED)));
149+
150+
assertFalse(manager.isOperationUpdatedSinceLastInvocation("1"));
151+
}
152+
153+
@Test
154+
void isOperationUpdatedSinceLastInvocation_returnsTrue_whenOperationIsInList() {
155+
client = TestUtils.createMockClient();
156+
var initialState = CheckpointUpdatedExecutionState.builder()
157+
.operations(List.of(executionOp(), stepOp("1", OperationStatus.SUCCEEDED)))
158+
.build();
159+
var input = new DurableExecutionInput(EXECUTION_ARN, "test-token", initialState, List.of("1"));
160+
var manager = new ExecutionManager(
161+
input,
162+
DurableConfig.builder().withDurableExecutionClient(client).build());
163+
164+
assertTrue(manager.isOperationUpdatedSinceLastInvocation("1"));
165+
}
166+
167+
@Test
168+
void isOperationUpdatedSinceLastInvocation_returnsFalse_whenOperationNotInList() {
169+
client = TestUtils.createMockClient();
170+
var initialState = CheckpointUpdatedExecutionState.builder()
171+
.operations(List.of(executionOp(), stepOp("1", OperationStatus.SUCCEEDED)))
172+
.build();
173+
var input = new DurableExecutionInput(EXECUTION_ARN, "test-token", initialState, List.of("2"));
174+
var manager = new ExecutionManager(
175+
input,
176+
DurableConfig.builder().withDurableExecutionClient(client).build());
177+
178+
assertFalse(manager.isOperationUpdatedSinceLastInvocation("1"));
179+
}
180+
181+
@Test
182+
void isOperationUpdatedSinceLastInvocation_handlesMultipleIds() {
183+
client = TestUtils.createMockClient();
184+
var initialState = CheckpointUpdatedExecutionState.builder()
185+
.operations(List.of(executionOp(), stepOp("1", OperationStatus.SUCCEEDED)))
186+
.build();
187+
var input = new DurableExecutionInput(EXECUTION_ARN, "test-token", initialState, List.of("1", "2", "3"));
188+
var manager = new ExecutionManager(
189+
input,
190+
DurableConfig.builder().withDurableExecutionClient(client).build());
191+
192+
assertTrue(manager.isOperationUpdatedSinceLastInvocation("1"));
193+
assertTrue(manager.isOperationUpdatedSinceLastInvocation("2"));
194+
assertTrue(manager.isOperationUpdatedSinceLastInvocation("3"));
195+
assertFalse(manager.isOperationUpdatedSinceLastInvocation("4"));
196+
}
142197
}

0 commit comments

Comments
 (0)