Skip to content

Commit 7bfcc88

Browse files
authored
feat: durable future allof
* feat: all of * feat: all of
1 parent 55442b1 commit 7bfcc88

3 files changed

Lines changed: 124 additions & 7 deletions

File tree

examples/src/main/java/com/amazonaws/lambda/durable/examples/ManyAsyncStepsExample.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* <ul>
1717
* <li>Creates async steps in a loop
1818
* <li>Each step performs a simple computation
19-
* <li>All results are collected and summed
19+
* <li>All results are collected using {@link DurableFuture#allOf}
2020
* </ul>
2121
*/
2222
public class ManyAsyncStepsExample extends DurableHandler<ManyAsyncStepsExample.Input, String> {
@@ -32,7 +32,7 @@ public String handleRequest(Input input, DurableContext context) {
3232

3333
context.getLogger().info("Starting {} async steps with multiplier {}", STEP_COUNT, multiplier);
3434

35-
// Create 100 async steps
35+
// Create async steps
3636
var futures = new ArrayList<DurableFuture<Integer>>(STEP_COUNT);
3737
for (var i = 0; i < STEP_COUNT; i++) {
3838
var index = i;
@@ -42,11 +42,9 @@ public String handleRequest(Input input, DurableContext context) {
4242

4343
context.getLogger().info("All {} async steps created, collecting results", STEP_COUNT);
4444

45-
// Collect all results
46-
var totalSum = 0L;
47-
for (var future : futures) {
48-
totalSum += future.get();
49-
}
45+
// Collect all results using allOf
46+
var results = DurableFuture.allOf(futures);
47+
var totalSum = results.stream().mapToInt(Integer::intValue).sum();
5048

5149
var executionTimeMs = System.currentTimeMillis() - startTime;
5250
context.getLogger()

sdk/src/main/java/com/amazonaws/lambda/durable/DurableFuture.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
package com.amazonaws.lambda.durable;
44

55
import com.amazonaws.lambda.durable.operation.DurableOperation;
6+
import java.util.Arrays;
7+
import java.util.List;
68

79
public class DurableFuture<T> {
810
private final DurableOperation<T> operation;
@@ -22,4 +24,33 @@ public DurableFuture(DurableOperation<T> operation) {
2224
public T get() {
2325
return operation.get();
2426
}
27+
28+
/**
29+
* Waits for all provided futures to complete and returns their results in order.
30+
*
31+
* <p>The futures are resolved sequentially, but since the underlying operations run concurrently, this effectively
32+
* waits for all operations to complete. During replay, completed operations return immediately.
33+
*
34+
* @param futures the futures to wait for
35+
* @param <T> the result type of the futures
36+
* @return a list of results in the same order as the input futures
37+
*/
38+
@SafeVarargs
39+
public static <T> List<T> allOf(DurableFuture<T>... futures) {
40+
return Arrays.stream(futures).map(DurableFuture::get).toList();
41+
}
42+
43+
/**
44+
* Waits for all provided futures to complete and returns their results in order.
45+
*
46+
* <p>The futures are resolved sequentially, but since the underlying operations run concurrently, this effectively
47+
* waits for all operations to complete. During replay, completed operations return immediately.
48+
*
49+
* @param futures the list of futures to wait for
50+
* @param <T> the result type of the futures
51+
* @return a list of results in the same order as the input futures
52+
*/
53+
public static <T> List<T> allOf(List<DurableFuture<T>> futures) {
54+
return futures.stream().map(DurableFuture::get).toList();
55+
}
2556
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package com.amazonaws.lambda.durable;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
import static org.mockito.Mockito.*;
7+
8+
import com.amazonaws.lambda.durable.operation.DurableOperation;
9+
import java.util.List;
10+
import org.junit.jupiter.api.Test;
11+
12+
class DurableFutureTest {
13+
14+
@Test
15+
void allOfVarargsReturnsResultsInOrder() {
16+
var op1 = mockOperation("first");
17+
var op2 = mockOperation("second");
18+
var op3 = mockOperation("third");
19+
20+
var future1 = new DurableFuture<>(op1);
21+
var future2 = new DurableFuture<>(op2);
22+
var future3 = new DurableFuture<>(op3);
23+
24+
var results = DurableFuture.allOf(future1, future2, future3);
25+
26+
assertEquals(List.of("first", "second", "third"), results);
27+
verify(op1).get();
28+
verify(op2).get();
29+
verify(op3).get();
30+
}
31+
32+
@Test
33+
void allOfListReturnsResultsInOrder() {
34+
var op1 = mockOperation(1);
35+
var op2 = mockOperation(2);
36+
var op3 = mockOperation(3);
37+
38+
var futures = List.of(new DurableFuture<>(op1), new DurableFuture<>(op2), new DurableFuture<>(op3));
39+
40+
var results = DurableFuture.allOf(futures);
41+
42+
assertEquals(List.of(1, 2, 3), results);
43+
}
44+
45+
@Test
46+
void allOfVarargsEmptyReturnsEmptyList() {
47+
var results = DurableFuture.<String>allOf();
48+
49+
assertTrue(results.isEmpty());
50+
}
51+
52+
@Test
53+
void allOfListEmptyReturnsEmptyList() {
54+
var results = DurableFuture.allOf(List.<DurableFuture<String>>of());
55+
56+
assertTrue(results.isEmpty());
57+
}
58+
59+
@Test
60+
void allOfSingleFutureReturnsSingleResult() {
61+
var op = mockOperation("only");
62+
var future = new DurableFuture<>(op);
63+
64+
var results = DurableFuture.allOf(future);
65+
66+
assertEquals(List.of("only"), results);
67+
}
68+
69+
@Test
70+
void allOfPropagatesException() {
71+
var op1 = mockOperation("first");
72+
@SuppressWarnings("unchecked")
73+
DurableOperation<String> op2 = mock(DurableOperation.class);
74+
when(op2.get()).thenThrow(new RuntimeException("Step failed"));
75+
76+
var future1 = new DurableFuture<>(op1);
77+
var future2 = new DurableFuture<>(op2);
78+
79+
assertThrows(RuntimeException.class, () -> DurableFuture.allOf(future1, future2));
80+
}
81+
82+
@SuppressWarnings("unchecked")
83+
private <T> DurableOperation<T> mockOperation(T result) {
84+
DurableOperation<T> op = mock(DurableOperation.class);
85+
when(op.get()).thenReturn(result);
86+
return op;
87+
}
88+
}

0 commit comments

Comments
 (0)