Skip to content

Commit d9ef38e

Browse files
authored
[bugfix]: handle large input and empty initial operations (#112)
* fix large payload input * fix large payload input * add test case for large input * add test case for large input
1 parent f40431b commit d9ef38e

10 files changed

Lines changed: 168 additions & 50 deletions

File tree

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable.examples;
4+
5+
import com.amazonaws.lambda.durable.DurableContext;
6+
import com.amazonaws.lambda.durable.DurableHandler;
7+
8+
/**
9+
* Simple example demonstrating a durable function doesn't have any durable operation
10+
*
11+
* <p>This handler processes a greeting request and returns a greeting message
12+
*/
13+
public class NoopExample extends DurableHandler<GreetingRequest, String> {
14+
15+
@Override
16+
public String handleRequest(GreetingRequest input, DurableContext context) {
17+
return "HELLO, " + input.getName() + "!";
18+
}
19+
}

examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,17 @@ void testSimpleStepExample() {
7171
assertEquals("create-greeting", createGreetingOp.getName());
7272
}
7373

74+
@Test
75+
void testNoopExampleWithLargeInput() {
76+
var runner = CloudDurableTestRunner.create(arn("noop-example"), Map.class, String.class);
77+
// 6MB large input
78+
var largeInput = "A".repeat(1024 * 1024 * 6 - 12);
79+
var result = runner.run(Map.of("name", largeInput));
80+
81+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
82+
assertEquals("HELLO, " + largeInput + "!", result.getResult(String.class));
83+
}
84+
7485
@Test
7586
void testSimpleInvokeExample() {
7687
var runner = CloudDurableTestRunner.create(arn("simple-invoke-example"), Map.class, String.class);

examples/src/test/java/com/amazonaws/lambda/durable/examples/SimpleStepExampleTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,25 @@ void testSimpleStepExample() {
2727
assertEquals("HELLO, ALICE!", result.getResult(String.class));
2828
}
2929

30+
@Test
31+
void testWithLargePayload() {
32+
// Create handler
33+
var handler = new SimpleStepExample();
34+
35+
// Create test runner
36+
var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler);
37+
// 6MB large input
38+
var largeInput = "A".repeat(1024).repeat(1024).repeat(6);
39+
40+
// Run with input
41+
var input = new GreetingRequest(largeInput);
42+
var result = runner.run(input);
43+
44+
// Verify result
45+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
46+
assertEquals("HELLO, " + largeInput + "!", result.getResult(String.class));
47+
}
48+
3049
@Test
3150
void testWithDefaultName() {
3251
var handler = new SimpleStepExample();

examples/template.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,24 @@ Globals:
2727
- !Ref Architecture
2828

2929
Resources:
30+
NoopExampleFunction:
31+
Type: AWS::Serverless::Function
32+
Properties:
33+
PackageType: Image
34+
FunctionName: !Join
35+
- ''
36+
- - noop-example
37+
- !Ref FunctionNameSuffix
38+
ImageConfig:
39+
Command: [ "com.amazonaws.lambda.durable.examples.NoopExample::handleRequest" ]
40+
DurableConfig:
41+
ExecutionTimeout: 300
42+
RetentionPeriodInDays: 7
43+
Metadata:
44+
Dockerfile: !Ref DockerFile
45+
DockerContext: ../
46+
DockerTag: durable-examples
47+
3048
SimpleStepExampleFunction:
3149
Type: AWS::Serverless::Function
3250
Properties:
@@ -376,6 +394,14 @@ Resources:
376394
DockerTag: durable-examples
377395

378396
Outputs:
397+
NoopExampleFunction:
398+
Description: Noop Example Function ARN
399+
Value: !GetAtt NoopExampleFunction.Arn
400+
401+
NoopExampleFunctionName:
402+
Description: Noop Example Function Name
403+
Value: !Ref NoopExampleFunction
404+
379405
SimpleStepExampleFunction:
380406
Description: Simple Step Example Function ARN
381407
Value: !GetAtt SimpleStepExampleFunction.Arn

sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/TestResult.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.stream.Collectors;
1313
import software.amazon.awssdk.services.lambda.model.ErrorObject;
1414
import software.amazon.awssdk.services.lambda.model.Event;
15+
import software.amazon.awssdk.services.lambda.model.EventType;
1516
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1617

1718
public class TestResult<O> {
@@ -50,7 +51,12 @@ public <T> T getResult(Class<T> resultType) {
5051
if (status != ExecutionStatus.SUCCEEDED) {
5152
throw new IllegalStateException("Execution did not succeed: " + status);
5253
}
53-
if (resultPayload == null) {
54+
if (resultPayload == null || resultPayload.isEmpty()) {
55+
var lastEvent = allEvents.get(allEvents.size() - 1);
56+
if (lastEvent.eventType() == EventType.EXECUTION_SUCCEEDED) {
57+
return serDes.deserialize(
58+
lastEvent.executionSucceededDetails().result().payload(), TypeToken.get(resultType));
59+
}
5460
return null;
5561
}
5662
return serDes.deserialize(resultPayload, TypeToken.get(resultType));

sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,30 +36,9 @@ public static <I, O> DurableExecutionOutput execute(
3636
Class<I> inputType,
3737
BiFunction<I, DurableContext, O> handler,
3838
DurableConfig config) {
39-
logger.debug("DurableExecution.execute() called");
40-
logger.debug("DurableExecutionArn: {}", input.durableExecutionArn());
41-
logger.debug(
42-
"Initial operations count: {}",
43-
input.initialExecutionState() != null
44-
&& input.initialExecutionState().operations() != null
45-
? input.initialExecutionState().operations().size()
46-
: 0);
47-
48-
// Validate initial operation is an EXECUTION operation
49-
if (input.initialExecutionState() == null
50-
|| input.initialExecutionState().operations() == null
51-
|| input.initialExecutionState().operations().isEmpty()
52-
|| input.initialExecutionState().operations().get(0).type() != OperationType.EXECUTION) {
53-
throw new IllegalStateException("First operation must be EXECUTION");
54-
}
55-
5639
var executionManager = new ExecutionManager(
5740
input.durableExecutionArn(), input.checkpointToken(), input.initialExecutionState(), config);
5841

59-
logger.debug(
60-
"EXECUTION operation found: {}",
61-
executionManager.getExecutionOperation().id());
62-
6342
var handlerFuture = CompletableFuture.supplyAsync(
6443
() -> {
6544
var userInput =

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
2020
import software.amazon.awssdk.services.lambda.model.Operation;
2121
import software.amazon.awssdk.services.lambda.model.OperationStatus;
22+
import software.amazon.awssdk.services.lambda.model.OperationType;
2223
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
2324

2425
/**
@@ -48,7 +49,7 @@ public class ExecutionManager {
4849

4950
// ===== Execution State =====
5051
private final Map<String, Operation> operationStorage;
51-
private final String executionOperationId;
52+
private final Operation executionOp;
5253
private final String durableExecutionArn;
5354
private final AtomicReference<ExecutionMode> executionMode;
5455

@@ -68,7 +69,6 @@ public ExecutionManager(
6869
CheckpointUpdatedExecutionState initialExecutionState,
6970
DurableConfig config) {
7071
this.durableExecutionArn = durableExecutionArn;
71-
this.executionOperationId = initialExecutionState.operations().get(0).id();
7272

7373
// Create checkpoint batcher for internal coordination
7474
this.checkpointBatcher =
@@ -80,6 +80,17 @@ public ExecutionManager(
8080
// Start in REPLAY mode if we have more than just the initial EXECUTION operation
8181
this.executionMode =
8282
new AtomicReference<>(operationStorage.size() > 1 ? ExecutionMode.REPLAY : ExecutionMode.EXECUTION);
83+
84+
executionOp = findExecutionOp(initialExecutionState);
85+
86+
// Validate initial operation is an EXECUTION operation
87+
if (executionOp == null) {
88+
throw new IllegalStateException("First operation must be EXECUTION");
89+
}
90+
logger.debug("DurableExecution.execute() called");
91+
logger.debug("DurableExecutionArn: {}", durableExecutionArn);
92+
logger.debug("Initial operations count: {}", operationStorage.size());
93+
logger.debug("EXECUTION operation found: {}", executionOp.id());
8394
}
8495

8596
// ===== State Management =====
@@ -128,7 +139,27 @@ public Operation getOperationAndUpdateReplayState(String operationId) {
128139
}
129140

130141
public Operation getExecutionOperation() {
131-
return operationStorage.get(executionOperationId);
142+
return executionOp;
143+
}
144+
145+
private Operation findExecutionOp(CheckpointUpdatedExecutionState initialExecutionState) {
146+
// find execution OP in the input
147+
if (initialExecutionState != null
148+
&& initialExecutionState.operations() != null
149+
&& !initialExecutionState.operations().isEmpty()) {
150+
var op = initialExecutionState.operations().get(0);
151+
if (op.type() != OperationType.EXECUTION) {
152+
throw new IllegalStateException("First operation must be EXECUTION");
153+
}
154+
return op;
155+
}
156+
// find execution OP in the checkpoint result
157+
for (Operation op : operationStorage.values()) {
158+
if (op.type() == OperationType.EXECUTION) {
159+
return op;
160+
}
161+
}
162+
return null;
132163
}
133164

134165
/**

sdk/src/test/java/com/amazonaws/lambda/durable/DurableContextTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,28 @@
88
import com.amazonaws.lambda.durable.execution.SuspendExecutionException;
99
import com.amazonaws.lambda.durable.retry.RetryStrategies;
1010
import java.time.Duration;
11+
import java.util.ArrayList;
1112
import java.util.List;
1213
import org.junit.jupiter.api.Test;
1314
import software.amazon.awssdk.services.lambda.model.*;
1415

1516
class DurableContextTest {
17+
private static final Operation EXECUTION_OP = Operation.builder()
18+
.id("0")
19+
.type(OperationType.EXECUTION)
20+
.status(OperationStatus.STARTED)
21+
.build();
1622

1723
private DurableContext createTestContext() {
18-
var executionOp = Operation.builder()
19-
.id("0")
20-
.type(OperationType.EXECUTION)
21-
.status(OperationStatus.STARTED)
22-
.build();
23-
return createTestContext(List.of(executionOp));
24+
return createTestContext(List.of());
2425
}
2526

2627
private DurableContext createTestContext(List<Operation> initialOperations) {
2728
var client = TestUtils.createMockClient();
28-
var initialExecutionState = CheckpointUpdatedExecutionState.builder()
29-
.operations(initialOperations)
30-
.build();
29+
var operations = new ArrayList<>(List.of(EXECUTION_OP));
30+
operations.addAll(initialOperations);
31+
var initialExecutionState =
32+
CheckpointUpdatedExecutionState.builder().operations(operations).build();
3133
var executionManager = new ExecutionManager(
3234
"arn:aws:lambda:us-east-1:123456789012:function:test:$LATEST/durable-execution/"
3335
+ "349beff4-a89d-4bc8-a56f-af7a8af67a5f/20dae574-53da-37a1-bfd5-b0e2e6ec715d",

sdk/src/test/java/com/amazonaws/lambda/durable/execution/ExecutionManagerTest.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,26 @@
33
package com.amazonaws.lambda.durable.execution;
44

55
import static org.junit.jupiter.api.Assertions.*;
6+
import static org.mockito.ArgumentMatchers.any;
7+
import static org.mockito.Mockito.mock;
8+
import static org.mockito.Mockito.when;
69

710
import com.amazonaws.lambda.durable.DurableConfig;
811
import com.amazonaws.lambda.durable.TestUtils;
12+
import com.amazonaws.lambda.durable.client.DurableExecutionClient;
913
import java.util.List;
1014
import org.junit.jupiter.api.Test;
1115
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
16+
import software.amazon.awssdk.services.lambda.model.GetDurableExecutionStateResponse;
1217
import software.amazon.awssdk.services.lambda.model.Operation;
1318
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1419
import software.amazon.awssdk.services.lambda.model.OperationType;
1520

1621
class ExecutionManagerTest {
22+
private DurableExecutionClient client;
1723

1824
private ExecutionManager createManager(List<Operation> operations) {
19-
var client = TestUtils.createMockClient();
25+
client = TestUtils.createMockClient();
2026
var initialState =
2127
CheckpointUpdatedExecutionState.builder().operations(operations).build();
2228
return new ExecutionManager(
@@ -109,4 +115,26 @@ void staysInReplayModeForFailedOperation() {
109115
assertNotNull(op);
110116
assertTrue(manager.isReplaying());
111117
}
118+
119+
@Test
120+
void emptyInitialState() {
121+
client = mock(DurableExecutionClient.class);
122+
when(client.getExecutionState(any(), any(), any()))
123+
.thenReturn(GetDurableExecutionStateResponse.builder()
124+
.operations(List.of(executionOp()))
125+
.nextMarker(null)
126+
.build());
127+
var initialState = CheckpointUpdatedExecutionState.builder()
128+
.operations(List.of())
129+
.nextMarker("marker")
130+
.build();
131+
var executionManager = new ExecutionManager(
132+
"arn:aws:lambda:us-east-1:123456789012:function:test",
133+
"test-token",
134+
initialState,
135+
DurableConfig.builder().withDurableExecutionClient(client).build());
136+
137+
assertNotNull(executionManager.getExecutionOperation());
138+
assertEquals("0", executionManager.getExecutionOperation().id());
139+
}
112140
}

sdk/src/test/java/com/amazonaws/lambda/durable/operation/CallbackOperationTest.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.amazonaws.lambda.durable.serde.JacksonSerDes;
1818
import com.amazonaws.lambda.durable.serde.SerDes;
1919
import java.time.Duration;
20+
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.concurrent.atomic.AtomicInteger;
2223
import org.junit.jupiter.api.Test;
@@ -60,9 +61,15 @@ public <T> T deserialize(String data, TypeToken<T> typeToken) {
6061

6162
private ExecutionManager createExecutionManager(List<Operation> initialOperations) {
6263
var client = TestUtils.createMockClient();
63-
var initialState = CheckpointUpdatedExecutionState.builder()
64-
.operations(initialOperations)
65-
.build();
64+
var operations = new ArrayList<Operation>();
65+
operations.add(Operation.builder()
66+
.id("0")
67+
.type(OperationType.EXECUTION)
68+
.status(OperationStatus.STARTED)
69+
.build());
70+
operations.addAll(initialOperations);
71+
var initialState =
72+
CheckpointUpdatedExecutionState.builder().operations(operations).build();
6673
var executionManager = new ExecutionManager(
6774
"arn:aws:lambda:us-east-1:123456789012:function:test",
6875
"test-token",
@@ -74,12 +81,7 @@ private ExecutionManager createExecutionManager(List<Operation> initialOperation
7481

7582
@Test
7683
void executeCreatesCheckpointAndGetsCallbackId() {
77-
var executionOp = Operation.builder()
78-
.id("0")
79-
.type(OperationType.EXECUTION)
80-
.status(OperationStatus.STARTED)
81-
.build();
82-
var executionManager = createExecutionManager(List.of(executionOp));
84+
var executionManager = createExecutionManager(List.of());
8385
var serDes = new JacksonSerDes();
8486

8587
var operation = new CallbackOperation<>(
@@ -95,12 +97,7 @@ void executeCreatesCheckpointAndGetsCallbackId() {
9597

9698
@Test
9799
void executeWithConfigSetsOptions() {
98-
var executionOp = Operation.builder()
99-
.id("0")
100-
.type(OperationType.EXECUTION)
101-
.status(OperationStatus.STARTED)
102-
.build();
103-
var executionManager = createExecutionManager(List.of(executionOp));
100+
var executionManager = createExecutionManager(List.of());
104101
var serDes = new JacksonSerDes();
105102
var config = CallbackConfig.builder()
106103
.timeout(Duration.ofMinutes(5))

0 commit comments

Comments
 (0)