Commit 75a6da4

feat: simplify operation id
1 parent 49cf1e9 commit 75a6da4

15 files changed

Lines changed: 228 additions & 152 deletions

docs/design-run-in-child-context.md

Lines changed: 23 additions & 24 deletions
@@ -31,19 +31,21 @@ This aligns the Java SDK with the TypeScript and Python reference implementation
 A child context is a `CONTEXT` operation in the checkpoint log. Its lifecycle:

 1. **START** (fire-and-forget) — marks the child context as in-progress
-2. Inner operations checkpoint with `parentId` set to the child context's ID
+2. Inner operations checkpoint with `parentId` set to the child context's operation ID
 3. **SUCCEED** or **FAIL** (blocking) — finalizes the child context

 ```
 Op ID | Parent ID | Type    | Action  | Payload
 ------|-----------|---------|---------|--------
 3     | null      | CONTEXT | START   | —
-1     | 3         | STEP    | START   | —
-1     | 3         | STEP    | SUCCEED | "result"
+3-1   | 3         | STEP    | START   | —
+3-1   | 3         | STEP    | SUCCEED | "result"
 3     | null      | CONTEXT | SUCCEED | "final result"
 ```

-For nested child contexts, the `parentId` uses the qualified context path (e.g., `"3:2"` for a child context created as operation `"2"` inside parent context `"3"`).
+Inner operation IDs are prefixed with the parent context's operation ID using `-` as separator (e.g., `"3-1"`, `"3-2"`). This matches the JavaScript SDK's `stepPrefix` convention and ensures operation IDs are globally unique — the backend validates type consistency by operation ID, so bare sequential IDs inside child contexts would collide with root-level operations.
+
+For nested child contexts, the prefix chains naturally (e.g., `"3-2-1"` for the first operation inside a nested child context that is operation `"2"` inside parent context `"3"`).

 ### Replay behavior

@@ -54,28 +56,24 @@ For nested child contexts, the `parentId` uses the qualified context path (e.g.,
 | FAILED | Re-throw cached error |
 | STARTED | Re-execute (was interrupted mid-flight) |

-### Operation scoping
-
-Child contexts restart their operation counter at 1. To avoid ID collisions, `ExecutionManager` uses a composite key:
-
-```java
-record OperationKey(String parentId, String operationId) { ... }
-```
-
-This applies to the `operations` map, `openPhasers` map, and all checkpoint completion handlers. The backend's `ParentId` field on each `Operation` is the source of truth for scoping.
+### Operation ID prefixing

-### Nested context ID qualification
+To ensure global uniqueness, `DurableContext.nextOperationId()` prefixes operation IDs with the context's `parentId` when inside a child context:

-To prevent `OperationKey` collisions when sibling contexts at different nesting levels share the same local operation ID, child contexts build a globally unique `contextId` by qualifying with the parent's context path:
-
-- Root-level child contexts use just their operation ID (e.g., `"3"`)
-- Nested child contexts include the parent path (e.g., `"3:2"` for operation `"2"` inside parent context `"3"`)
+- Root context: IDs are `"1"`, `"2"`, `"3"` (no prefix)
+- Child context `"1"`: IDs are `"1-1"`, `"1-2"`, `"1-3"`
+- Nested child context `"1-2"`: IDs are `"1-2-1"`, `"1-2-2"`

 ```java
-var contextId = getParentId() != null ? getParentId() + ":" + getOperationId() : getOperationId();
+private String nextOperationId() {
+    var counter = String.valueOf(operationCounter.incrementAndGet());
+    return parentId != null ? parentId + "-" + counter : counter;
+}
 ```

-This qualified `contextId` is used as the `parentId` for all operations within the child context.
+This matches the JavaScript SDK's `_stepPrefix` mechanism. The backend validates type consistency by operation ID alone, so without prefixing, a CONTEXT operation with ID `"1"` and an inner STEP with ID `"1"` (different `parentId`) would trigger an `InvalidParameterValueException`.
+
+`ExecutionManager` still uses plain `String` keys (the globally unique operation ID) for its internal maps, since prefixed IDs are inherently unique across all contexts.

 ### Thread model

@@ -95,13 +93,14 @@ The `DurableContext` stores its context identity in a `parentId` field — `null

 | File | Change |
 |------|--------|
-| `ChildContextOperation` (new) | Extends `BaseDurableOperation<T>`. Manages child context lifecycle, thread coordination, large result handling. Builds qualified `contextId` for nested contexts (e.g., `"3:2"`). |
+| `ChildContextOperation` (new) | Extends `BaseDurableOperation<T>`. Manages child context lifecycle, thread coordination, large result handling. Uses `getOperationId()` directly as `contextId` (already globally unique via prefixed IDs). |
 | `ChildContextFailedException` (new) | Extends `DurableOperationException`. Wraps the `Operation` object; extracts error from `contextDetails()`. |
-| `DurableContext` | New `runInChildContext`/`runInChildContextAsync` methods. New `createChildContext` factory (skips thread registration). Stores `parentId` field (null for root, contextId for child). Per-context replay tracking via `isReplaying` field. |
+| `DurableContext` | New `runInChildContext`/`runInChildContextAsync` methods. New `createChildContext` factory (skips thread registration). Stores `parentId` field (null for root, contextId for child). Per-context replay tracking via `isReplaying` field. `nextOperationId()` prefixes with `parentId` for child contexts (e.g., `"1-1"`). |
 | `BaseDurableOperation` | New `parentId` constructor parameter. `sendOperationUpdateAsync` uses it instead of hardcoded `null`. Protected `getParentId()` getter. |
-| `ExecutionManager` | `OperationKey` record for composite keys. All maps (`operations`, `openPhasers`) use composite keys. `getOperationAndUpdateReplayState` accepts `parentId`. `startPhaser` takes `parentId` + `operationId`. New `hasOperationsForContext(parentId)` method for per-context replay initialization. |
-| `StepOperation` | Thread ID includes parent context: `(parentId != null ? parentId + ":" : "") + operationId + "-step"` |
+| `ExecutionManager` | All maps (`operations`, `openPhasers`) use plain `String` keys (globally unique operation IDs). `getOperationAndUpdateReplayState` and `startPhaser` take a single `operationId` argument. New `hasOperationsForContext(parentId)` method for per-context replay initialization. |
+| `StepOperation` | Thread ID uses `getOperationId() + "-step"` (operation IDs are globally unique via prefixing). |
 | `LocalMemoryExecutionClient` | Handles `CONTEXT` operations (was `throw UnsupportedOperationException`). Propagates `parentId` for all operation types. |
+| `HistoryEventProcessor` | Handles `CONTEXT_STARTED`, `CONTEXT_SUCCEEDED`, `CONTEXT_FAILED` events (was `throw UnsupportedOperationException`). Builds `ContextDetails` with result/error extraction. |

 ## Large result handling
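
The design doc's collision argument (the backend validates type consistency by operation ID alone) can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `TypeConsistencyModel` and `checkpoint` are hypothetical names, the real check lives in the backend, and `IllegalStateException` stands in for the `InvalidParameterValueException` the doc mentions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a backend that allows exactly one operation type per operation ID. */
final class TypeConsistencyModel {
    private final Map<String, String> typeById = new HashMap<>();

    /** Records an update; rejects it if the ID was already seen with a different type. */
    void checkpoint(String operationId, String type) {
        var previous = typeById.putIfAbsent(operationId, type);
        if (previous != null && !previous.equals(type)) {
            // Stand-in for the backend's InvalidParameterValueException.
            throw new IllegalStateException(
                    "Operation " + operationId + " is " + previous + ", not " + type);
        }
    }

    public static void main(String[] args) {
        var bare = new TypeConsistencyModel();
        bare.checkpoint("1", "CONTEXT");
        try {
            bare.checkpoint("1", "STEP"); // bare inner ID reuses "1"
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        var prefixed = new TypeConsistencyModel();
        prefixed.checkpoint("1", "CONTEXT");
        prefixed.checkpoint("1-1", "STEP"); // prefixed inner ID is distinct
        System.out.println("prefixed IDs accepted");
    }
}
```

In this model the `parentId` plays no part in the check, which is why the composite `(parentId, operationId)` key alone could not prevent the collision.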

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+AWSTemplateFormatVersion: "2010-09-09"
+Transform: AWS::Serverless-2016-10-31
+Description: AWS Lambda Durable Execution SDK - ChildContext Example
+
+Parameters:
+  Architecture:
+    Type: String
+    Default: arm64
+    Description: Lambda Function Architecture
+    AllowedValues:
+      - x86_64
+      - arm64
+  DockerFile:
+    Type: String
+    Default: examples/Dockerfile
+    Description: path to Dockerfile
+  FunctionNameSuffix:
+    Type: String
+    Default: ''
+    Description: Suffix to append to all function names
+
+Globals:
+  Function:
+    Timeout: 900
+    MemorySize: 512
+    Architectures:
+      - !Ref Architecture
+
+Resources:
+  ChildContextExampleFunction:
+    Type: AWS::Serverless::Function
+    Properties:
+      PackageType: Image
+      FunctionName: !Join
+        - ''
+        - - 'child-context-example'
+          - !Ref FunctionNameSuffix
+      ImageConfig:
+        Command: ["com.amazonaws.lambda.durable.examples.ChildContextExample::handleRequest"]
+      DurableConfig:
+        ExecutionTimeout: 300
+        RetentionPeriodInDays: 7
+      Policies:
+        - Statement:
+            - Effect: Allow
+              Action:
+                - lambda:CheckpointDurableExecutions
+                - lambda:GetDurableExecutionState
+              Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:child-context-example${FunctionNameSuffix}"
+    Metadata:
+      Dockerfile: !Ref DockerFile
+      DockerContext: ../
+      DockerTag: durable-examples
+
+Outputs:
+  ChildContextExampleFunction:
+    Description: Child Context Example Function ARN
+    Value: !GetAtt ChildContextExampleFunction.Arn
+
+  ChildContextExampleFunctionName:
+    Description: Child Context Example Function Name
+    Value: !Ref ChildContextExampleFunction

sdk-testing/src/main/java/com/amazonaws/lambda/durable/testing/HistoryEventProcessor.java

Lines changed: 45 additions & 2 deletions
@@ -9,6 +9,7 @@
 import java.util.List;
 import software.amazon.awssdk.services.lambda.model.CallbackDetails;
 import software.amazon.awssdk.services.lambda.model.ChainedInvokeDetails;
+import software.amazon.awssdk.services.lambda.model.ContextDetails;
 import software.amazon.awssdk.services.lambda.model.ErrorObject;
 import software.amazon.awssdk.services.lambda.model.Event;
 import software.amazon.awssdk.services.lambda.model.Operation;
@@ -142,8 +143,26 @@ public <O> TestResult<O> processEvents(List<Event> events, Class<O> outputType)
                     // Unknown event type - log and ignore gracefully
                 }

-                case CONTEXT_STARTED, CONTEXT_SUCCEEDED, CONTEXT_FAILED -> {
-                    throw new UnsupportedOperationException("Context operations currently not supported");
+                case CONTEXT_STARTED -> {
+                    if (operationId != null) {
+                        operations.putIfAbsent(
+                                operationId,
+                                createContextOperation(operationId, event.name(), OperationStatus.STARTED, event));
+                    }
+                }
+                case CONTEXT_SUCCEEDED -> {
+                    if (operationId != null) {
+                        operations.put(
+                                operationId,
+                                createContextOperation(operationId, event.name(), OperationStatus.SUCCEEDED, event));
+                    }
+                }
+                case CONTEXT_FAILED -> {
+                    if (operationId != null) {
+                        operations.put(
+                                operationId,
+                                createContextOperation(operationId, event.name(), OperationStatus.FAILED, event));
+                    }
                 }

                 case CHAINED_INVOKE_STARTED,
@@ -292,4 +311,28 @@ private Operation createInvokeOperation(String id, Event event) {
                 .chainedInvokeDetails(builder.build())
                 .build();
     }
+
+    private Operation createContextOperation(String id, String name, OperationStatus status, Event event) {
+        var builder = ContextDetails.builder();
+
+        if (event.contextSucceededDetails() != null) {
+            var details = event.contextSucceededDetails();
+            if (details.result() != null && details.result().payload() != null) {
+                builder.result(details.result().payload());
+            }
+        } else if (event.contextFailedDetails() != null) {
+            var details = event.contextFailedDetails();
+            if (details.error() != null && details.error().payload() != null) {
+                builder.error(details.error().payload());
+            }
+        }
+
+        return Operation.builder()
+                .id(id)
+                .name(name)
+                .status(status)
+                .type(OperationType.CONTEXT)
+                .contextDetails(builder.build())
+                .build();
+    }
 }
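
One plausible reading of the `putIfAbsent`/`put` split in the diff above: a start event should never overwrite a record that already exists for the same ID, while a terminal event should always win. The sketch below models that with plain strings. `ContextEventFold`, `apply`, and `status` are hypothetical names, and the rationale is an inference from the code, not something the commit states.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical reduction of context events to the latest status per operation ID. */
final class ContextEventFold {
    private final Map<String, String> statusById = new LinkedHashMap<>();

    void apply(String operationId, String eventType) {
        if (operationId == null) {
            return; // mirrors the null guard in the diff
        }
        switch (eventType) {
            // A start event only creates the entry; it never replaces an existing one.
            case "CONTEXT_STARTED" -> statusById.putIfAbsent(operationId, "STARTED");
            // Terminal events always overwrite whatever was recorded before.
            case "CONTEXT_SUCCEEDED" -> statusById.put(operationId, "SUCCEEDED");
            case "CONTEXT_FAILED" -> statusById.put(operationId, "FAILED");
            default -> {
                // Other event types are out of scope for this sketch.
            }
        }
    }

    String status(String operationId) {
        return statusById.get(operationId);
    }

    public static void main(String[] args) {
        var fold = new ContextEventFold();
        fold.apply("3", "CONTEXT_STARTED");
        fold.apply("3", "CONTEXT_SUCCEEDED");
        fold.apply("3", "CONTEXT_STARTED"); // a late or repeated start is ignored
        System.out.println(fold.status("3")); // SUCCEEDED
    }
}
```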

sdk/src/main/java/com/amazonaws/lambda/durable/DurableContext.java

Lines changed: 7 additions & 2 deletions
@@ -319,8 +319,13 @@ void setExecutionMode() {
         this.isReplaying = false;
     }

-    /** Get the next operationId (latest operationId + 1) */
+    /**
+     * Get the next operationId. For root contexts, returns sequential IDs like "1", "2", "3". For child contexts,
+     * prefixes with the parentId to ensure global uniqueness, e.g. "1-1", "1-2" for operations inside child context
+     * "1". This matches the JavaScript SDK's stepPrefix convention and prevents ID collisions in checkpoint batches.
+     */
     private String nextOperationId() {
-        return String.valueOf(operationCounter.incrementAndGet());
+        var counter = String.valueOf(operationCounter.incrementAndGet());
+        return parentId != null ? parentId + "-" + counter : counter;
     }
 }
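
How the prefix chains across nesting levels can be seen in a minimal standalone sketch. `IdScope` and `newChild` are hypothetical names that keep only the ID logic. The sketch assumes, as the design doc describes, that a child context's `parentId` is the (already prefixed) operation ID allocated for that child context.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for DurableContext that keeps only the ID-generation logic. */
final class IdScope {
    private final String parentId; // null for the root context
    private final AtomicInteger operationCounter = new AtomicInteger();

    IdScope(String parentId) {
        this.parentId = parentId;
    }

    /** Same shape as nextOperationId() in the diff above. */
    String nextOperationId() {
        var counter = String.valueOf(operationCounter.incrementAndGet());
        return parentId != null ? parentId + "-" + counter : counter;
    }

    /** Assumption: the child context's parentId is the operation ID allocated for it. */
    IdScope newChild() {
        return new IdScope(nextOperationId());
    }

    public static void main(String[] args) {
        var root = new IdScope(null);
        System.out.println(root.nextOperationId());   // 1
        var child = root.newChild();                  // child context takes ID 2
        System.out.println(child.nextOperationId());  // 2-1
        var nested = child.newChild();                // nested context takes ID 2-2
        System.out.println(nested.nextOperationId()); // 2-2-1
    }
}
```

Each scope restarts its own counter at 1, and uniqueness comes entirely from the inherited prefix.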

sdk/src/main/java/com/amazonaws/lambda/durable/execution/ExecutionManager.java

Lines changed: 23 additions & 25 deletions
@@ -37,22 +37,25 @@
  * <p>This is the single entry point for all execution coordination. Internal coordination (polling, checkpointing) uses
  * a dedicated SDK thread pool, while user-defined operations run on a customer-configured executor.
  *
+ * <p>Operations are keyed by their globally unique operation ID. Child context operations use prefixed IDs (e.g.,
+ * "1-1", "1-2") to avoid collisions with root-level operations.
+ *
  * @see InternalExecutor
  */
 public class ExecutionManager {

     private static final Logger logger = LoggerFactory.getLogger(ExecutionManager.class);

     // ===== Execution State =====
-    private final Map<OperationKey, Operation> operations;
+    private final Map<String, Operation> operations;
     private final String executionOperationId;
     private final String durableExecutionArn;
     private final AtomicReference<ExecutionMode> executionMode;

     // ===== Thread Coordination =====
     private final Map<String, ThreadType> activeThreads = Collections.synchronizedMap(new HashMap<>());
     private static final ThreadLocal<OperationContext> currentContext = new ThreadLocal<>();
-    private final Map<OperationKey, Phaser> openPhasers = Collections.synchronizedMap(new HashMap<>());
+    private final Map<String, Phaser> openPhasers = Collections.synchronizedMap(new HashMap<>());
     private final CompletableFuture<Void> executionExceptionFuture = new CompletableFuture<>();

     // ===== Checkpoint Batching =====
@@ -71,7 +74,7 @@ public ExecutionManager(
                 new CheckpointBatcher(config, durableExecutionArn, checkpointToken, this::onCheckpointComplete);

         this.operations = checkpointBatcher.fetchAllPages(initialExecutionState).stream()
-                .collect(Collectors.toConcurrentMap(OperationKey::fromOperation, op -> op));
+                .collect(Collectors.toConcurrentMap(Operation::id, op -> op));

         // Start in REPLAY mode if we have more than just the initial EXECUTION operation
         this.executionMode =
@@ -92,13 +95,13 @@ public boolean isReplaying() {
     /** Called by CheckpointManager when a checkpoint completes. Updates state and advances phasers. */
     private void onCheckpointComplete(List<Operation> newOperations) {
         // Update operation storage
-        newOperations.forEach(op -> operations.put(OperationKey.fromOperation(op), op));
+        newOperations.forEach(op -> operations.put(op.id(), op));

         // Advance phasers for completed operations
         for (Operation operation : newOperations) {
-            var key = OperationKey.fromOperation(operation);
-            if (openPhasers.containsKey(key) && isTerminalStatus(operation.status())) {
-                var phaser = openPhasers.get(key);
+            var id = operation.id();
+            if (openPhasers.containsKey(id) && isTerminalStatus(operation.status())) {
+                var phaser = openPhasers.get(id);

                 // Two-phase completion
                 logger.trace("Advancing phaser 0 -> 1: {}", phaser);
@@ -110,28 +113,24 @@ private void onCheckpointComplete(List<Operation> newOperations) {
     }

     /**
-     * Gets an operation by parentId and operationId, and updates replay state. Transitions from REPLAY to EXECUTION
-     * mode if the operation is not found or is not in a terminal state (still in progress).
+     * Gets an operation by its globally unique operationId, and updates replay state. Transitions from REPLAY to
+     * EXECUTION mode if the operation is not found or is not in a terminal state (still in progress).
      *
-     * @param parentId the parent context ID (null for root context operations)
-     * @param operationId the operation ID to get
+     * @param operationId the globally unique operation ID (e.g., "1" for root, "1-1" for child context)
      * @return the existing operation, or null if not found (first execution)
      */
-    public Operation getOperationAndUpdateReplayState(String parentId, String operationId) {
-        var key = OperationKey.of(parentId, operationId);
-        var existing = operations.get(key);
-        if (executionMode.get() == ExecutionMode.REPLAY) {
-            if (existing == null || !isTerminalStatus(existing.status())) {
-                if (executionMode.compareAndSet(ExecutionMode.REPLAY, ExecutionMode.EXECUTION)) {
-                    logger.debug("Transitioned to EXECUTION mode at operation '{}'", operationId);
-                }
+    public Operation getOperationAndUpdateReplayState(String operationId) {
+        var existing = operations.get(operationId);
+        if (executionMode.get() == ExecutionMode.REPLAY && (existing == null || !isTerminalStatus(existing.status()))) {
+            if (executionMode.compareAndSet(ExecutionMode.REPLAY, ExecutionMode.EXECUTION)) {
+                logger.debug("Transitioned to EXECUTION mode at operation '{}'", operationId);
             }
         }
         return existing;
     }

     public Operation getExecutionOperation() {
-        return operations.get(OperationKey.of(null, executionOperationId));
+        return operations.get(executionOperationId);
     }

     /**
@@ -142,7 +141,7 @@ public Operation getExecutionOperation() {
      * @return true if at least one operation exists with the given parentId
      */
     public boolean hasOperationsForContext(String parentId) {
-        return operations.keySet().stream().anyMatch(key -> Objects.equals(key.parentId(), parentId));
+        return operations.values().stream().anyMatch(op -> Objects.equals(op.parentId(), parentId));
     }

     // ===== Thread Coordination =====
@@ -202,11 +201,10 @@ public void deregisterActiveThreadAndUnsetCurrentContext(String threadId) {

     // ===== Phaser Management =====

-    public Phaser startPhaser(String parentId, String operationId) {
-        var key = OperationKey.of(parentId, operationId);
+    public Phaser startPhaser(String operationId) {
         var phaser = new Phaser(1);
-        openPhasers.put(key, phaser);
-        logger.trace("Started phaser for operation '{}' (parentId='{}')", operationId, parentId);
+        openPhasers.put(operationId, phaser);
+        logger.trace("Started phaser for operation '{}'", operationId);
         return phaser;
     }
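
The switch from composite keys to plain `String` keys can be sketched in isolation. Here `OperationIndex` and the nested `Op` record are hypothetical stand-ins for `ExecutionManager` and the SDK's `Operation` model, reduced to the two fields the diff relies on. Lookups go by the globally unique ID, while the per-context check scans values by `parentId`.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/** Hypothetical, reduced model of the String-keyed operations map. */
final class OperationIndex {
    /** Stand-in for the SDK's Operation: only the ID and the parent context ID. */
    record Op(String id, String parentId) {}

    private final Map<String, Op> operations;

    OperationIndex(List<Op> initial) {
        // Keyed by the operation ID alone; prefixed IDs make a composite key unnecessary.
        this.operations = initial.stream().collect(Collectors.toConcurrentMap(Op::id, op -> op));
    }

    Op get(String operationId) {
        return operations.get(operationId);
    }

    /** True if at least one stored operation has the given parentId (null means root). */
    boolean hasOperationsForContext(String parentId) {
        return operations.values().stream().anyMatch(op -> Objects.equals(op.parentId(), parentId));
    }

    public static void main(String[] args) {
        var index = new OperationIndex(List.of(new Op("1", null), new Op("1-1", "1")));
        System.out.println(index.hasOperationsForContext("1"));   // true
        System.out.println(index.hasOperationsForContext("1-1")); // false
    }
}
```

One property worth noting: the two-argument `Collectors.toConcurrentMap` throws `IllegalStateException` on duplicate keys, so an ID collision in the initial state would surface at construction time rather than silently overwriting an entry.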

sdk/src/main/java/com/amazonaws/lambda/durable/execution/OperationKey.java

Lines changed: 0 additions & 28 deletions
This file was deleted.
