Skip to content

Commit 0b387a3

Browse files
committed
fix large payload input
1 parent d2e1973 commit 0b387a3

5 files changed

Lines changed: 73 additions & 36 deletions

File tree

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public ExecutionManager(
7777
this.executionMode =
7878
new AtomicReference<>(operationStorage.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION);
7979

80-
executionOp = findExecutionOp();
80+
executionOp = findExecutionOp(initialExecutionState);
8181

8282
// Validate initial operation is an EXECUTION operation
8383
if (executionOp == null) {
@@ -89,15 +89,6 @@ public ExecutionManager(
8989
logger.debug("EXECUTION operation found: {}", executionOp.id());
9090
}
9191

92-
private Operation findExecutionOp() {
93-
for (Operation op : operationStorage.values()) {
94-
if (op.type() == OperationType.EXECUTION) {
95-
return op;
96-
}
97-
}
98-
return null;
99-
}
100-
10192
// ===== State Management =====
10293

10394
public String getDurableExecutionArn() {
@@ -149,6 +140,26 @@ public Operation getExecutionOperation() {
149140
return executionOp;
150141
}
151142

143+
private Operation findExecutionOp(CheckpointUpdatedExecutionState initialExecutionState) {
144+
// find execution OP in the input
145+
if (initialExecutionState != null
146+
&& initialExecutionState.operations() != null
147+
&& !initialExecutionState.operations().isEmpty()) {
148+
var op = initialExecutionState.operations().get(0);
149+
if (op.type() != OperationType.EXECUTION) {
150+
throw new IllegalStateException("First operation must be EXECUTION");
151+
}
152+
return op;
153+
}
154+
// find execution OP in the checkpoint result
155+
for (Operation op : operationStorage.values()) {
156+
if (op.type() == OperationType.EXECUTION) {
157+
return op;
158+
}
159+
}
160+
return null;
161+
}
162+
152163
// ===== Thread Coordination =====
153164
/**
154165
* Registers a thread as active without setting the thread local OperationContext. Use this when registration must

sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,28 @@
88
import com.amazonaws.lambda.durable.execution.SuspendExecutionException;
99
import com.amazonaws.lambda.durable.retry.RetryStrategies;
1010
import java.time.Duration;
11+
import java.util.ArrayList;
1112
import java.util.List;
1213
import org.junit.jupiter.api.Test;
1314
import software.amazon.awssdk.services.lambda.model.*;
1415

1516
class DurableContextTest {
17+
private static final Operation EXECUTION_OP = Operation.builder()
18+
.id("0")
19+
.type(OperationType.EXECUTION)
20+
.status(OperationStatus.STARTED)
21+
.build();
1622

1723
private DurableContext createTestContext() {
18-
var executionOp = Operation.builder()
19-
.id("0")
20-
.type(OperationType.EXECUTION)
21-
.status(OperationStatus.STARTED)
22-
.build();
23-
return createTestContext(List.of(executionOp));
24+
return createTestContext(List.of());
2425
}
2526

2627
private DurableContext createTestContext(List<Operation> initialOperations) {
2728
var client = TestUtils.createMockClient();
28-
var initialExecutionState = CheckpointUpdatedExecutionState.builder()
29-
.operations(initialOperations)
30-
.build();
29+
var operations = new ArrayList<>(List.of(EXECUTION_OP));
30+
operations.addAll(initialOperations);
31+
var initialExecutionState =
32+
CheckpointUpdatedExecutionState.builder().operations(operations).build();
3133
var executionManager = new ExecutionManager(
3234
"arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/"
3335
+ "349beff4-a89d-4bc8-a56f-af7a8af67a5f/20dae574-53da-37a1-bfd5-b0e2e6ec715d",

sdk/src/test/java/com/amazonaws/lambda/durable/execution/ExecutionManagerTest.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,26 @@
33
package com.amazonaws.lambda.durable.execution;
44

55
import static org.junit.jupiter.api.Assertions.*;
6+
import static org.mockito.ArgumentMatchers.any;
7+
import static org.mockito.Mockito.mock;
8+
import static org.mockito.Mockito.when;
69

710
import com.amazonaws.lambda.durable.DurableConfig;
811
import com.amazonaws.lambda.durable.TestUtils;
12+
import com.amazonaws.lambda.durable.client.DurableExecutionClient;
913
import java.util.List;
1014
import org.junit.jupiter.api.Test;
1115
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
16+
import software.amazon.awssdk.services.lambda.model.GetDurableExecutionStateResponse;
1217
import software.amazon.awssdk.services.lambda.model.Operation;
1318
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1419
import software.amazon.awssdk.services.lambda.model.OperationType;
1520

1621
class ExecutionManagerTest {
22+
private DurableExecutionClient client;
1723

1824
private ExecutionManager createManager(List<Operation> operations) {
19-
var client = TestUtils.createMockClient();
25+
client = TestUtils.createMockClient();
2026
var initialState =
2127
CheckpointUpdatedExecutionState.builder().operations(operations).build();
2228
return new ExecutionManager(
@@ -109,4 +115,26 @@ void staysInReplayModeForFailedOperation() {
109115
assertNotNull(op);
110116
assertTrue(manager.isReplaying());
111117
}
118+
119+
@Test
120+
void emptyInitialState() {
121+
client = mock(DurableExecutionClient.class);
122+
when(client.getExecutionState(any(), any(), any()))
123+
.thenReturn(GetDurableExecutionStateResponse.builder()
124+
.operations(List.of(executionOp()))
125+
.nextMarker(null)
126+
.build());
127+
var initialState = CheckpointUpdatedExecutionState.builder()
128+
.operations(List.of())
129+
.nextMarker("marker")
130+
.build();
131+
var executionManager = new ExecutionManager(
132+
"arn:aws:lambda:us-east-1:123456789012:function:test",
133+
"test-token",
134+
initialState,
135+
DurableConfig.builder().withDurableExecutionClient(client).build());
136+
137+
assertNotNull(executionManager.getExecutionOperation());
138+
assertEquals("0", executionManager.getExecutionOperation().id());
139+
}
112140
}

sdk/src/test/java/com/amazonaws/lambda/durable/operation/CallbackOperationTest.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.amazonaws.lambda.durable.serde.JacksonSerDes;
1818
import com.amazonaws.lambda.durable.serde.SerDes;
1919
import java.time.Duration;
20+
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.concurrent.atomic.AtomicInteger;
2223
import org.junit.jupiter.api.Test;
@@ -60,9 +61,15 @@ public <T> T deserialize(String data, TypeToken<T> typeToken) {
6061

6162
private ExecutionManager createExecutionManager(List<Operation> initialOperations) {
6263
var client = TestUtils.createMockClient();
63-
var initialState = CheckpointUpdatedExecutionState.builder()
64-
.operations(initialOperations)
65-
.build();
64+
var operations = new ArrayList<Operation>();
65+
operations.add(Operation.builder()
66+
.id("0")
67+
.type(OperationType.EXECUTION)
68+
.status(OperationStatus.STARTED)
69+
.build());
70+
operations.addAll(initialOperations);
71+
var initialState =
72+
CheckpointUpdatedExecutionState.builder().operations(operations).build();
6673
var executionManager = new ExecutionManager(
6774
"arn:aws:lambda:us-east-1:123456789012:function:test",
6875
"test-token",
@@ -74,12 +81,7 @@ private ExecutionManager createExecutionManager(List<Operation> initialOperation
7481

7582
@Test
7683
void executeCreatesCheckpointAndGetsCallbackId() {
77-
var executionOp = Operation.builder()
78-
.id("0")
79-
.type(OperationType.EXECUTION)
80-
.status(OperationStatus.STARTED)
81-
.build();
82-
var executionManager = createExecutionManager(List.of(executionOp));
84+
var executionManager = createExecutionManager(List.of());
8385
var serDes = new JacksonSerDes();
8486

8587
var operation = new CallbackOperation<>(
@@ -95,12 +97,7 @@ void executeCreatesCheckpointAndGetsCallbackId() {
9597

9698
@Test
9799
void executeWithConfigSetsOptions() {
98-
var executionOp = Operation.builder()
99-
.id("0")
100-
.type(OperationType.EXECUTION)
101-
.status(OperationStatus.STARTED)
102-
.build();
103-
var executionManager = createExecutionManager(List.of(executionOp));
100+
var executionManager = createExecutionManager(List.of());
104101
var serDes = new JacksonSerDes();
105102
var config = CallbackConfig.builder()
106103
.timeout(Duration.ofMinutes(5))

sdk/src/test/java/com/amazonaws/lambda/durable/operation/WaitOperationTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import static org.junit.jupiter.api.Assertions.assertNull;
77
import static org.junit.jupiter.api.Assertions.assertThrows;
88
import static org.junit.jupiter.api.Assertions.assertTrue;
9-
import static org.mockito.Mockito.any;
109
import static org.mockito.Mockito.mock;
1110
import static org.mockito.Mockito.when;
1211

0 commit comments

Comments
 (0)