Skip to content

Commit 55442b1

Browse files
authored
fix: paginated checkpoint token
* fix: checkpoint token for paginated execution state * fix: checkpoint token for paginated execution state
1 parent 39d2143 commit 55442b1

12 files changed

Lines changed: 180 additions & 5 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ buildNumber.properties
2121
# Kiro and SOP
2222
.kiro
2323
.sop
24+
.factorypath
2425

2526
# Local testing
2627

AGENTS.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ public DurableExecutor(DurableExecutionClient client, SerDes serDes) {
9494

9595
Prefer descriptive domain names: `model`, `execution`, `operation`, `serde`, `exception`
9696

97+
### Logging in Examples
98+
99+
Use `context.getLogger()` instead of SLF4J's `LoggerFactory` in example handlers. It includes execution metadata and suppresses duplicate logs during replay.
100+
97101
## Do Not
98102

99103
- Add new dependencies without explicit approval

examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ mvn test -Dtest=CloudBasedIntegrationTest \
8787
| [WaitAtLeastExample](src/main/java/com/amazonaws/lambda/durable/examples/WaitAtLeastExample.java) | Concurrent `stepAsync()` with `wait()` |
8888
| [RetryInProcessExample](src/main/java/com/amazonaws/lambda/durable/examples/RetryInProcessExample.java) | In-process retry with concurrent operations |
8989
| [WaitAtLeastInProcessExample](src/main/java/com/amazonaws/lambda/durable/examples/WaitAtLeastInProcessExample.java) | Wait completes before async step (no suspension) |
90+
| [ManyAsyncStepsExample](src/main/java/com/amazonaws/lambda/durable/examples/ManyAsyncStepsExample.java) | Performance test with 500 concurrent async steps |
9091

9192
## Cleanup
9293

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable.examples;
4+
5+
import com.amazonaws.lambda.durable.DurableContext;
6+
import com.amazonaws.lambda.durable.DurableFuture;
7+
import com.amazonaws.lambda.durable.DurableHandler;
8+
import java.time.Duration;
9+
import java.util.ArrayList;
10+
11+
/**
12+
* Performance test example demonstrating concurrent async steps.
13+
*
14+
* <p>This example tests the SDK's ability to handle many concurrent operations:
15+
*
16+
* <ul>
17+
* <li>Creates async steps in a loop
18+
* <li>Each step performs a simple computation
19+
* <li>All results are collected and summed
20+
* </ul>
21+
*/
22+
public class ManyAsyncStepsExample extends DurableHandler<ManyAsyncStepsExample.Input, String> {
23+
24+
private static final int STEP_COUNT = 500;
25+
26+
public record Input(int multiplier) {}
27+
28+
@Override
29+
public String handleRequest(Input input, DurableContext context) {
30+
var startTime = System.currentTimeMillis();
31+
var multiplier = input.multiplier() > 0 ? input.multiplier() : 1;
32+
33+
context.getLogger().info("Starting {} async steps with multiplier {}", STEP_COUNT, multiplier);
34+
35+
// Create 100 async steps
36+
var futures = new ArrayList<DurableFuture<Integer>>(STEP_COUNT);
37+
for (var i = 0; i < STEP_COUNT; i++) {
38+
var index = i;
39+
var future = context.stepAsync("compute-" + i, Integer.class, () -> index * multiplier);
40+
futures.add(future);
41+
}
42+
43+
context.getLogger().info("All {} async steps created, collecting results", STEP_COUNT);
44+
45+
// Collect all results
46+
var totalSum = 0L;
47+
for (var future : futures) {
48+
totalSum += future.get();
49+
}
50+
51+
var executionTimeMs = System.currentTimeMillis() - startTime;
52+
context.getLogger()
53+
.info("Completed {} steps, total sum: {}, execution time: {}ms", STEP_COUNT, totalSum, executionTimeMs);
54+
55+
// Wait 10 seconds to test replay
56+
context.wait("post-compute-wait", Duration.ofSeconds(10));
57+
58+
return String.format("Completed %d async steps. Sum: %d, Time: %dms", STEP_COUNT, totalSum, executionTimeMs);
59+
}
60+
}

examples/src/test/java/com/amazonaws/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,4 +332,22 @@ void testCallbackExampleWithTimeout() {
332332
assertNotNull(approvalOp);
333333
assertEquals(OperationStatus.TIMED_OUT, approvalOp.getStatus());
334334
}
335+
336+
@Test
337+
void testManyAsyncStepsExample() {
338+
var runner = CloudDurableTestRunner.create(
339+
arn("many-async-steps-example"), ManyAsyncStepsExample.Input.class, String.class);
340+
var result = runner.run(new ManyAsyncStepsExample.Input(2));
341+
342+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
343+
344+
var finalResult = result.getResult(String.class);
345+
assertNotNull(finalResult);
346+
assertTrue(finalResult.contains("500 async steps"));
347+
assertTrue(finalResult.contains("249500")); // Sum of 0..499 * 2
348+
349+
// Verify some operations are tracked
350+
assertNotNull(runner.getOperation("compute-0"));
351+
assertNotNull(runner.getOperation("compute-499"));
352+
}
335353
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable.examples;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import com.amazonaws.lambda.durable.model.ExecutionStatus;
8+
import com.amazonaws.lambda.durable.testing.LocalDurableTestRunner;
9+
import org.junit.jupiter.api.Test;
10+
11+
class ManyAsyncStepsExampleTest {
12+
13+
@Test
14+
void testManyAsyncSteps() {
15+
var handler = new ManyAsyncStepsExample();
16+
var runner = LocalDurableTestRunner.create(ManyAsyncStepsExample.Input.class, handler);
17+
18+
var input = new ManyAsyncStepsExample.Input(2);
19+
var result = runner.runUntilComplete(input);
20+
21+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
22+
23+
var output = result.getResult(String.class);
24+
assertNotNull(output);
25+
assertTrue(output.contains("500 async steps"));
26+
27+
// Sum of 0..499 * 2 = 499 * 500 / 2 * 2 = 249500
28+
assertTrue(output.contains("249500"));
29+
}
30+
31+
@Test
32+
void testManyAsyncStepsWithDefaultMultiplier() {
33+
var handler = new ManyAsyncStepsExample();
34+
var runner = LocalDurableTestRunner.create(ManyAsyncStepsExample.Input.class, handler);
35+
36+
var input = new ManyAsyncStepsExample.Input(1);
37+
var result = runner.runUntilComplete(input);
38+
39+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
40+
41+
// Sum of 0..499 = 499 * 500 / 2 = 124750
42+
assertTrue(result.getResult(String.class).contains("124750"));
43+
}
44+
45+
@Test
46+
void testOperationsAreTracked() {
47+
var handler = new ManyAsyncStepsExample();
48+
var runner = LocalDurableTestRunner.create(ManyAsyncStepsExample.Input.class, handler);
49+
50+
var result = runner.runUntilComplete(new ManyAsyncStepsExample.Input(1));
51+
52+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
53+
54+
// Verify some operations are tracked
55+
assertNotNull(result.getOperation("compute-0"));
56+
assertNotNull(result.getOperation("compute-499"));
57+
assertNotNull(result.getOperation("compute-250"));
58+
}
59+
}

examples/template.yaml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,28 @@ Resources:
252252
DockerContext: ../
253253
DockerTag: durable-examples
254254

255+
ManyAsyncStepsExampleFunction:
256+
Type: AWS::Serverless::Function
257+
Properties:
258+
PackageType: Image
259+
FunctionName: many-async-steps-example
260+
ImageConfig:
261+
Command: ["com.amazonaws.lambda.durable.examples.ManyAsyncStepsExample::handleRequest"]
262+
DurableConfig:
263+
ExecutionTimeout: 300
264+
RetentionPeriodInDays: 7
265+
Policies:
266+
- Statement:
267+
- Effect: Allow
268+
Action:
269+
- lambda:CheckpointDurableExecutions
270+
- lambda:GetDurableExecutionState
271+
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:many-async-steps-example"
272+
Metadata:
273+
Dockerfile: examples/Dockerfile
274+
DockerContext: ../
275+
DockerTag: durable-examples
276+
255277
Outputs:
256278
SimpleStepExampleFunction:
257279
Description: Simple Step Example Function ARN
@@ -340,3 +362,11 @@ Outputs:
340362
CallbackExampleFunctionName:
341363
Description: Callback Example Function Name
342364
Value: !Ref CallbackExampleFunction
365+
366+
ManyAsyncStepsExampleFunction:
367+
Description: Many Async Steps Example Function ARN
368+
Value: !GetAtt ManyAsyncStepsExampleFunction.Arn
369+
370+
ManyAsyncStepsExampleFunctionName:
371+
Description: Many Async Steps Example Function Name
372+
Value: !Ref ManyAsyncStepsExampleFunction

sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/LocalDurableTestRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,8 @@ private DurableExecutionInput createDurableInput(I input) {
239239
.build();
240240

241241
// Load previous operations and include them in InitialExecutionState
242-
var existingOps = storage.getExecutionState("arn:aws:lambda:us-east-1:123456789012:function:test", null)
242+
var existingOps = storage.getExecutionState(
243+
"arn:aws:lambda:us-east-1:123456789012:function:test", "test-token", null)
243244
.operations();
244245
var allOps = new ArrayList<>(List.of(executionOp));
245246
allOps.addAll(existingOps);

sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/LocalMemoryExecutionClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public CheckpointDurableExecutionResponse checkpoint(String arn, String token, L
4242
}
4343

4444
@Override
45-
public GetDurableExecutionStateResponse getExecutionState(String arn, String marker) {
45+
public GetDurableExecutionStateResponse getExecutionState(String arn, String checkpointToken, String marker) {
4646
return GetDurableExecutionStateResponse.builder()
4747
.operations(operations.values())
4848
.build();

sdk/src/main/java/com/amazonaws/lambda/durable/client/DurableExecutionClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@
1010
public interface DurableExecutionClient {
1111
CheckpointDurableExecutionResponse checkpoint(String arn, String token, List<OperationUpdate> updates);
1212

13-
GetDurableExecutionStateResponse getExecutionState(String arn, String marker);
13+
GetDurableExecutionStateResponse getExecutionState(String arn, String checkpointToken, String marker);
1414
}

0 commit comments

Comments
 (0)