Skip to content

Commit 5acdf2f

Browse files
committed
add test cases for virtual thread pool
1 parent 5e5f31c commit 5acdf2f

4 files changed

Lines changed: 173 additions & 1 deletion

File tree

examples/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,32 @@
7979
</dependency>
8080
</dependencies>
8181

82+
<profiles>
83+
<profile>
84+
<id>exclude-java21-code-on-lower-jdk</id>
85+
<activation>
86+
<!-- Activates if JDK is strictly lower than 21 -->
87+
<jdk>(,21)</jdk>
88+
</activation>
89+
<build>
90+
<plugins>
91+
<plugin>
92+
<groupId>org.apache.maven.plugins</groupId>
93+
<artifactId>maven-compiler-plugin</artifactId>
94+
<!-- Version 3.11.0+ recommended for best support -->
95+
<version>3.13.0</version>
96+
<configuration>
97+
<excludes>
98+
<!-- Path is relative to src/main/java -->
99+
<exclude>software/amazon/lambda/durable/examples/vt</exclude>
100+
</excludes>
101+
</configuration>
102+
</plugin>
103+
</plugins>
104+
</build>
105+
</profile>
106+
</profiles>
107+
82108
<build>
83109
<plugins>
84110
<plugin>
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.vt;
4+
5+
import java.time.Duration;
6+
import java.util.ArrayList;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.TimeUnit;
9+
import software.amazon.lambda.durable.DurableConfig;
10+
import software.amazon.lambda.durable.DurableContext;
11+
import software.amazon.lambda.durable.DurableFuture;
12+
import software.amazon.lambda.durable.DurableHandler;
13+
14+
/**
15+
* Performance test example demonstrating concurrent async steps.
16+
*
17+
* <p>This example tests the SDK's ability to handle many concurrent operations:
18+
*
19+
* <ul>
20+
* <li>Creates async steps in a loop
21+
* <li>Each step performs a simple computation
22+
* <li>All results are collected using {@link DurableFuture#allOf}
23+
* </ul>
24+
*/
25+
public class ManyAsyncStepsVirtualThreadPoolExample
26+
extends DurableHandler<
27+
ManyAsyncStepsVirtualThreadPoolExample.Input, ManyAsyncStepsVirtualThreadPoolExample.Output> {
28+
29+
public record Input(int multiplier, int steps) {}
30+
31+
public record Output(long result, long executionTimeMs, long replayTimeMs) {}
32+
33+
@Override
34+
public Output handleRequest(Input input, DurableContext context) {
35+
var startTime = System.nanoTime();
36+
var multiplier = input.multiplier();
37+
var steps = input.steps();
38+
var logger = context.getLogger();
39+
40+
logger.info("Starting {} async steps with multiplier {}", steps, multiplier);
41+
42+
// Create async steps
43+
var futures = new ArrayList<DurableFuture<Integer>>(steps);
44+
for (var i = 0; i < steps; i++) {
45+
var index = i;
46+
var future = context.stepAsync("compute-" + i, Integer.class, stepCtx -> index * multiplier);
47+
futures.add(future);
48+
}
49+
50+
logger.info("All {} async steps created, collecting results", steps);
51+
52+
// Collect all results using allOf
53+
var results = DurableFuture.allOf(futures);
54+
var totalSum = results.stream().mapToInt(Integer::intValue).sum();
55+
56+
// checkpoint the executionTime so that we can have the same value when replay
57+
var executionTimeMs = context.step(
58+
"execution-time", Long.class, stepCtx -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
59+
logger.info("Completed {} steps, total sum: {}, execution time: {}ms", steps, totalSum, executionTimeMs);
60+
61+
// Wait 2 seconds to test replay
62+
context.wait("post-compute-wait", Duration.ofSeconds(2));
63+
64+
var replayTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
65+
66+
return new Output(totalSum, executionTimeMs, replayTimeMs);
67+
}
68+
69+
@Override
70+
protected DurableConfig createConfiguration() {
71+
// Add a small checkpoint delay to help batch the checkpoint requests and reduce the overall latencies
72+
// when the function has many concurrent operations
73+
return DurableConfig.builder()
74+
.withCheckpointDelay(Duration.ofMillis(10))
75+
.withExecutorService(Executors.newVirtualThreadPerTaskExecutor())
76+
.build();
77+
}
78+
}

examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ void testManyAsyncStepsExample(int steps, long maxExecutionTime, long maxReplayT
544544
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
545545

546546
var finalResult = result.getResult();
547-
System.out.printf("ManyAsyncStepsExample result (%d steps): %s\n", steps, finalResult);
547+
System.out.printf("ManyAsyncStepsVirtualThreadPoolExample result (%d steps): %s\n", steps, finalResult);
548548
assertNotNull(finalResult);
549549
assertEquals((long) steps * (steps - 1), finalResult.result()); // Sum of 0..steps * 2
550550

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.examples.vt;
4+
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
import static org.junit.jupiter.api.Assertions.assertNotNull;
7+
8+
import org.junit.jupiter.api.Test;
9+
import org.junit.jupiter.api.condition.EnabledForJreRange;
10+
import org.junit.jupiter.api.condition.JRE;
11+
import software.amazon.lambda.durable.model.ExecutionStatus;
12+
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
13+
14+
@EnabledForJreRange(min = JRE.JAVA_21)
15+
class ManyAsyncStepsVirtualTheadPoolExampleTest {
16+
17+
@Test
18+
void testManyAsyncSteps() {
19+
var handler = new ManyAsyncStepsVirtualThreadPoolExample();
20+
var runner = LocalDurableTestRunner.create(ManyAsyncStepsVirtualThreadPoolExample.Input.class, handler);
21+
22+
var input = new ManyAsyncStepsVirtualThreadPoolExample.Input(2, 500);
23+
var result = runner.runUntilComplete(input);
24+
25+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
26+
27+
var output = result.getResult(ManyAsyncStepsVirtualThreadPoolExample.Output.class);
28+
assertNotNull(output);
29+
30+
// Sum of 0..499 * 2 = 499 * 500 / 2 * 2 = 249500
31+
assertEquals(
32+
249500,
33+
result.getResult(ManyAsyncStepsVirtualThreadPoolExample.Output.class)
34+
.result());
35+
}
36+
37+
@Test
38+
void testManyAsyncStepsWithDefaultMultiplier() {
39+
var handler = new ManyAsyncStepsVirtualThreadPoolExample();
40+
var runner = LocalDurableTestRunner.create(ManyAsyncStepsVirtualThreadPoolExample.Input.class, handler);
41+
42+
var input = new ManyAsyncStepsVirtualThreadPoolExample.Input(1, 500);
43+
var result = runner.runUntilComplete(input);
44+
45+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
46+
47+
// Sum of 0..499 = 499 * 500 / 2 = 124750
48+
assertEquals(
49+
124750,
50+
result.getResult(ManyAsyncStepsVirtualThreadPoolExample.Output.class)
51+
.result());
52+
}
53+
54+
@Test
55+
void testOperationsAreTracked() {
56+
var handler = new ManyAsyncStepsVirtualThreadPoolExample();
57+
var runner = LocalDurableTestRunner.create(ManyAsyncStepsVirtualThreadPoolExample.Input.class, handler);
58+
59+
var result = runner.runUntilComplete(new ManyAsyncStepsVirtualThreadPoolExample.Input(1, 500));
60+
61+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
62+
63+
// Verify some operations are tracked
64+
assertNotNull(result.getOperation("compute-0"));
65+
assertNotNull(result.getOperation("compute-499"));
66+
assertNotNull(result.getOperation("compute-250"));
67+
}
68+
}

0 commit comments

Comments
 (0)