Skip to content

Commit c45eac8

Browse files
authored
remove redundant InitialExecutionState (#75)
1 parent 211ba67 commit c45eac8

12 files changed

Lines changed: 74 additions & 47 deletions

File tree

sdk-integration-tests/src/test/java/com/amazonaws/lambda/durable/DurableExecutionCheckpointTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ void testLargePayloadCheckpointing() {
3333
var input = new DurableExecutionInput(
3434
"arn:aws:lambda:us-east-1:123456789012:function:test",
3535
"token1",
36-
new DurableExecutionInput.InitialExecutionState(List.of(executionOp), null));
36+
CheckpointUpdatedExecutionState.builder()
37+
.operations(List.of(executionOp))
38+
.build());
3739

3840
var largeString = "x".repeat(7 * 1024 * 1024); // 7MB string
3941

@@ -67,7 +69,9 @@ void testSmallPayloadNoExtraCheckpoint() {
6769
var input = new DurableExecutionInput(
6870
"arn:aws:lambda:us-east-1:123456789012:function:test",
6971
"token1",
70-
new DurableExecutionInput.InitialExecutionState(List.of(executionOp), null));
72+
CheckpointUpdatedExecutionState.builder()
73+
.operations(List.of(executionOp))
74+
.build());
7175

7276
var smallResult = "Small result";
7377

sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/LocalDurableTestRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.ArrayList;
1515
import java.util.List;
1616
import java.util.function.BiFunction;
17+
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
1718
import software.amazon.awssdk.services.lambda.model.ErrorObject;
1819
import software.amazon.awssdk.services.lambda.model.ExecutionDetails;
1920
import software.amazon.awssdk.services.lambda.model.Operation;
@@ -268,7 +269,7 @@ private DurableExecutionInput createDurableInput(I input) {
268269
return new DurableExecutionInput(
269270
"arn:aws:lambda:us-east-1:123456789012:function:test",
270271
"test-token",
271-
new DurableExecutionInput.InitialExecutionState(allOps, null));
272+
CheckpointUpdatedExecutionState.builder().operations(allOps).build());
272273
}
273274

274275
private Context mockLambdaContext() {

sdk/src/main/java/com/amazonaws/lambda/durable/execution/CheckpointBatcher.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.function.Consumer;
1414
import org.slf4j.Logger;
1515
import org.slf4j.LoggerFactory;
16+
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
1617
import software.amazon.awssdk.services.lambda.model.Operation;
1718
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
1819

@@ -72,11 +73,15 @@ void shutdown() {
7273
req -> req.completion().completeExceptionally(new IllegalStateException("CheckpointManager shutdown")));
7374
}
7475

