Skip to content

Commit 66c7047

Browse files
committed
move parentId to base class
1 parent fd4c0be commit 66c7047

7 files changed

Lines changed: 15 additions & 19 deletions

File tree

sdk/src/main/java/com/amazonaws/lambda/durable/DurableExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ public static <I, O> DurableExecutionOutput execute(
7878
},
7979
config.getExecutorService()); // Get executor from config for running user code
8080

81-
// Get suspend future from ExecutionManager. If this future completes, it
82-
// indicates that no threads are active and we can safely suspend.
81+
// Execute the handlerFuture in ExecutionManager. If it completes successfully, the output of user function
82+
// will be returned. Otherwise, it will complete exceptionally with a SuspendExecutionException or a failure.
8383
return executionManager
8484
.execute(handlerFuture)
8585
.handle((result, ex) -> {

sdk/src/main/java/com/amazonaws/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,12 @@ protected void sendOperationUpdate(OperationUpdate.Builder builder) {
218218
}
219219

220220
protected CompletableFuture<Void> sendOperationUpdateAsync(OperationUpdate.Builder builder) {
221-
return executionManager.sendOperationUpdate(
222-
builder.id(operationId).name(name).type(operationType).build());
221+
// todo: add parentId when we support operations in child context
222+
return executionManager.sendOperationUpdate(builder.id(operationId)
223+
.name(name)
224+
.type(operationType)
225+
.parentId(null)
226+
.build());
223227
}
224228

225229
// serialization/deserialization utilities

sdk/src/main/java/com/amazonaws/lambda/durable/operation/CallbackOperation.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,8 @@ public void execute() {
6767
}
6868
} else {
6969
// First execution: checkpoint and get callback ID
70-
var update = OperationUpdate.builder()
71-
.parentId(null)
72-
.action(OperationAction.START)
73-
.callbackOptions(buildCallbackOptions());
70+
var update =
71+
OperationUpdate.builder().action(OperationAction.START).callbackOptions(buildCallbackOptions());
7472

7573
sendOperationUpdate(update);
7674

sdk/src/main/java/com/amazonaws/lambda/durable/operation/InvokeOperation.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ private void waitTimeout() {
7474

7575
private void startInvocation() {
7676
var update = OperationUpdate.builder()
77-
.parentId(null)
7877
.action(OperationAction.START)
7978
.chainedInvokeOptions(ChainedInvokeOptions.builder()
8079
.functionName(functionName)

sdk/src/main/java/com/amazonaws/lambda/durable/operation/StepOperation.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ private void executeStepLogic(int attempt) {
124124
// Check if we need to send START
125125
var existing = getOperation();
126126
if (existing == null || existing.status() != OperationStatus.STARTED) {
127-
var startUpdate = OperationUpdate.builder().parentId(null).action(OperationAction.START);
127+
var startUpdate = OperationUpdate.builder().action(OperationAction.START);
128128

129129
if (config.semantics() == StepSemantics.AT_MOST_ONCE_PER_RETRY) {
130130
// AT_MOST_ONCE: await START checkpoint before executing user code
@@ -140,7 +140,6 @@ private void executeStepLogic(int attempt) {
140140

141141
// Send SUCCEED
142142
var successUpdate = OperationUpdate.builder()
143-
.parentId(null)
144143
.action(OperationAction.SUCCEED)
145144
.payload(serializeResult(result));
146145
sendOperationUpdate(successUpdate);
@@ -180,7 +179,6 @@ private void handleStepFailure(Throwable exception, int attempt) {
180179
if (isRetryable && retryDecision.shouldRetry()) {
181180
// Send RETRY
182181
var retryUpdate = OperationUpdate.builder()
183-
.parentId(null)
184182
.action(OperationAction.RETRY)
185183
.error(errorObject)
186184
.stepOptions(StepOptions.builder()
@@ -199,10 +197,8 @@ private void handleStepFailure(Throwable exception, int attempt) {
199197
pollUntilReady(pendingFuture, nextAttemptTime, Duration.ofMillis(200));
200198
} else {
201199
// Send FAIL - retries exhausted
202-
var failUpdate = OperationUpdate.builder()
203-
.parentId(null)
204-
.action(OperationAction.FAIL)
205-
.error(errorObject);
200+
var failUpdate =
201+
OperationUpdate.builder().action(OperationAction.FAIL).error(errorObject);
206202
sendOperationUpdate(failUpdate);
207203
}
208204
}

sdk/src/main/java/com/amazonaws/lambda/durable/operation/WaitOperation.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public void execute() {
5050
} else {
5151
// First execution - checkpoint with full duration
5252
var update = OperationUpdate.builder()
53-
.parentId(null)
5453
.action(OperationAction.START)
5554
.waitOptions(WaitOptions.builder()
5655
.waitSeconds((int) duration.toSeconds())

sdk/src/test/java/com/amazonaws/lambda/durable/operation/BaseDurableOperationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public String get() {
120120
@Test
121121
void waitForOperationCompletionWhenRunningAndReadyToComplete() {
122122
Phaser phaser = new Phaser(0);
123-
OperationContext context = new OperationContext("step", ThreadType.STEP);
123+
OperationContext context = new OperationContext("step", ThreadType.CONTEXT);
124124
ExecutionManager executionManager = mock(ExecutionManager.class);
125125
when(executionManager.getCurrentContext()).thenReturn(context);
126126
when(executionManager.startPhaser(OPERATION_ID)).thenReturn(phaser);
@@ -151,7 +151,7 @@ public String get() {
151151
void waitForOperationCompletionWhenAlreadyCompleted() {
152152
Phaser phaser = new Phaser(1);
153153
phaser.arrive(); // completed
154-
OperationContext context = new OperationContext("step", ThreadType.STEP);
154+
OperationContext context = new OperationContext("step", ThreadType.CONTEXT);
155155
ExecutionManager executionManager = mock(ExecutionManager.class);
156156
when(executionManager.getCurrentContext()).thenReturn(context);
157157
when(executionManager.startPhaser(OPERATION_ID)).thenReturn(phaser);

0 commit comments

Comments
 (0)