|
1 | | -# Parallel Operations Design Plan |
| 1 | +## parallel() – Concurrent Branch Execution |
2 | 2 |
|
3 | | -## Overview |
4 | | - |
5 | | -Add parallel execution capability to the AWS Lambda Durable Execution SDK, allowing multiple branches to run concurrently within a single durable function execution. |
6 | | - |
7 | | -## API Design |
8 | | - |
9 | | -### User Interface |
| 3 | +`parallel()` runs multiple independent branches concurrently, each in its own child context. Branches are registered via `branch()` and start executing as soon as they are registered, subject to `maxConcurrency`. The operation completes when all branches finish or when the configured completion criteria are met. |
10 | 4 |
|
11 | 5 | ```java |
12 | | -try (var parallelContext = ctx.parallel(ParallelConfig.builder().build())) { |
13 | | - DurableFuture<Boolean> task1 = parallelContext.branch("validate", Boolean.class, branchContext -> validate()); |
14 | | - DurableFuture<String> task2 = parallelContext.branch("process", String.class, branchContext -> process()); |
15 | | - parallelContext.join(); // Wait for completion based on config |
16 | | - |
17 | | - // Access results |
18 | | - Boolean validated = task1.get(); |
19 | | - String processed = task2.get(); |
20 | | -} |
| 6 | +// Basic parallel execution |
| 7 | +var parallel = ctx.parallel("validate-and-process"); |
| 8 | +DurableFuture<Boolean> task1 = parallel.branch("validate", Boolean.class, branchCtx -> { |
| 9 | + return branchCtx.step("check", Boolean.class, stepCtx -> validate()); |
| 10 | +}); |
| 11 | +DurableFuture<String> task2 = parallel.branch("process", String.class, branchCtx -> { |
| 12 | + return branchCtx.step("work", String.class, stepCtx -> process()); |
| 13 | +}); |
| 14 | + |
| 15 | +// Wait for all branches and get the aggregate result |
| 16 | +ParallelResult result = parallel.get(); |
| 17 | + |
| 18 | +// Access individual branch results |
| 19 | +Boolean validated = task1.get(); |
| 20 | +String processed = task2.get(); |
21 | 21 | ``` |
22 | 22 |
|
23 | | -### Core Components |
24 | | - |
25 | | -#### 1. ParallelConfig |
26 | | -Configuration object controlling parallel execution behavior: |
| 23 | +`ParallelDurableFuture` implements `AutoCloseable`: calling `close()` triggers `get()` if it hasn't been called yet, which ensures that all branches complete before the block exits. |
27 | 24 |
|
28 | 25 | ```java |
29 | | -ParallelConfig config = ParallelConfig.builder() |
30 | | - .maxConcurrency(5) // Max branches running simultaneously |
31 | | - .minSuccessful(3) // Minimum successful branches required (-1 = all) |
32 | | - .toleratedFailureCount(2) // Max failures before stopping execution |
33 | | - .build(); |
| 26 | +// AutoCloseable pattern |
| 27 | +try (var parallel = ctx.parallel("work")) { |
| 28 | + parallel.branch("a", String.class, branchCtx -> branchCtx.step("a1", String.class, stepCtx -> "a")); |
| 29 | + parallel.branch("b", String.class, branchCtx -> branchCtx.step("b1", String.class, stepCtx -> "b")); |
| 30 | +} // close() calls get() automatically |
34 | 31 | ``` |
35 | 32 |
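
The close-triggers-get behavior described above can be pictured with a small stand-alone sketch. `AutoJoinSketch` and its members are hypothetical names used only for illustration; this is not the SDK's `ParallelDurableFuture`, just a minimal model of "`close()` calls `get()` unless `get()` already ran".

```java
// Hypothetical illustration only; not part of the SDK.
public class AutoJoinSketch implements AutoCloseable {
    private boolean joined = false; // true once get() has run
    private int joinCount = 0;      // how many times get() was invoked

    /** Stands in for "wait for all branches"; here it only records the call. */
    public String get() {
        joined = true;
        joinCount++;
        return "joined";
    }

    /** Joins on exit from try-with-resources unless the caller already did. */
    @Override
    public void close() {
        if (!joined) {
            get();
        }
    }

    public int joinCount() {
        return joinCount;
    }
}
```

In this sketch, a `try` block that never calls `get()` and one that calls it explicitly both end with exactly one join.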
|
36 | | -**Configuration Rules:** |
37 | | -- `maxConcurrency`: Controls resource usage, prevents overwhelming the system |
38 | | -- `minSuccessful`: Enables "best effort" scenarios where not all branches need to succeed |
39 | | -- `toleratedFailureCount`: Fail-fast behavior when too many branches fail |
| 33 | +### ParallelResult |
40 | 34 |
|
41 | | -#### 2. ParallelContext |
42 | | -Manages the lifecycle of parallel branches: |
| 35 | +`ParallelResult` is a summary of the parallel execution: |
43 | 36 |
|
44 | | -```java |
45 | | -public class ParallelContext implements AutoCloseable { |
46 | | - // Create branches |
47 | | - public <T> DurableFuture<T> branch(String name, Class<T> resultType, Function<DurableContext, T> func); |
48 | | - public <T> DurableFuture<T> branch(String name, TypeToken<T> resultType, Function<DurableContext, T> func); |
49 | | - |
50 | | - // Wait for completion |
51 | | - public void join(); |
52 | | - |
53 | | - // AutoCloseable ensures join() is called |
54 | | - public void close(); |
55 | | -} |
56 | | -``` |
| 37 | +| Method | Description | |
| 38 | +|-------|-------------| |
| 39 | +| `size()` | Total number of registered branches | |
| 40 | +| `succeeded()` | Number of branches that succeeded | |
| 41 | +| `failed()` | Number of branches that failed | |
| 42 | +| `completionStatus()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) | |
57 | 43 |
|
58 | | -#### 3. DurableContext Integration |
59 | | -Add single method to existing `DurableContext`: |
60 | | - |
61 | | -```java |
62 | | -public ParallelContext parallel(ParallelConfig config); |
63 | | -``` |
| 44 | +### ParallelConfig |
64 | 45 |
|
65 | | -## Implementation Strategy |
| 46 | +Configure concurrency limits and completion criteria: |
66 | 47 |
|
67 | | -### 1. Leverage Existing Child Context Infrastructure |
68 | | - |
69 | | -Each parallel branch will be implemented as a `ChildContextOperation`: |
70 | | -- **Isolation**: Each branch has its own checkpoint log |
71 | | -- **Replay Safety**: Branches replay independently |
72 | | -- **Error Handling**: Branch failures don't affect other branches directly |
73 | | - |
74 | | -### 2. Execution Flow |
75 | | - |
76 | | -1. **Branch Registration**: `branch()` calls create `ChildContextOperation` instances but don't execute immediately |
77 | | -2. **Execution Start**: `join()` triggers execution of branches respecting `maxConcurrency` |
78 | | -3. **Concurrency Control**: Use a queue to manage pending branches when `maxConcurrency` is reached |
79 | | -4. **Completion Logic**: Monitor success/failure counts against configuration thresholds |
80 | | -5. **Result Collection**: Return results via `DurableFuture` instances |
| 48 | +```java |
| 49 | +var config = ParallelConfig.builder() |
| 50 | + .maxConcurrency(5) // at most 5 branches run at once |
| 51 | + .completionConfig(CompletionConfig.allCompleted()) // default: run all branches |
| 52 | + .build(); |
81 | 53 |
|
| 54 | +var parallel = ctx.parallel("work", config); |
| 55 | +``` |
82 | 56 |
|
83 | | -### 4. Error Handling Strategy |
| 57 | +| Option | Default | Description | |
| 58 | +|--------|---------|-------------| |
| 59 | +| `maxConcurrency` | Unlimited | Maximum branches running simultaneously (must be ≥ 1) | |
| 60 | +| `completionConfig` | `allCompleted()` | Controls when the operation stops starting new branches | |
84 | 61 |
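
To make the `maxConcurrency` limit concrete, here is a minimal sketch of one common way to cap in-flight work: a counting semaphore. It uses only `java.util.concurrent`; `MaxConcurrencySketch` and `runAll` are hypothetical names, and nothing here is claimed to match how the SDK schedules branches internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not part of the SDK.
public class MaxConcurrencySketch {

    /** Runs every task with at most maxConcurrency in flight; returns the observed peak. */
    public static int runAll(List<Runnable> tasks, int maxConcurrency) throws Exception {
        if (maxConcurrency < 1) {
            throw new IllegalArgumentException("maxConcurrency must be >= 1");
        }
        Semaphore permits = new Semaphore(maxConcurrency);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Runnable task : tasks) {
                permits.acquire(); // blocks while maxConcurrency tasks are in flight
                futures.add(pool.submit(() -> {
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        task.run();
                    } finally {
                        running.decrementAndGet();
                        permits.release(); // lets the next pending task start
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for completion and surface any task failure
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }
}
```

Because a permit is acquired before a task is submitted and released only after it finishes, the number of running tasks can never exceed `maxConcurrency`.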
|
85 | | -**Branch-Level Failures:** |
86 | | -- Individual branch failures are captured in their respective `DurableFuture` |
87 | | -- Don't immediately fail the entire parallel operation |
88 | | -- Count towards `failureCount` for threshold checking |
| 62 | +#### CompletionConfig |
89 | 63 |
|
90 | | -**Parallel-Level Failures:** |
91 | | -- Exceed `toleratedFailureCount`: Stop starting new branches, wait for running ones |
92 | | -- Insufficient `minSuccessful`: Throw `ParallelExecutionException` after all branches complete |
93 | | -- Configuration validation errors: Fail immediately |
| 64 | +`CompletionConfig` controls when the parallel operation stops starting new branches: |
94 | 65 |
|
95 | | -## Key Design Decisions |
| 66 | +| Factory Method | Behavior | |
| 67 | +|----------------|----------| |
| 68 | +| `allCompleted()` (default) | All branches run regardless of failures | |
| 69 | +| `allSuccessful()` | Stop if any branch fails (zero failures tolerated) | |
| 70 | +| `firstSuccessful()` | Stop after the first branch succeeds | |
| 71 | +| `minSuccessful(n)` | Stop after `n` branches succeed | |
| 72 | +| `toleratedFailureCount(n)` | Stop after more than `n` failures | |
96 | 73 |
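
One way to read the table above is as a single check that runs after each branch finishes. The sketch below models that check under stated assumptions: `CompletionSketch` and `evaluate` are hypothetical names, the status values are borrowed from `completionStatus()`, and the order in which the criteria are tested is a guess rather than documented SDK behavior. In this model, `allSuccessful()` corresponds to `toleratedFailures = 0` and `firstSuccessful()` to `minSuccessful = 1`.

```java
// Hypothetical illustration only; not part of the SDK.
public class CompletionSketch {

    public enum Status { ALL_COMPLETED, MIN_SUCCESSFUL_REACHED, FAILURE_TOLERANCE_EXCEEDED }

    /**
     * Returns the reason to stop, or null if more branches should still run.
     *
     * @param minSuccessful     stop once this many branches succeed; null if unset
     * @param toleratedFailures stop once failures exceed this count; null if unset
     */
    public static Status evaluate(int total, int succeeded, int failed,
                                  Integer minSuccessful, Integer toleratedFailures) {
        // Assumed ordering: failure tolerance is checked before the success target.
        if (toleratedFailures != null && failed > toleratedFailures) {
            return Status.FAILURE_TOLERANCE_EXCEEDED;
        }
        if (minSuccessful != null && succeeded >= minSuccessful) {
            return Status.MIN_SUCCESSFUL_REACHED;
        }
        if (succeeded + failed == total) {
            return Status.ALL_COMPLETED;
        }
        return null; // no criterion met yet
    }
}
```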
|
97 | | -### 1. Build on Child Contexts |
98 | | -- **Pros**: Reuses existing isolation and checkpointing logic |
99 | | -- **Cons**: Each branch has overhead of a separate child context |
100 | | -- **Decision**: Acceptable trade-off for clean isolation and replay safety |
| 74 | +Note: `toleratedFailurePercentage` is not supported for parallel operations. |
101 | 75 |
|
102 | | -### 2. Eager vs Lazy Execution |
103 | | -- **Chosen**: Lazy execution (branches start only on `join()`) |
104 | | -- **Rationale**: Allows all branches to be registered before execution starts, enabling better concurrency planning |
| 76 | +### ParallelBranchConfig |
105 | 77 |
|
106 | | -### 3. AutoCloseable Pattern |
107 | | -- **Purpose**: Ensures `join()` is called even if user forgets |
108 | | -- **Behavior**: If `close()` is called before `join()`, automatically call `join()` |
| 78 | +Pass a `ParallelBranchConfig` as the last argument to `branch()` to configure an individual branch, for example with a custom `serDes`: |
109 | 79 |
|
110 | | -### 4. Configuration Validation |
111 | | -- Validate at `ParallelConfig.build()` time: |
112 | | - - `maxConcurrency > 0` |
113 | | - - `minSuccessful >= -1` (where -1 means "all") |
114 | | - - `toleratedFailureCount >= 0` |
115 | | - - `minSuccessful + toleratedFailureCount <= total branches` (validated at runtime) |
| 80 | +```java |
| 81 | +parallel.branch("work", String.class, branchCtx -> doWork(), |
| 82 | + ParallelBranchConfig.builder() |
| 83 | + .serDes(customSerDes) |
| 84 | + .build()); |
| 85 | +``` |
116 | 86 |
|
117 | | -## Implementation Files |
| 87 | +### Error Handling |
118 | 88 |
|
119 | | -### New Files to Create |
120 | | -1. `ParallelConfig.java` - Configuration builder |
121 | | -2. `ParallelContext.java` - User-facing parallel context |
122 | | -3. `operation/ParallelOperation.java` - Core execution logic |
123 | | -4. `exception/ParallelExecutionException.java` - Parallel-specific exceptions |
| 89 | +Branch failures are captured individually. A failed branch throws its exception when you call `get()` on its `DurableFuture`: |
124 | 90 |
|
125 | | -### Files to Modify |
126 | | -1. `DurableContext.java` - Add `parallel()` method |
127 | | -2. `DurableFuture.java` - Ensure compatibility with parallel results (likely no changes needed) |
| 91 | +```java |
| 92 | +var parallel = ctx.parallel("work"); |
| 93 | +var risky = parallel.branch("risky", String.class, branchCtx -> { |
| 94 | + throw new RuntimeException("failed"); |
| 95 | +}); |
| 96 | +var safe = parallel.branch("safe", String.class, branchCtx -> { |
| 97 | + return branchCtx.step("ok", String.class, stepCtx -> "done"); |
| 98 | +}); |
| 99 | + |
| 100 | +ParallelResult result = parallel.get(); |
| 101 | + |
| 102 | +String safeResult = safe.get(); // "done" |
| 103 | +try { |
| 104 | + risky.get(); // throws |
| 105 | +} catch (ParallelBranchFailedException e) { |
| 106 | + // Branch failed and the SDK could not reconstruct the original exception. |
| 107 | + // This happens when: the error info was not checkpointed, the exception |
| 108 | + // class is not on the classpath, or deserialization of the error data |
| 109 | + // failed. The original error type and message are in e.getMessage(). |
| 110 | +} |
| 111 | +``` |
128 | 112 |
|
129 | | -## Testing Strategy |
| 113 | +| Exception | When Thrown | |
| 114 | +|-----------|-------------| |
| 115 | +| `ParallelBranchFailedException` | Branch failed and the original exception could not be reconstructed | |
| 116 | +| User's exception | Branch threw a reconstructable exception, which is propagated through `get()` | |
130 | 117 |
|
131 | | -### Unit Tests |
132 | | -- `ParallelConfigTest` - Configuration validation |
133 | | -- `ParallelOperationTest` - Core execution logic with mocked child contexts |
| 118 | +### Checkpoint-and-Replay |
134 | 119 |
|
135 | | -### Integration Tests |
136 | | -- Success scenarios with various configurations |
137 | | -- Failure scenarios (exceeding thresholds) |
138 | | -- Concurrency limits |
139 | | -- Replay behavior |
| 120 | +Parallel operations are fully durable. On replay after interruption: |
140 | 121 |
|
141 | | -### Example Implementation |
142 | | -- `ParallelExample.java` in examples module |
143 | | -- Demonstrate common patterns and error handling |
| 122 | +- Completed branches return cached results without re-execution |
| 123 | +- Incomplete branches resume from their last checkpoint |
| 124 | +- Branches that never started execute fresh |
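
The replay rules above can be illustrated with a toy checkpoint store. This is a deliberately simplified sketch: `ReplaySketch` and its methods are hypothetical, results are plain strings, and only whole-branch caching is modeled. Resuming a partially completed branch would rely on the same idea applied to the steps inside that branch, which is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical illustration only; not part of the SDK.
public class ReplaySketch {
    private final Map<String, String> checkpoints; // results of completed branches, by name
    private int executions = 0;                    // how many branch bodies actually ran

    public ReplaySketch(Map<String, String> checkpoints) {
        this.checkpoints = new HashMap<>(checkpoints);
    }

    /** Returns the checkpointed result if present; otherwise runs the body and records it. */
    public String runBranch(String name, Supplier<String> body) {
        String cached = checkpoints.get(name);
        if (cached != null) {
            return cached; // completed earlier: no re-execution
        }
        executions++;
        String result = body.get();
        checkpoints.put(name, result);
        return result;
    }

    public int executions() {
        return executions;
    }

    /** Immutable snapshot of the checkpoints, e.g. to seed a later replay. */
    public Map<String, String> checkpoints() {
        return Map.copyOf(checkpoints);
    }
}
```

Seeding a second `ReplaySketch` with the checkpoints from a first run shows the intended effect: a branch that already completed returns its stored result without its body being invoked, while a branch that never started runs normally.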