Skip to content

Commit 4bbf964

Browse files
authored
[testing]: make local runner checkpoint API return only updated operations (#321)
* make local runner checkpoint API return only updated operations * update ParallelIntegrationTest * add a lock around updatedOperations to prevent race conditions * create OperationProcessor
1 parent 74b793a commit 4bbf964

6 files changed

Lines changed: 367 additions & 226 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/ParallelIntegrationTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void testParallelPartialFailure_failedBranchDoesNotPreventOthers() {
7676
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
7777
var config = ParallelConfig.builder().build();
7878
var futures = new ArrayList<DurableFuture<String>>();
79-
var parallel = context.parallel("partial-fail", config);
79+
ParallelDurableFuture parallel = context.parallel("partial-fail", config);
8080

8181
try (parallel) {
8282
futures.add(parallel.branch("branch-a", String.class, ctx -> "A"));
@@ -322,15 +322,15 @@ void testStepBeforeAndAfterParallel() {
322322
void testSequentialParallelBlocks() {
323323
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
324324
var futures1 = new ArrayList<DurableFuture<String>>();
325-
var parallel1 =
325+
ParallelDurableFuture parallel1 =
326326
context.parallel("parallel-1", ParallelConfig.builder().build());
327327
try (parallel1) {
328328
futures1.add(parallel1.branch("branch-a", String.class, ctx -> "A"));
329329
futures1.add(parallel1.branch("branch-b", String.class, ctx -> "B"));
330330
}
331331

332332
var futures2 = new ArrayList<DurableFuture<String>>();
333-
var parallel2 =
333+
ParallelDurableFuture parallel2 =
334334
context.parallel("parallel-2", ParallelConfig.builder().build());
335335
try (parallel2) {
336336
futures2.add(parallel2.branch("branch-x", String.class, ctx -> "x!"));
@@ -354,7 +354,7 @@ void testParallelReplayWithFailedBranches() {
354354
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
355355
var config = ParallelConfig.builder().build();
356356
var futures = new ArrayList<DurableFuture<String>>();
357-
var parallel = context.parallel("replay-fail-parallel", config);
357+
ParallelDurableFuture parallel = context.parallel("replay-fail-parallel", config);
358358

359359
try (parallel) {
360360
futures.add(parallel.branch("branch-ok", String.class, ctx -> {
@@ -371,7 +371,7 @@ void testParallelReplayWithFailedBranches() {
371371
}));
372372
}
373373

374-
var result = parallel.get();
374+
parallel.get();
375375
assertEquals("OK", futures.get(0).get());
376376
assertEquals("OK2", futures.get(2).get());
377377
return "done";
@@ -392,7 +392,7 @@ void testParallelWithSingleBranch() {
392392
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
393393
var config = ParallelConfig.builder().build();
394394
var futures = new ArrayList<DurableFuture<String>>();
395-
var parallel = context.parallel("single-branch", config);
395+
ParallelDurableFuture parallel = context.parallel("single-branch", config);
396396

397397
try (parallel) {
398398
futures.add(parallel.branch(
@@ -418,7 +418,7 @@ void testParallelWithWaitInsideBranches_replay() {
418418
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
419419
var config = ParallelConfig.builder().build();
420420
var futures = new ArrayList<DurableFuture<String>>();
421-
var parallel = context.parallel("wait-replay-parallel", config);
421+
ParallelDurableFuture parallel = context.parallel("wait-replay-parallel", config);
422422

423423
try (parallel) {
424424
for (var item : List.of("a", "b")) {
@@ -546,7 +546,7 @@ void testParallelWithToleratedFailureCount_earlyTermination() {
546546
.completionConfig(CompletionConfig.toleratedFailureCount(1))
547547
.build();
548548
var futures = new ArrayList<DurableFuture<String>>();
549-
var parallel = context.parallel("tolerated-fail", config);
549+
ParallelDurableFuture parallel = context.parallel("tolerated-fail", config);
550550

551551
try (parallel) {
552552
futures.add(parallel.branch("branch-ok", String.class, ctx -> "OK"));
@@ -580,7 +580,7 @@ void testParallelWithMinSuccessful_earlyTermination() {
580580
.completionConfig(CompletionConfig.minSuccessful(2))
581581
.build();
582582
var futures = new ArrayList<DurableFuture<String>>();
583-
var parallel = context.parallel("min-successful", config);
583+
ParallelDurableFuture parallel = context.parallel("min-successful", config);
584584

585585
try (parallel) {
586586
for (var item : List.of("a", "b", "c", "d", "e")) {
@@ -617,7 +617,7 @@ void testParallelWithAllSuccessful_stopsOnFirstFailure() {
617617
.completionConfig(CompletionConfig.allSuccessful())
618618
.build();
619619
var futures = new ArrayList<DurableFuture<String>>();
620-
var parallel = context.parallel("all-successful", config);
620+
ParallelDurableFuture parallel = context.parallel("all-successful", config);
621621

622622
try (parallel) {
623623
futures.add(parallel.branch("branch-ok1", String.class, ctx -> "OK1"));

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,7 @@ private DurableExecutionInput createDurableInput(I input) {
340340
.build();
341341

342342
// Load previous operations and include them in InitialExecutionState
343-
var existingOps =
344-
storage.getExecutionState(executionArn, "test-token", null).operations();
343+
var existingOps = storage.getAllOperations();
345344
var allOps = new ArrayList<>(List.of(executionOp));
346345
allOps.addAll(existingOps);
347346

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/EventProcessor.java

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,86 @@
55
import static software.amazon.awssdk.services.lambda.model.EventType.*;
66

77
import java.time.Instant;
8+
import java.util.List;
9+
import java.util.concurrent.CopyOnWriteArrayList;
810
import java.util.concurrent.atomic.AtomicInteger;
911
import software.amazon.awssdk.services.lambda.model.*;
1012

1113
/** Generates Event objects from OperationUpdate for local testing. */
1214
class EventProcessor {
1315
private final AtomicInteger eventId = new AtomicInteger(1);
16+
private final List<Event> allEvents = new CopyOnWriteArrayList<>();
1417

15-
Event processUpdate(OperationUpdate update, Operation operation) {
18+
void processUpdate(OperationUpdate update, Operation operation) {
1619
var builder = Event.builder()
1720
.eventId(eventId.getAndIncrement())
1821
.eventTimestamp(Instant.now())
1922
.id(update.id())
2023
.name(update.name());
2124

22-
return switch (update.type()) {
23-
case STEP -> buildStepEvent(builder, update, operation);
24-
case WAIT -> buildWaitEvent(builder, update, operation);
25-
case CHAINED_INVOKE -> buildInvokeEvent(builder, update, operation);
26-
case EXECUTION -> buildExecutionEvent(builder, update);
27-
case CALLBACK -> buildCallbackEvent(builder, update);
28-
case CONTEXT -> buildContextEvent(builder, update);
29-
default -> throw new IllegalArgumentException("Unsupported operation type: " + update.type());
30-
};
25+
Event event =
26+
switch (update.type()) {
27+
case STEP -> buildStepEvent(builder, update, operation);
28+
case WAIT -> buildWaitEvent(builder, update, operation);
29+
case CHAINED_INVOKE -> buildInvokeEvent(builder, update, operation);
30+
case EXECUTION -> buildExecutionEvent(builder, update);
31+
case CALLBACK -> buildCallbackEvent(builder, update);
32+
case CONTEXT -> buildContextEvent(builder, update);
33+
default -> throw new IllegalArgumentException("Unsupported operation type: " + update.type());
34+
};
35+
36+
allEvents.add(event);
37+
}
38+
39+
// process new status of an operation without an OperationUpdate
40+
void processUpdate(Operation updatedOperation) {
41+
var builder = Event.builder()
42+
.eventId(eventId.getAndIncrement())
43+
.eventTimestamp(Instant.now())
44+
.id(updatedOperation.id())
45+
.name(updatedOperation.name());
46+
// support the statuses that don't have a corresponding OperationAction
47+
switch (updatedOperation.status()) {
48+
case STARTED -> {
49+
// used by resetCheckpointToStarted
50+
return;
51+
}
52+
case READY -> {
53+
if (updatedOperation.type() == OperationType.STEP) {
54+
// no event type for this case
55+
return;
56+
} else {
57+
throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
58+
}
59+
}
60+
case TIMED_OUT -> {
61+
switch (updatedOperation.type()) {
62+
case EXECUTION -> builder.eventType(EXECUTION_TIMED_OUT);
63+
case CHAINED_INVOKE -> builder.eventType(CHAINED_INVOKE_TIMED_OUT);
64+
case CALLBACK -> builder.eventType(CALLBACK_TIMED_OUT);
65+
default ->
66+
throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
67+
}
68+
}
69+
case STOPPED -> {
70+
switch (updatedOperation.type()) {
71+
case EXECUTION -> builder.eventType(EXECUTION_STOPPED);
72+
case CHAINED_INVOKE -> builder.eventType(CHAINED_INVOKE_STOPPED);
73+
default ->
74+
throw new IllegalArgumentException("Unsupported operation type: " + updatedOperation.type());
75+
}
76+
}
77+
default -> throw new IllegalArgumentException("Unsupported operation status: " + updatedOperation.status());
78+
}
79+
allEvents.add(builder.build());
80+
}
81+
82+
List<Event> getAllEvents() {
83+
return List.copyOf(allEvents);
84+
}
85+
86+
public List<Event> getEventsForOperation(String operationId) {
87+
return allEvents.stream().filter(e -> e.id().equals(operationId)).toList();
3188
}
3289

3390
private Event buildStepEvent(Event.Builder builder, OperationUpdate update, Operation operation) {

0 commit comments

Comments
 (0)