Skip to content

Commit be04613

Browse files
committed
create OperationProcessor
1 parent f499a35 commit be04613

4 files changed

Lines changed: 289 additions & 155 deletions

File tree

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/EventProcessor.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,49 @@ void processUpdate(OperationUpdate update, Operation operation) {
3636
allEvents.add(event);
3737
}
3838

39+
// process new status of an operation without an OperationUpdate
40+
void processUpdate(Operation updatedOperation) {
41+
var builder = Event.builder()
42+
.eventId(eventId.getAndIncrement())
43+
.eventTimestamp(Instant.now())
44+
.id(updatedOperation.id())
45+
.name(updatedOperation.name());
46+
// support the statuses that don't have a corresponding OperationAction
47+
switch (updatedOperation.status()) {
48+
case STARTED -> {
49+
// used by resetCheckpointToStarted
50+
return;
51+
}
52+
case READY -> {
53+
if (updatedOperation.type() == OperationType.STEP) {
54+
// no event type for this case
55+
return;
56+
} else {
57+
throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
58+
}
59+
}
60+
case TIMED_OUT -> {
61+
switch (updatedOperation.type()) {
62+
case EXECUTION -> builder.eventType(EXECUTION_TIMED_OUT);
63+
case CHAINED_INVOKE -> builder.eventType(CHAINED_INVOKE_TIMED_OUT);
64+
case CALLBACK -> builder.eventType(CALLBACK_TIMED_OUT);
65+
default ->
66+
throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
67+
}
68+
}
69+
case STOPPED -> {
70+
switch (updatedOperation.type()) {
71+
case EXECUTION -> builder.eventType(EXECUTION_STOPPED);
72+
case CHAINED_INVOKE -> builder.eventType(CHAINED_INVOKE_STOPPED);
73+
default ->
74+
throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
75+
}
76+
}
77+
default -> throw new IllegalArgumentException("Unsupported operation status: " + updatedOperation.status());
78+
}
79+
allEvents.add(builder.build());
80+
}
81+
3982
List<Event> getAllEvents() {
4083
return List.copyOf(allEvents);
4184
}

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java

Lines changed: 66 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.testing.local;
44