75-
public List<Operation> fetchAllPages(List<Operation> initialOperations, String nextMarker) {
76+
public List<Operation> fetchAllPages(CheckpointUpdatedExecutionState checkpointUpdatedExecutionState) {
7677
List<Operation> operations = new ArrayList<>();
77-
if (initialOperations != null) {
78-
operations.addAll(initialOperations);
78+
if (checkpointUpdatedExecutionState == null) {
79+
return operations;
7980
}
81+
if (checkpointUpdatedExecutionState.operations() != null) {
82+
operations.addAll(checkpointUpdatedExecutionState.operations());
83+
}
84+
var nextMarker = checkpointUpdatedExecutionState.nextMarker();
8085
while (nextMarker != null && !nextMarker.isEmpty()) {
8186
var response = client.getExecutionState(durableExecutionArn, checkpointToken, nextMarker);
8287
logger.debug("DAR getExecutionState called: {}.", response);
@@ -105,14 +110,9 @@ private void processQueue() {
105110
// This means the polling will never receive an operation update and complete
106111
// the Phaser.
107112
checkpointToken = response.checkpointToken();
108-
if (response.newExecutionState() != null) {
109-
var operations = fetchAllPages(
110-
response.newExecutionState().operations(),
111-
response.newExecutionState().nextMarker());
112-
if (!operations.isEmpty()) {
113-
callback.accept(operations);
114-
}
115-
}
113+
114+
// fetch all pages of operations and call the callback
115+
callback.accept(fetchAllPages(response.newExecutionState()));
116116

117117
batch.forEach(req -> req.completion().complete(null));
118118
}

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import com.amazonaws.lambda.durable.client.DurableExecutionClient;
66
import com.amazonaws.lambda.durable.exception.UnrecoverableDurableExecutionException;
7-
import com.amazonaws.lambda.durable.model.DurableExecutionInput.InitialExecutionState;
87
import java.time.Duration;
98
import java.time.Instant;
109
import java.util.Collections;
@@ -17,6 +16,7 @@
1716
import java.util.stream.Collectors;
1817
import org.slf4j.Logger;
1918
import org.slf4j.LoggerFactory;
19+
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
2020
import software.amazon.awssdk.services.lambda.model.Operation;
2121
import software.amazon.awssdk.services.lambda.model.OperationStatus;
2222
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
@@ -61,7 +61,7 @@ public class ExecutionManager {
6161
public ExecutionManager(
6262
String durableExecutionArn,
6363
String checkpointToken,
64-
InitialExecutionState initialExecutionState,
64+
CheckpointUpdatedExecutionState initialExecutionState,
6565
DurableExecutionClient client) {
6666
this.durableExecutionArn = durableExecutionArn;
6767
this.executionOperationId = initialExecutionState.operations().get(0).id();
@@ -70,11 +70,8 @@ public ExecutionManager(
7070
this.checkpointBatcher =
7171
new CheckpointBatcher(client, durableExecutionArn, checkpointToken, this::onCheckpointComplete);
7272

73-
this.operations =
74-
checkpointBatcher
75-
.fetchAllPages(initialExecutionState.operations(), initialExecutionState.nextMarker())
76-
.stream()
77-
.collect(Collectors.toConcurrentMap(Operation::id, op -> op));
73+
this.operations = checkpointBatcher.fetchAllPages(initialExecutionState).stream()
74+
.collect(Collectors.toConcurrentMap(Operation::id, op -> op));
7875

7976
// Start in REPLAY mode if we have more than just the initial EXECUTION operation
8077
this.executionMode =

sdk/src/main/java/com/amazonaws/lambda/durable/model/DurableExecutionInput.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package com.amazonaws.lambda.durable.model;
44

5-
import java.util.List;
6-
import software.amazon.awssdk.services.lambda.model.Operation;
5+
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
76

87
public record DurableExecutionInput(
9-
String durableExecutionArn, String checkpointToken, InitialExecutionState initialExecutionState) {
10-
public record InitialExecutionState(List<Operation> operations, String nextMarker) {}
11-
}
8+
String durableExecutionArn, String checkpointToken, CheckpointUpdatedExecutionState initialExecutionState) {}

sdk/src/main/java/com/amazonaws/lambda/durable/serde/AwsSdkV2Module.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.lang.reflect.InvocationTargetException;
1414
import java.lang.reflect.Method;
1515
import java.util.List;
16+
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
1617
import software.amazon.awssdk.services.lambda.model.ErrorObject;
1718
import software.amazon.awssdk.services.lambda.model.Operation;
1819

@@ -24,7 +25,8 @@ public class AwsSdkV2Module extends SimpleModule {
2425
*
2526
* <p>See https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-serialization-changes.html
2627
*/
27-
private static final List<Class<?>> SDK_CLASSES = List.of(Operation.class, ErrorObject.class);
28+
private static final List<Class<?>> SDK_CLASSES =
29+
List.of(Operation.class, ErrorObject.class, CheckpointUpdatedExecutionState.class);
2830

2931
public AwsSdkV2Module() {
3032
super("AwsSdkV2Module");

sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import com.amazonaws.lambda.durable.execution.ExecutionManager;
88
import com.amazonaws.lambda.durable.execution.SuspendExecutionException;
9-
import com.amazonaws.lambda.durable.model.DurableExecutionInput.InitialExecutionState;
109
import com.amazonaws.lambda.durable.retry.RetryStrategies;
1110
import java.time.Duration;
1211
import java.util.List;
@@ -26,7 +25,9 @@ private DurableContext createTestContext() {
2625

2726
private DurableContext createTestContext(List<Operation> initialOperations) {
2827
var client = TestUtils.createMockClient();
29-
var initialExecutionState = new InitialExecutionState(initialOperations, null);
28+
var initialExecutionState = CheckpointUpdatedExecutionState.builder()
29+
.operations(initialOperations)
30+
.build();
3031
var executionManager = new ExecutionManager(
3132
"arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/"
3233
+ "349beff4-a89d-4bc8-a56f-af7a8af67a5f/20dae574-53da-37a1-bfd5-b0e2e6ec715d",

sdk/src/test/java/com/amazonaws/lambda/durable/DurableExecutionTest.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.List;
1515
import java.util.concurrent.ExecutorService;
1616
import org.junit.jupiter.api.Test;
17+
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
1718
import software.amazon.awssdk.services.lambda.model.ExecutionDetails;
1819
import software.amazon.awssdk.services.lambda.model.Operation;
1920
import software.amazon.awssdk.services.lambda.model.OperationStatus;
@@ -42,7 +43,9 @@ void testExecuteSuccess() {
4243
var input = new DurableExecutionInput(
4344
"arn:aws:lambda:us-east-1:123456789012:function:test",
4445
"token1",
45-
new DurableExecutionInput.InitialExecutionState(List.of(executionOp), null));
46+
CheckpointUpdatedExecutionState.builder()
47+
.operations(List.of(executionOp))
48+
.build());
4649

4750
var output = DurableExecutor.execute(
4851
input,
@@ -70,7 +73,9 @@ void testExecutePending() {
7073
var input = new DurableExecutionInput(
7174
"arn:aws:lambda:us-east-1:123456789012:function:test",
7275
"token1",
73-
new DurableExecutionInput.InitialExecutionState(List.of(executionOp), null));
76+
CheckpointUpdatedExecutionState.builder()
77+
.operations(List.of(executionOp))
78+
.build());
7479

7580
var output = DurableExecutor.execute(
7681
input,
@@ -101,7 +106,9 @@ void testExecuteFailure() {
101106
var input = new DurableExecutionInput(
102107
"arn:aws:lambda:us-east-1:123456789012:function:test",
103108
"token1",
104-
new DurableExecutionInput.InitialExecutionState(List.of(executionOp), null));
109+
CheckpointUpdatedExecutionState.builder()
110+
.operations(List.of(executionOp))
111+
.build());
105112

106113
var output = DurableExecutor.execute(
107114
input,
@@ -140,7 +147,9 @@ void testExecuteReplay() {
140147
var input = new DurableExecutionInput(
141148
"arn:aws:lambda:us-east-1:123456789012:function:test",
142149
"token2",
143-
new DurableExecutionInput.InitialExecutionState(List.of(executionOp, completedStep), null));
150+
CheckpointUpdatedExecutionState.builder()
151+
.operations(List.of(executionOp, completedStep))
152+
.build());
144153

145154
var output = DurableExecutor.execute(
146155
input,
@@ -158,7 +167,7 @@ void testValidationNoOperations() {
158167
var input = new DurableExecutionInput(
159168
"arn:aws:lambda:us-east-1:123456789012:function:test",
160169
"token1",
161-
new DurableExecutionInput.InitialExecutionState(List.of(), null));
170+
CheckpointUpdatedExecutionState.builder().operations(List.of()).build());
162171

163172
var exception = assertThrows(
164173
IllegalStateException.class,
@@ -180,7 +189,9 @@ void testValidationWrongFirstOperation() {
180189
var input = new DurableExecutionInput(
181190
"arn:aws:lambda:us-east-1:123456789012:function:test",
182191
"token1",
183-
new DurableExecutionInput.InitialExecutionState(List.of(stepOp), null));
192+
CheckpointUpdatedExecutionState.builder()
193+
.operations(List.of(stepOp))
194+
.build());
184195

185196
var exception = assertThrows(
186197
IllegalStateException.class,
@@ -201,7 +212,9 @@ void testValidationMissingExecutionDetails() {
201212
var input = new DurableExecutionInput(
202213
"arn:aws:lambda:us-east-1:123456789012:function:test",
203214
"token1",
204-
new DurableExecutionInput.InitialExecutionState(List.of(executionOp), null));
215+
CheckpointUpdatedExecutionState.builder()
216+
.operations(List.of(executionOp))
217+
.build());
205218

206219
var result = DurableExecutor.execute(
207220
input, null, String.class, (userInput, ctx) -> "result", configWithMockClient());
@@ -232,7 +245,9 @@ void testExecutorNotShutdownAfterMultipleHandlerInvocations() {
232245
var input1 = new DurableExecutionInput(
233246
"arn:aws:lambda:us-east-1:123456789012:function:test",
234247
"token1",
235-
new DurableExecutionInput.InitialExecutionState(List.of(executionOp), null));
248+
CheckpointUpdatedExecutionState.builder()
249+
.operations(List.of(executionOp))
250+
.build());
236251

237252
// Execute first handler
238253
var output1 = DurableExecutor.execute(
@@ -258,7 +273,9 @@ void testExecutorNotShutdownAfterMultipleHandlerInvocations() {
258273
var input2 = new DurableExecutionInput(
259274
"arn:aws:lambda:us-east-1:123456789012:function:test",
260275
"token2",
261-
new DurableExecutionInput.InitialExecutionState(List.of(executionOp2), null));
276+
CheckpointUpdatedExecutionState.builder()
277+
.operations(List.of(executionOp2))
278+
.build());
262279

263280
// Execute second handler using the same config (and thus same executor)
264281
var output2 = DurableExecutor.execute(

sdk/src/test/java/com/amazonaws/lambda/durable/DurableExecutionWrapperTest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package com.amazonaws.lambda.durable;
44

5-
import static java.util.List.of;
65
import static org.junit.jupiter.api.Assertions.assertEquals;
76
import static org.junit.jupiter.api.Assertions.assertNotNull;
87

@@ -12,7 +11,9 @@
1211
import com.amazonaws.lambda.durable.model.ExecutionStatus;
1312
import com.amazonaws.lambda.durable.serde.JacksonSerDes;
1413
import com.amazonaws.services.lambda.runtime.RequestHandler;
14+
import java.util.List;
1515
import org.junit.jupiter.api.Test;
16+
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
1617
import software.amazon.awssdk.services.lambda.model.ExecutionDetails;
1718
import software.amazon.awssdk.services.lambda.model.Operation;
1819
import software.amazon.awssdk.services.lambda.model.OperationStatus;
@@ -71,7 +72,9 @@ void testWrapperPattern() {
7172
var input = new DurableExecutionInput(
7273
"arn:aws:lambda:us-east-1:123456789012:function:test",
7374
"token-1",
74-
new DurableExecutionInput.InitialExecutionState(of(executionOp), null));
75+
CheckpointUpdatedExecutionState.builder()
76+
.operations(List.of(executionOp))
77+
.build());
7578

7679
// Execute
7780
var output = handler.handleRequest(input, null);
@@ -105,7 +108,9 @@ void testWrapperWithMethodReference() {
105108
var input = new DurableExecutionInput(
106109
"arn:aws:lambda:us-east-1:123456789012:function:test",
107110
"token-1",
108-
new DurableExecutionInput.InitialExecutionState(of(executionOp), null));
111+
CheckpointUpdatedExecutionState.builder()
112+
.operations(List.of(executionOp))
113+
.build());
109114

110115
var output = handler.handleRequest(input, null);
111116

sdk/src/test/java/com/amazonaws/lambda/durable/ReplayValidationTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88

99
import com.amazonaws.lambda.durable.exception.NonDeterministicExecutionException;
1010
import com.amazonaws.lambda.durable.execution.ExecutionManager;
11-
import com.amazonaws.lambda.durable.model.DurableExecutionInput.InitialExecutionState;
1211
import java.time.Duration;
1312
import java.util.List;
1413
import java.util.stream.Stream;
1514
import org.junit.jupiter.api.Test;
15+
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
1616
import software.amazon.awssdk.services.lambda.model.Operation;
1717
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1818
import software.amazon.awssdk.services.lambda.model.OperationType;
@@ -29,7 +29,8 @@ private DurableContext createTestContext(List<Operation> initialOperations) {
2929
.build();
3030
var operations = Stream.concat(Stream.of(executionOp), initialOperations.stream())
3131
.toList();
32-
var initialExecutionState = new InitialExecutionState(operations, null);
32+
var initialExecutionState =
33+
CheckpointUpdatedExecutionState.builder().operations(operations).build();
3334
var executionManager = new ExecutionManager(
3435
"arn:aws:lambda:us-east-1:123456789012:function:test", "test-token", initialExecutionState, client);
3536
return new DurableContext(executionManager, DurableConfig.builder().build(), null);

0 commit comments

Comments
 (0)