Skip to content

Commit 03269fc

Browse files
authored
Add child context to concurrency operation (#224)
1 parent 993bc55 commit 03269fc

3 files changed

Lines changed: 39 additions & 1 deletion

File tree

sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public abstract class ConcurrencyOperation<T> extends BaseDurableOperation<T> {
5757
private final Set<String> completedOperations = Collections.synchronizedSet(new HashSet<String>());
5858
private ConcurrencyCompletionStatus completionStatus;
5959
private OperationIdGenerator operationIdGenerator;
60+
private final DurableContextImpl rootContext;
6061

6162
protected ConcurrencyOperation(
6263
OperationIdentifier operationIdentifier,
@@ -73,6 +74,7 @@ protected ConcurrencyOperation(
7374
this.toleratedFailureCount = toleratedFailureCount;
7475
this.failureRateThreshold = failureRateThreshold;
7576
this.operationIdGenerator = new OperationIdGenerator(getOperationId());
77+
this.rootContext = durableContext.createChildContext(getOperationId(), getName());
7678
}
7779

7880
protected ConcurrencyOperation(
@@ -142,7 +144,7 @@ public <R> ChildContextOperation<R> addItem(
142144
String name, Function<DurableContext, R> function, TypeToken<R> resultType, SerDes serDes) {
143145
if (isOperationCompleted()) throw new IllegalStateException("Cannot add items to a completed operation");
144146
var operationId = this.operationIdGenerator.nextOperationId();
145-
var childOp = createItem(operationId, name, function, resultType, serDes, getContext());
147+
var childOp = createItem(operationId, name, function, resultType, serDes, this.rootContext);
146148
childOperations.add(childOp);
147149
pendingQueue.add(childOp);
148150
logger.debug("Item added {}", name);

sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class ConcurrencyOperationTest {
3838
private static final TypeToken<Void> RESULT_TYPE = TypeToken.get(Void.class);
3939

4040
private DurableContextImpl durableContext;
41+
private DurableContextImpl childContext;
4142
private ExecutionManager executionManager;
4243
private AtomicInteger operationIdCounter;
4344
private OperationIdGenerator mockIdGenerator;
@@ -48,11 +49,20 @@ void setUp() {
4849
executionManager = mock(ExecutionManager.class);
4950
operationIdCounter = new AtomicInteger(0);
5051

52+
var childContext = mock(DurableContextImpl.class);
53+
this.childContext = childContext;
54+
when(childContext.getExecutionManager()).thenReturn(executionManager);
55+
when(childContext.getDurableConfig())
56+
.thenReturn(DurableConfig.builder()
57+
.withExecutorService(Executors.newCachedThreadPool())
58+
.build());
59+
5160
when(durableContext.getExecutionManager()).thenReturn(executionManager);
5261
when(durableContext.getDurableConfig())
5362
.thenReturn(DurableConfig.builder()
5463
.withExecutorService(Executors.newCachedThreadPool())
5564
.build());
65+
when(durableContext.createChildContext(anyString(), anyString())).thenReturn(childContext);
5666
when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("Root", ThreadType.CONTEXT));
5767
mockIdGenerator = mock(OperationIdGenerator.class);
5868
when(mockIdGenerator.nextOperationId()).thenAnswer(inv -> "child-" + operationIdCounter.incrementAndGet());
@@ -167,6 +177,18 @@ void singleChildAlreadySucceeds_fullCycle() throws Exception {
167177
assertFalse(functionCalled.get(), "Function should not be called during SUCCEEDED replay");
168178
}
169179

180+
@Test
181+
void addItem_usesRootChildContextAsParent() throws Exception {
182+
var op = createOperation(-1, -1, 0);
183+
184+
op.addItem("branch-1", ctx -> "result", TypeToken.get(String.class), SER_DES);
185+
186+
// rootContext is created via durableContext.createChildContext(...) in the constructor,
187+
// so the parentContext passed to createItem must be that child context, not durableContext itself
188+
assertNotSame(durableContext, op.getLastParentContext());
189+
assertSame(childContext, op.getLastParentContext());
190+
}
191+
170192
// ===== Helpers =====
171193

172194
private void runJoin(TestConcurrencyOperation op) throws InterruptedException {
@@ -182,6 +204,7 @@ static class TestConcurrencyOperation extends ConcurrencyOperation<Void> {
182204
private boolean successHandled = false;
183205
private boolean failureHandled = false;
184206
private final AtomicInteger executingCount = new AtomicInteger(0);
207+
private DurableContextImpl lastParentContext;
185208

186209
TestConcurrencyOperation(
187210
OperationIdentifier operationIdentifier,
@@ -209,6 +232,7 @@ protected <R> ChildContextOperation<R> createItem(
209232
TypeToken<R> resultType,
210233
SerDes serDes,
211234
DurableContextImpl parentContext) {
235+
lastParentContext = parentContext;
212236
return new ChildContextOperation<R>(
213237
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL_BRANCH),
214238
function,
@@ -260,5 +284,9 @@ boolean isSuccessHandled() {
260284
boolean isFailureHandled() {
261285
return failureHandled;
262286
}
287+
288+
DurableContextImpl getLastParentContext() {
289+
return lastParentContext;
290+
}
263291
}
264292
}

sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,19 @@ void setUp() {
4747
executionManager = mock(ExecutionManager.class);
4848
operationIdCounter = new AtomicInteger(0);
4949

50+
var childContext = mock(DurableContextImpl.class);
51+
when(childContext.getExecutionManager()).thenReturn(executionManager);
52+
when(childContext.getDurableConfig())
53+
.thenReturn(DurableConfig.builder()
54+
.withExecutorService(Executors.newCachedThreadPool())
55+
.build());
56+
5057
when(durableContext.getExecutionManager()).thenReturn(executionManager);
5158
when(durableContext.getDurableConfig())
5259
.thenReturn(DurableConfig.builder()
5360
.withExecutorService(Executors.newCachedThreadPool())
5461
.build());
62+
when(durableContext.createChildContext(anyString(), anyString())).thenReturn(childContext);
5563
when(executionManager.getCurrentThreadContext()).thenReturn(new ThreadContext("Root", ThreadType.CONTEXT));
5664
// Default: no existing operations (fresh execution)
5765
mockIdGenerator = mock(OperationIdGenerator.class);

0 commit comments

Comments
 (0)