
Commit 03dd8c0

wangyb-A and Alex Wang authored
feature: ConcurrencyOperation and Parallel (#201)
* Add parallel design
- Implement ConcurrencyOperation
- Implement ParallelOperation, ParallelContext
* Add parallel examples and unit tests

---------

Co-authored-by: Alex Wang <wangyb@amazon.com>
1 parent bc57488 commit 03dd8c0

18 files changed

Lines changed: 1825 additions & 0 deletions


docs/core/parallel.md

Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
# Parallel Operations Design Plan

## Overview

Add parallel execution capability to the AWS Lambda Durable Execution SDK, allowing multiple branches to run concurrently within a single durable function execution.

## API Design

### User Interface

```java
try (var parallelContext = ctx.parallel(ParallelConfig.builder().build())) {
    DurableFuture<Boolean> task1 = parallelContext.branch("validate", Boolean.class, branchContext -> validate());
    DurableFuture<String> task2 = parallelContext.branch("process", String.class, branchContext -> process());
    parallelContext.join(); // Wait for completion based on config

    // Access results
    Boolean validated = task1.get();
    String processed = task2.get();
}
```

### Core Components

#### 1. ParallelConfig
Configuration object controlling parallel execution behavior:

```java
ParallelConfig config = ParallelConfig.builder()
    .maxConcurrency(5)           // Max branches running simultaneously
    .minSuccessful(3)            // Minimum successful branches required (-1 = all)
    .toleratedFailureCount(2)    // Max failures before stopping execution
    .build();
```

**Configuration Rules:**
- `maxConcurrency`: Controls resource usage, prevents overwhelming the system
- `minSuccessful`: Enables "best effort" scenarios where not all branches need to succeed
- `toleratedFailureCount`: Fail-fast behavior when too many branches fail

#### 2. ParallelContext
Manages the lifecycle of parallel branches:

```java
public class ParallelContext implements AutoCloseable {
    // Create branches
    public <T> DurableFuture<T> branch(String name, Class<T> resultType, Function<DurableContext, T> func);
    public <T> DurableFuture<T> branch(String name, TypeToken<T> resultType, Function<DurableContext, T> func);

    // Wait for completion
    public void join();

    // AutoCloseable ensures join() is called
    public void close();
}
```

#### 3. DurableContext Integration
Add a single method to the existing `DurableContext`:

```java
public ParallelContext parallel(ParallelConfig config);
```

## Implementation Strategy

### 1. Leverage Existing Child Context Infrastructure

Each parallel branch will be implemented as a `ChildContextOperation`:
- **Isolation**: Each branch has its own checkpoint log
- **Replay Safety**: Branches replay independently
- **Error Handling**: Branch failures don't affect other branches directly

### 2. Execution Flow

1. **Branch Registration**: `branch()` calls create `ChildContextOperation` instances but don't execute immediately
2. **Execution Start**: `join()` triggers execution of branches respecting `maxConcurrency`
3. **Concurrency Control**: Use a queue to manage pending branches when `maxConcurrency` is reached
4. **Completion Logic**: Monitor success/failure counts against configuration thresholds
5. **Result Collection**: Return results via `DurableFuture` instances
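
The registration and concurrency-control steps above can be sketched with plain `java.util.concurrent` primitives. This is only an illustration under stated assumptions, not the SDK's `ParallelOperation`: `BoundedBranchRunner` and its members are hypothetical names, `CompletableFuture` stands in for `DurableFuture`, and checkpointing, replay, and the threshold checks of step 4 are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch, not the SDK's ParallelOperation: plain threads, no checkpointing.
class BoundedBranchRunner {
    // A registered branch: the work to run and the future that will carry its outcome.
    private record Pending<T>(Supplier<T> work, CompletableFuture<T> future) {}

    private final List<Pending<?>> pending = new ArrayList<>();
    private final int maxConcurrency;

    BoundedBranchRunner(int maxConcurrency) {
        this.maxConcurrency = maxConcurrency;
    }

    // Step 1: registration only records the branch; nothing runs yet.
    <T> CompletableFuture<T> branch(Supplier<T> work) {
        var future = new CompletableFuture<T>();
        pending.add(new Pending<>(work, future));
        return future;
    }

    // Steps 2, 3 and 5: run every registered branch, at most maxConcurrency at a time.
    void join() {
        if (pending.isEmpty()) {
            return;
        }
        // The fixed pool's internal queue holds the branches that exceed the limit.
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        var all = new CompletableFuture<?>[pending.size()];
        for (int i = 0; i < all.length; i++) {
            Pending<?> p = pending.get(i);
            all[i] = p.future();
            pool.execute(() -> run(p));
        }
        pool.shutdown(); // accept no new work; queued branches still run
        // Block until every branch has finished; failures stay inside each future.
        CompletableFuture.allOf(all).exceptionally(t -> null).join();
        pending.clear(); // a second join() has nothing left to run
    }

    private static <T> void run(Pending<T> p) {
        try {
            p.future().complete(p.work().get());
        } catch (Throwable t) {
            p.future().completeExceptionally(t);
        }
    }

    public static void main(String[] args) {
        var runner = new BoundedBranchRunner(2);
        var a = runner.branch(() -> "validate".length());
        var b = runner.branch(() -> "process".toUpperCase());
        runner.join();
        System.out.println(a.join() + " " + b.join()); // prints "8 PROCESS"
    }
}
```

Using a fixed-size pool keeps the sketch short because the pool's own work queue plays the role of the pending-branch queue; a durable implementation would presumably need its own queue so that branch start order can be replayed deterministically.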

### 3. Error Handling Strategy

**Branch-Level Failures:**
- Individual branch failures are captured in their respective `DurableFuture`
- Don't immediately fail the entire parallel operation
- Count towards `failureCount` for threshold checking

**Parallel-Level Failures:**
- Exceed `toleratedFailureCount`: Stop starting new branches, wait for running ones
- Insufficient `minSuccessful`: Throw `ParallelExecutionException` after all branches complete
- Configuration validation errors: Fail immediately
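
The threshold rules above can be written as a small pure function. The sketch below is an assumption-laden illustration, not the SDK's logic: `CompletionPolicy`, `Decision`, and `evaluate` are hypothetical names, and it assumes the operation does not finish early once `minSuccessful` is reached, since the text only says the check happens after all branches complete.

```java
// Hypothetical sketch of the threshold checks; not the SDK's implementation.
final class CompletionPolicy {
    enum Decision { CONTINUE, DRAIN, SUCCEEDED, FAILED }

    private final int minSuccessful;         // -1 means every branch must succeed
    private final int toleratedFailureCount; // failures allowed before stopping

    CompletionPolicy(int minSuccessful, int toleratedFailureCount) {
        this.minSuccessful = minSuccessful;
        this.toleratedFailureCount = toleratedFailureCount;
    }

    // Decide what the parallel operation should do given the current counters.
    Decision evaluate(int total, int succeeded, int failed, int running) {
        if (failed > toleratedFailureCount) {
            // Too many failures: start nothing new, let running branches finish.
            return running > 0 ? Decision.DRAIN : Decision.FAILED;
        }
        if (succeeded + failed < total) {
            return Decision.CONTINUE; // some branches have not completed yet
        }
        int required = (minSuccessful == -1) ? total : minSuccessful;
        return succeeded >= required ? Decision.SUCCEEDED : Decision.FAILED;
    }

    public static void main(String[] args) {
        var policy = new CompletionPolicy(3, 2);
        System.out.println(policy.evaluate(5, 3, 2, 0)); // prints "SUCCEEDED"
        System.out.println(policy.evaluate(5, 1, 3, 1)); // prints "DRAIN"
    }
}
```

In this sketch a `FAILED` decision is where a caller would throw something like `ParallelExecutionException`; keeping the decision separate from the throwing makes the rules easy to unit test.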

## Key Design Decisions

### 1. Build on Child Contexts
- **Pros**: Reuses existing isolation and checkpointing logic
- **Cons**: Each branch has overhead of a separate child context
- **Decision**: Acceptable trade-off for clean isolation and replay safety

### 2. Eager vs Lazy Execution
- **Chosen**: Lazy execution (branches start only on `join()`)
- **Rationale**: Allows all branches to be registered before execution starts, enabling better concurrency planning

### 3. AutoCloseable Pattern
- **Purpose**: Ensures `join()` is called even if user forgets
- **Behavior**: If `close()` is called before `join()`, automatically call `join()`
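
The close-then-join behavior described above amounts to making `join()` idempotent and having `close()` delegate to it. A minimal sketch follows, assuming a hypothetical `JoinOnCloseContext` class that only counts how often it would have waited; it is not the SDK's `ParallelContext`.

```java
// Hypothetical sketch of the close()/join() contract; not the SDK's ParallelContext.
final class JoinOnCloseContext implements AutoCloseable {
    private boolean joined;
    private int joinCount; // how many times branches were actually awaited

    void join() {
        if (joined) {
            return; // already joined: a later close() must not wait again
        }
        joined = true;
        joinCount++;
        // A real context would block here until the branches complete.
    }

    @Override
    public void close() {
        join(); // covers the case where the caller never called join()
    }

    int joinCount() {
        return joinCount;
    }

    public static void main(String[] args) {
        var implicit = new JoinOnCloseContext();
        try (implicit) {
            // caller forgets join(); close() performs it
        }
        var explicit = new JoinOnCloseContext();
        try (explicit) {
            explicit.join(); // close() then becomes a no-op
        }
        System.out.println(implicit.joinCount() + " " + explicit.joinCount()); // prints "1 1"
    }
}
```

Because try-with-resources also runs `close()` when the body throws, this pattern waits for branches on the error path as well, which is worth keeping in mind if joining can itself fail.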

### 4. Configuration Validation
- Validate at `ParallelConfig.build()` time:
  - `maxConcurrency > 0`
  - `minSuccessful >= -1` (where -1 means "all")
  - `toleratedFailureCount >= 0`
  - `minSuccessful + toleratedFailureCount <= total branches` (validated at runtime)
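
The validation rules listed above could look roughly like the following builder. This is a sketch under assumptions, not the committed `ParallelConfig`: the class name `ParallelConfigSketch`, the default values, the `validateFor` method, and the choice to skip the runtime sum check when `minSuccessful` is -1 are all illustrative.

```java
// Hypothetical sketch of build-time and runtime validation; not the SDK's ParallelConfig.
record ParallelConfigSketch(int maxConcurrency, int minSuccessful, int toleratedFailureCount) {

    static Builder builder() {
        return new Builder();
    }

    // Runtime check: only possible once the number of registered branches is known.
    void validateFor(int totalBranches) {
        // Assumption: -1 ("all") is exempt from the sum rule.
        if (minSuccessful != -1 && (long) minSuccessful + toleratedFailureCount > totalBranches) {
            throw new IllegalArgumentException(
                    "minSuccessful + toleratedFailureCount must be <= total branches (" + totalBranches + ")");
        }
    }

    static final class Builder {
        private int maxConcurrency = Integer.MAX_VALUE; // assumed default: unlimited
        private int minSuccessful = -1;                 // assumed default: all branches
        private int toleratedFailureCount = 0;          // assumed default: no failures tolerated

        Builder maxConcurrency(int value) {
            this.maxConcurrency = value;
            return this;
        }

        Builder minSuccessful(int value) {
            this.minSuccessful = value;
            return this;
        }

        Builder toleratedFailureCount(int value) {
            this.toleratedFailureCount = value;
            return this;
        }

        // Build-time checks: everything that does not depend on the branch count.
        ParallelConfigSketch build() {
            if (maxConcurrency <= 0) {
                throw new IllegalArgumentException("maxConcurrency must be > 0");
            }
            if (minSuccessful < -1) {
                throw new IllegalArgumentException("minSuccessful must be >= -1");
            }
            if (toleratedFailureCount < 0) {
                throw new IllegalArgumentException("toleratedFailureCount must be >= 0");
            }
            return new ParallelConfigSketch(maxConcurrency, minSuccessful, toleratedFailureCount);
        }
    }

    public static void main(String[] args) {
        var config = ParallelConfigSketch.builder()
                .maxConcurrency(5)
                .minSuccessful(3)
                .toleratedFailureCount(2)
                .build();
        config.validateFor(5); // 3 + 2 <= 5, so this passes
        System.out.println(config.maxConcurrency()); // prints "5"
    }
}
```

Splitting the checks this way mirrors the list: the first three rules fail fast in `build()`, while the last one has to wait until the branches have been registered.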

## Implementation Files

### New Files to Create
1. `ParallelConfig.java` - Configuration builder
2. `ParallelContext.java` - User-facing parallel context
3. `operation/ParallelOperation.java` - Core execution logic
4. `exception/ParallelExecutionException.java` - Parallel-specific exceptions

### Files to Modify
1. `DurableContext.java` - Add `parallel()` method
2. `DurableFuture.java` - Ensure compatibility with parallel results (likely no changes needed)

## Testing Strategy

### Unit Tests
- `ParallelConfigTest` - Configuration validation
- `ParallelOperationTest` - Core execution logic with mocked child contexts

### Integration Tests
- Success scenarios with various configurations
- Failure scenarios (exceeding thresholds)
- Concurrency limits
- Replay behavior

### Example Implementation
- `ParallelExample.java` in examples module
- Demonstrate common patterns and error handling

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples;

import java.util.ArrayList;
import java.util.List;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.ParallelConfig;

/**
 * Example demonstrating parallel branch execution with the Durable Execution SDK.
 *
 * <p>This handler processes a list of items concurrently using {@code context.parallel()}:
 *
 * <ol>
 *   <li>Each item is processed in its own branch (child context)
 *   <li>All branches run concurrently and their results are collected
 *   <li>A final step combines the results into a summary
 * </ol>
 *
 * <p>The {@link software.amazon.lambda.durable.ParallelContext} implements {@link AutoCloseable}, so try-with-resources
 * guarantees {@code join()} is called even if an exception occurs.
 */
public class ParallelExample extends DurableHandler<ParallelExample.Input, ParallelExample.Output> {

    public record Input(List<String> items) {}

    public record Output(List<String> results, int totalProcessed) {}

    @Override
    public Output handleRequest(Input input, DurableContext context) {
        var logger = context.getLogger();
        var items = input.items();
        logger.info("Starting parallel processing of {} items", items.size());

        var config = ParallelConfig.builder().build();

        var futures = new ArrayList<DurableFuture<String>>(items.size());

        try (var parallel = context.parallel("process-items", config)) {
            for (var item : items) {
                var future = parallel.branch("process-" + item, String.class, branchCtx -> {
                    branchCtx.getLogger().info("Processing item: {}", item);
                    return branchCtx.step("transform-" + item, String.class, stepCtx -> item.toUpperCase());
                });
                futures.add(future);
            }
        } // join() called here via AutoCloseable

        logger.info("All branches complete, collecting results");

        var results = futures.stream().map(DurableFuture::get).toList();

        return new Output(results, results.size());
    }
}
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.examples;

import static org.junit.jupiter.api.Assertions.*;

import java.util.List;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

class ParallelExampleTest {

    @Test
    void testParallelExampleRunsSuccessfully() {
        var handler = new ParallelExample();
        var runner = LocalDurableTestRunner.create(ParallelExample.Input.class, handler);

        var input = new ParallelExample.Input(List.of("apple", "banana", "cherry"));
        var result = runner.runUntilComplete(input);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        var output = result.getResult(ParallelExample.Output.class);
        assertEquals(3, output.totalProcessed());
        assertTrue(output.results().contains("APPLE"));
        assertTrue(output.results().contains("BANANA"));
        assertTrue(output.results().contains("CHERRY"));
    }

    @Test
    void testParallelExampleWithSingleItem() {
        var handler = new ParallelExample();
        var runner = LocalDurableTestRunner.create(ParallelExample.Input.class, handler);

        var input = new ParallelExample.Input(List.of("hello"));
        var result = runner.runUntilComplete(input);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        var output = result.getResult(ParallelExample.Output.class);
        assertEquals(1, output.totalProcessed());
        assertEquals(List.of("HELLO"), output.results());
    }

    @Test
    void testParallelExampleWithEmptyInput() {
        var handler = new ParallelExample();
        var runner = LocalDurableTestRunner.create(ParallelExample.Input.class, handler);

        var input = new ParallelExample.Input(List.of());
        var result = runner.runUntilComplete(input);

        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

        var output = result.getResult(ParallelExample.Output.class);
        assertEquals(0, output.totalProcessed());
        assertTrue(output.results().isEmpty());
    }
}

sdk/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -63,6 +63,12 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java

Lines changed: 27 additions & 0 deletions
@@ -23,6 +23,7 @@
 import software.amazon.lambda.durable.operation.ChildContextOperation;
 import software.amazon.lambda.durable.operation.InvokeOperation;
 import software.amazon.lambda.durable.operation.MapOperation;
+import software.amazon.lambda.durable.operation.ParallelOperation;
 import software.amazon.lambda.durable.operation.StepOperation;
 import software.amazon.lambda.durable.operation.WaitOperation;
 import software.amazon.lambda.durable.validation.ParameterValidator;
@@ -596,6 +597,32 @@ public <I, O> DurableFuture<MapResult<O>> mapAsync(
         return operation;
     }
 
+    // ========== parallel methods ==========
+
+    /**
+     * Creates a {@link ParallelContext} for executing multiple branches concurrently.
+     *
+     * @param config the parallel execution configuration
+     * @return a new ParallelContext for registering and executing branches
+     */
+    public ParallelContext parallel(String name, ParallelConfig config) {
+        Objects.requireNonNull(config, "config cannot be null");
+        var operationId = nextOperationId();
+
+        var parallelOp = new ParallelOperation<>(
+                OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL),
+                TypeToken.get(Void.class),
+                getDurableConfig().getSerDes(),
+                this,
+                config.maxConcurrency(),
+                config.minSuccessful(),
+                config.toleratedFailureCount());
+
+        parallelOp.execute();
+
+        return new ParallelContext(parallelOp, this);
+    }
+
     // ========= waitForCallback methods =============
 
     /**