5-
import java.time.Instant;
65
import java.util.Collections;
76
import java.util.HashMap;
87
import java.util.LinkedHashMap;
@@ -11,7 +10,14 @@
1110
import java.util.UUID;
1211
import java.util.concurrent.CopyOnWriteArrayList;
1312
import java.util.concurrent.atomic.AtomicBoolean;
14-
import software.amazon.awssdk.services.lambda.model.*;
13+
import software.amazon.awssdk.services.lambda.model.CheckpointDurableExecutionResponse;
14+
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
15+
import software.amazon.awssdk.services.lambda.model.GetDurableExecutionStateResponse;
16+
import software.amazon.awssdk.services.lambda.model.Operation;
17+
import software.amazon.awssdk.services.lambda.model.OperationAction;
18+
import software.amazon.awssdk.services.lambda.model.OperationStatus;
19+
import software.amazon.awssdk.services.lambda.model.OperationType;
20+
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
1521
import software.amazon.lambda.durable.TypeToken;
1622
import software.amazon.lambda.durable.client.DurableExecutionClient;
1723
import software.amazon.lambda.durable.model.DurableExecutionOutput;
@@ -72,27 +78,14 @@ public boolean advanceTime() {
7278
var hasOperationsAdvanced = new AtomicBoolean(false);
7379
// forEach is safe as we're not adding or removing keys here
7480
existingOperations.forEach((key, op) -> {
75-
// advance pending retries
76-
if (op.status() == OperationStatus.PENDING) {
81+
if (op.type() == OperationType.STEP && op.status() == OperationStatus.PENDING) {
82+
applyResult(op, OperationResult.ready());
7783
hasOperationsAdvanced.set(true);
78-
var readyOp = op.toBuilder().status(OperationStatus.READY).build();
79-
updateOperation(readyOp);
8084
}
8185

82-
// advance waits
83-
if (op.status() == OperationStatus.STARTED && op.type() == OperationType.WAIT) {
84-
var succeededOp =
85-
op.toBuilder().status(OperationStatus.SUCCEEDED).build();
86-
// Generate WaitSucceeded event
87-
var update = OperationUpdate.builder()
88-
.id(op.id())
89-
.name(op.name())
90-
.type(OperationType.WAIT)
91-
.action(OperationAction.SUCCEED)
92-
.build();
93-
eventProcessor.processUpdate(update, succeededOp);
86+
if (op.type() == OperationType.WAIT && op.status() == OperationStatus.STARTED) {
87+
applyResult(op, OperationResult.succeeded(null));
9488
hasOperationsAdvanced.set(true);
95-
updateOperation(succeededOp);
9689
}
9790
});
9891
return hasOperationsAdvanced.get();
@@ -104,28 +97,10 @@ public void completeChainedInvoke(String name, OperationResult result) {
10497
if (op == null) {
10598
throw new IllegalStateException("Operation not found: " + name);
10699
}
107-
if (op.type() == OperationType.CHAINED_INVOKE
108-
&& op.status() == OperationStatus.STARTED
109-
&& op.name().equals(name)) {
110-
var newOp = op.toBuilder()
111-
.status(result.operationStatus())
112-
.chainedInvokeDetails(ChainedInvokeDetails.builder()
113-
.result(result.result())
114-
.error(result.error())
115-
.build())
116-
.build();
117-
var update = OperationUpdate.builder()
118-
.id(op.id())
119-
.name(op.name())
120-
.type(OperationType.CHAINED_INVOKE)
121-
.action(
122-
result.operationStatus() == OperationStatus.SUCCEEDED
123-
? OperationAction.SUCCEED
124-
: OperationAction.FAIL)
125-
.build();
126-
eventProcessor.processUpdate(update, newOp);
127-
updateOperation(newOp);
100+
if (op.type() != OperationType.CHAINED_INVOKE || op.status() != OperationStatus.STARTED) {
101+
throw new IllegalStateException("Operation is not a CHAINED_INVOKE or not in STARTED state");
128102
}
103+
applyResult(op, result);
129104
}
130105

131106
/** Returns the operation with the given name, or null if not found. */
@@ -164,7 +139,7 @@ public void resetCheckpointToStarted(String stepName) {
164139
throw new IllegalStateException("Operation not found: " + stepName);
165140
}
166141
var startedOp = op.toBuilder().status(OperationStatus.STARTED).build();
167-
updateOperation(startedOp);
142+
updateOperation(null, startedOp);
168143
}
169144

170145
/** Simulate fire-and-forget checkpoint loss by removing the operation entirely */
@@ -180,101 +155,9 @@ public void simulateFireAndForgetCheckpointLoss(String stepName) {
180155
}
181156

182157
private void applyUpdate(OperationUpdate update) {
183-
var operation = toOperation(update);
184-
updateOperation(operation);
185-
186-
eventProcessor.processUpdate(update, operation);
187-
}
188-
189-
private Operation toOperation(OperationUpdate update) {
190-
var builder = Operation.builder()
191-
.id(update.id())
192-
.name(update.name())
193-
.type(update.type())
194-
.subType(update.subType())
195-
.parentId(update.parentId())
196-
.status(deriveStatus(update.action()));
197-
198-
switch (update.type()) {
199-
case WAIT -> builder.waitDetails(buildWaitDetails(update));
200-
case STEP -> builder.stepDetails(buildStepDetails(update));
201-
case CALLBACK -> builder.callbackDetails(buildCallbackDetails(update));
202-
case EXECUTION -> {} // No details needed for EXECUTION operations
203-
case CHAINED_INVOKE -> builder.chainedInvokeDetails(buildChainedInvokeDetails(update));
204-
case CONTEXT -> builder.contextDetails(buildContextDetails(update));
205-
case UNKNOWN_TO_SDK_VERSION ->
206-
throw new UnsupportedOperationException("UNKNOWN_TO_SDK_VERSION not supported");
207-
}
208-
209-
return builder.build();
210-
}
211-
212-
private ChainedInvokeDetails buildChainedInvokeDetails(OperationUpdate update) {
213-
if (update.chainedInvokeOptions() == null) {
214-
return null;
215-
}
216-
return ChainedInvokeDetails.builder()
217-
.result(update.payload())
218-
.error(update.error())
219-
.build();
220-
}
221-
222-
private ContextDetails buildContextDetails(OperationUpdate update) {
223-
var detailsBuilder = ContextDetails.builder().result(update.payload()).error(update.error());
224-
225-
if (update.contextOptions() != null
226-
&& Boolean.TRUE.equals(update.contextOptions().replayChildren())) {
227-
detailsBuilder.replayChildren(true);
228-
}
229-
230-
return detailsBuilder.build();
231-
}
232-
233-
private WaitDetails buildWaitDetails(OperationUpdate update) {
234-
if (update.waitOptions() == null) return null;
235-
236-
var scheduledEnd = Instant.now().plusSeconds(update.waitOptions().waitSeconds());
237-
return WaitDetails.builder().scheduledEndTimestamp(scheduledEnd).build();
238-
}
239-
240-
private StepDetails buildStepDetails(OperationUpdate update) {
241158
var existingOp = existingOperations.get(update.id());
242-
var existing = existingOp != null ? existingOp.stepDetails() : null;
243-
244-
var detailsBuilder = existing != null ? existing.toBuilder() : StepDetails.builder();
245-
var attempt = existing != null && existing.attempt() != null ? existing.attempt() + 1 : 1;
246-
247-
if (update.action() == OperationAction.FAIL) {
248-
detailsBuilder.attempt(attempt).error(update.error());
249-
}
250-
251-
if (update.action() == OperationAction.RETRY) {
252-
detailsBuilder
253-
.attempt(attempt)
254-
.error(update.error())
255-
.nextAttemptTimestamp(
256-
Instant.now().plusSeconds(update.stepOptions().nextAttemptDelaySeconds()));
257-
}
258-
259-
if (update.payload() != null) {
260-
detailsBuilder.result(update.payload());
261-
}
262-
263-
return detailsBuilder.build();
264-
}
265-
266-
private CallbackDetails buildCallbackDetails(OperationUpdate update) {
267-
var existingOp = existingOperations.get(update.id());
268-
var existing = existingOp != null ? existingOp.callbackDetails() : null;
269-
270-
// Preserve existing callbackId, or generate new one on START
271-
var callbackId =
272-
existing != null ? existing.callbackId() : UUID.randomUUID().toString();
273-
274-
return CallbackDetails.builder()
275-
.callbackId(callbackId)
276-
.result(existing != null ? existing.result() : null)
277-
.build();
159+
var updatedOp = OperationProcessor.applyUpdate(update, existingOp);
160+
updateOperation(update, updatedOp);
278161
}
279162

280163
/** Get callback ID for a named callback operation. */
@@ -292,14 +175,47 @@ public void completeCallback(String callbackId, OperationResult result) {
292175
if (op == null) {
293176
throw new IllegalStateException("Callback not found: " + callbackId);
294177
}
295-
var updated = op.toBuilder()
296-
.status(result.operationStatus())
297-
.callbackDetails(op.callbackDetails().toBuilder()
298-
.result(result.result())
299-
.error(result.error())
300-
.build())
301-
.build();
302-
updateOperation(updated);
178+
if (op.type() != OperationType.CALLBACK || op.status() != OperationStatus.STARTED) {
179+
throw new IllegalStateException("Operation is not a CALLBACK or not in STARTED state");
180+
}
181+
182+
applyResult(op, result);
183+
}
184+
185+
private void applyResult(Operation op, OperationResult result) {
186+
// derive a possible action from the target status
187+
OperationAction action = deriveAction(result.operationStatus());
188+
if (action != null) {
189+
var update = OperationUpdate.builder()
190+
.id(op.id())
191+
.name(op.name())
192+
.type(op.type())
193+
.action(action)
194+
.parentId(op.parentId())
195+
.payload(result.result())
196+
.error(result.error())
197+
.build();
198+
applyUpdate(update);
199+
} else if (result.operationStatus() == OperationStatus.TIMED_OUT
200+
|| result.operationStatus() == OperationStatus.STOPPED
201+
|| result.operationStatus() == OperationStatus.READY) {
202+
var newOp = OperationProcessor.applyResult(op, result);
203+
updateOperation(null, newOp);
204+
} else {
205+
throw new IllegalStateException("Unsupported OperationStatus in result: " + result.operationStatus());
206+
}
207+
}
208+
209+
private static OperationAction deriveAction(OperationStatus status) {
210+
return switch (status) {
211+
case STARTED -> OperationAction.START;
212+
case SUCCEEDED -> OperationAction.SUCCEED;
213+
case FAILED -> OperationAction.FAIL;
214+
case PENDING -> OperationAction.RETRY;
215+
case CANCELLED -> OperationAction.CANCEL;
216+
case READY, TIMED_OUT, STOPPED -> null; // no action for these operation statuses
217+
case UNKNOWN_TO_SDK_VERSION -> OperationAction.UNKNOWN_TO_SDK_VERSION; // Todo: Check this
218+
};
303219
}
304220

305221
private Operation findOperationByCallbackId(String callbackId) {
@@ -310,18 +226,13 @@ private Operation findOperationByCallbackId(String callbackId) {
310226
.orElse(null);
311227
}
312228

313-
private OperationStatus deriveStatus(OperationAction action) {
314-
return switch (action) {
315-
case START -> OperationStatus.STARTED;
316-
case SUCCEED -> OperationStatus.SUCCEEDED;
317-
case FAIL -> OperationStatus.FAILED;
318-
case RETRY -> OperationStatus.PENDING;
319-
case CANCEL -> OperationStatus.CANCELLED;
320-
case UNKNOWN_TO_SDK_VERSION -> OperationStatus.UNKNOWN_TO_SDK_VERSION; // Todo: Check this
321-
};
322-
}
323-
324-
private void updateOperation(Operation op) {
229+
private void updateOperation(OperationUpdate update, Operation op) {
230+
// update can be null when an operation is updated without an OperationUpdate
231+
if (update == null) {
232+
eventProcessor.processUpdate(op);
233+
} else {
234+
eventProcessor.processUpdate(update, op);
235+
}
325236
existingOperations.put(op.id(), op);
326237
synchronized (updatedOperations) {
327238
updatedOperations.put(op.id(), op);

0 commit comments

Comments
 (0)