Skip to content

Commit 73c482c

Browse files
authored
[refactor] split execute method (#132)
1 parent ea5c08e commit 73c482c

7 files changed

Lines changed: 198 additions & 145 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,23 @@ public OperationType getType() {
100100
return operationType;
101101
}
102102

103-
/** Starts the operation. Returns immediately after starting background work or checkpointing. Does not block. */
104-
public abstract void execute();
103+
/** Starts the operation, processes the operation updates from backend. Does not block. */
104+
public void execute() {
105+
var existing = getOperation();
106+
107+
if (existing != null) {
108+
validateReplay(existing);
109+
replay(existing);
110+
} else {
111+
start();
112+
}
113+
}
114+
115+
/** Starts the operation. */
116+
protected abstract void start();
117+
118+
/** Replays the operation. */
119+
protected abstract void replay(Operation existing);
105120

106121
/**
107122
* Gets the Operation from ExecutionManager and update the replay state from REPLAY to EXECUTE if operation is not

sdk/src/main/java/software/amazon/lambda/durable/operation/CallbackOperation.java

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package software.amazon.lambda.durable.operation;
44

55
import software.amazon.awssdk.services.lambda.model.CallbackOptions;
6+
import software.amazon.awssdk.services.lambda.model.Operation;
67
import software.amazon.awssdk.services.lambda.model.OperationAction;
78
import software.amazon.awssdk.services.lambda.model.OperationType;
89
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
@@ -34,44 +35,40 @@ public String callbackId() {
3435
return callbackId;
3536
}
3637

38+
/** Starts the operation. */
3739
@Override
38-
public void execute() {
39-
var existing = getOperation();
40+
protected void start() {
41+
// First execution: checkpoint and get callback ID
42+
var update = OperationUpdate.builder().action(OperationAction.START).callbackOptions(buildCallbackOptions());
4043

41-
if (existing != null) {
42-
validateReplay(existing);
43-
}
44-
45-
if (existing != null && existing.callbackDetails() != null) {
46-
// Replay: use existing callback ID
47-
callbackId = existing.callbackDetails().callbackId();
44+
sendOperationUpdate(update);
4845

49-
switch (existing.status()) {
50-
case SUCCEEDED, FAILED, TIMED_OUT -> {
51-
// Terminal state - complete the operation immediately
52-
markAlreadyCompleted();
53-
return;
54-
}
55-
case STARTED -> {
56-
// Still waiting - continue to polling
57-
}
58-
default ->
59-
terminateExecutionWithIllegalDurableOperationException(
60-
"Unexpected callback status: " + existing.status());
61-
}
62-
} else {
63-
// First execution: checkpoint and get callback ID
64-
var update =
65-
OperationUpdate.builder().action(OperationAction.START).callbackOptions(buildCallbackOptions());
46+
// Get the callback ID from the updated operation
47+
var op = getOperation();
48+
callbackId = op.callbackDetails().callbackId();
6649

67-
sendOperationUpdate(update);
50+
pollForOperationUpdates();
51+
}
6852

69-
// Get the callback ID from the updated operation
70-
var op = getOperation();
71-
callbackId = op.callbackDetails().callbackId();
53+
/** Replays the operation. */
54+
@Override
55+
protected void replay(Operation existing) {
56+
// Replay: use existing callback ID
57+
callbackId = existing.callbackDetails().callbackId();
58+
59+
switch (existing.status()) {
60+
case SUCCEEDED, FAILED, TIMED_OUT -> {
61+
// Terminal state - complete the operation immediately
62+
markAlreadyCompleted();
63+
return;
64+
}
65+
case STARTED -> {
66+
// Still waiting - continue to polling
67+
}
68+
default ->
69+
terminateExecutionWithIllegalDurableOperationException(
70+
"Unexpected callback status: " + existing.status());
7271
}
73-
74-
// Start polling for callback completion (delay first poll to allow suspension)
7572
pollForOperationUpdates();
7673
}
7774

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.function.Function;
1010
import software.amazon.awssdk.services.lambda.model.ContextOptions;
1111
import software.amazon.awssdk.services.lambda.model.ErrorObject;
12+
import software.amazon.awssdk.services.lambda.model.Operation;
1213
import software.amazon.awssdk.services.lambda.model.OperationAction;
1314
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1415
import software.amazon.awssdk.services.lambda.model.OperationType;
@@ -51,34 +52,34 @@ public ChildContextOperation(
5152
this.userExecutor = getContext().getDurableConfig().getExecutorService();
5253
}
5354

55+
/** Starts the operation. */
5456
@Override
55-
public void execute() {
56-
var existing = getOperation();
57-
58-
if (existing != null) {
59-
validateReplay(existing);
60-
switch (existing.status()) {
61-
case SUCCEEDED -> {
62-
if (existing.contextDetails() != null
63-
&& Boolean.TRUE.equals(existing.contextDetails().replayChildren())) {
64-
// Large result: re-execute child context to reconstruct result
65-
replayChildContext = true;
66-
executeChildContext();
67-
} else {
68-
markAlreadyCompleted();
69-
}
57+
protected void start() {
58+
// First execution: fire-and-forget START checkpoint, then run
59+
sendOperationUpdateAsync(
60+
OperationUpdate.builder().action(OperationAction.START).subType(RUN_IN_CHILD_CONTEXT.getValue()));
61+
executeChildContext();
62+
}
63+
64+
/** Replays the operation. */
65+
@Override
66+
protected void replay(Operation existing) {
67+
switch (existing.status()) {
68+
case SUCCEEDED -> {
69+
if (existing.contextDetails() != null
70+
&& Boolean.TRUE.equals(existing.contextDetails().replayChildren())) {
71+
// Large result: re-execute child context to reconstruct result
72+
replayChildContext = true;
73+
executeChildContext();
74+
} else {
75+
markAlreadyCompleted();
7076
}
71-
case FAILED -> markAlreadyCompleted();
72-
case STARTED -> executeChildContext();
73-
default ->
74-
terminateExecutionWithIllegalDurableOperationException(
75-
"Unexpected child context status: " + existing.status());
7677
}
77-
} else {
78-
// First execution: fire-and-forget START checkpoint, then run
79-
sendOperationUpdateAsync(
80-
OperationUpdate.builder().action(OperationAction.START).subType(RUN_IN_CHILD_CONTEXT.getValue()));
81-
executeChildContext();
78+
case FAILED -> markAlreadyCompleted();
79+
case STARTED -> executeChildContext();
80+
default ->
81+
terminateExecutionWithIllegalDurableOperationException(
82+
"Unexpected child context status: " + existing.status());
8283
}
8384
}
8485

sdk/src/main/java/software/amazon/lambda/durable/operation/InvokeOperation.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package software.amazon.lambda.durable.operation;
44

55
import software.amazon.awssdk.services.lambda.model.ChainedInvokeOptions;
6+
import software.amazon.awssdk.services.lambda.model.Operation;
67
import software.amazon.awssdk.services.lambda.model.OperationAction;
78
import software.amazon.awssdk.services.lambda.model.OperationType;
89
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
@@ -37,25 +38,23 @@ public InvokeOperation(
3738
this.payloadSerDes = config.payloadSerDes() != null ? config.payloadSerDes() : config.serDes();
3839
}
3940

40-
/** Starts the operation. Returns immediately after starting background work or checkpointing. Does not block. */
41+
/** Starts the operation. */
4142
@Override
42-
public void execute() {
43-
var existing = getOperation();
44-
if (existing == null) {
45-
// first execution
46-
startInvocation();
47-
pollForOperationUpdates();
48-
} else {
49-
validateReplay(existing);
50-
// replay
51-
switch (existing.status()) {
52-
// The result isn't ready. Need to wait more
53-
case STARTED -> pollForOperationUpdates();
54-
case SUCCEEDED, FAILED, TIMED_OUT, STOPPED -> markAlreadyCompleted();
55-
default ->
56-
terminateExecutionWithIllegalDurableOperationException(
57-
"Unexpected invoke status: " + existing.statusAsString());
58-
}
43+
protected void start() {
44+
startInvocation();
45+
pollForOperationUpdates();
46+
}
47+
48+
/** Replays the operation. */
49+
@Override
50+
protected void replay(Operation existing) {
51+
switch (existing.status()) {
52+
// The result isn't ready. Need to wait more
53+
case STARTED -> pollForOperationUpdates();
54+
case SUCCEEDED, FAILED, TIMED_OUT, STOPPED -> markAlreadyCompleted();
55+
default ->
56+
terminateExecutionWithIllegalDurableOperationException(
57+
"Unexpected invoke status: " + existing.statusAsString());
5958
}
6059
}
6160

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.concurrent.ExecutorService;
77
import java.util.function.Function;
88
import software.amazon.awssdk.services.lambda.model.ErrorObject;
9+
import software.amazon.awssdk.services.lambda.model.Operation;
910
import software.amazon.awssdk.services.lambda.model.OperationAction;
1011
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1112
import software.amazon.awssdk.services.lambda.model.OperationType;
@@ -45,39 +46,35 @@ public StepOperation(
4546
this.userExecutor = durableContext.getDurableConfig().getExecutorService();
4647
}
4748

49+
/** Starts the operation. */
4850
@Override
49-
public void execute() {
50-
var existing = getOperation();
51-
52-
if (existing != null) {
53-
validateReplay(existing);
54-
// This means we are in a replay scenario
55-
switch (existing.status()) {
56-
case SUCCEEDED, FAILED -> markAlreadyCompleted();
57-
case STARTED -> {
58-
var attempt = existing.stepDetails().attempt() != null
59-
? existing.stepDetails().attempt()
60-
: 0;
61-
if (config.semantics() == StepSemantics.AT_MOST_ONCE_PER_RETRY) {
62-
// AT_MOST_ONCE: treat as interrupted, go through retry logic
63-
handleStepFailure(new StepInterruptedException(existing), attempt + 1);
64-
} else {
65-
// AT_LEAST_ONCE: re-execute the step
66-
executeStepLogic(attempt);
67-
}
51+
protected void start() {
52+
executeStepLogic(0);
53+
}
54+
55+
/** Replays the operation. */
56+
@Override
57+
protected void replay(Operation existing) {
58+
switch (existing.status()) {
59+
case SUCCEEDED, FAILED -> markAlreadyCompleted();
60+
case STARTED -> {
61+
var attempt = existing.stepDetails().attempt() != null
62+
? existing.stepDetails().attempt()
63+
: 0;
64+
if (config.semantics() == StepSemantics.AT_MOST_ONCE_PER_RETRY) {
65+
// AT_MOST_ONCE: treat as interrupted, go through retry logic
66+
handleStepFailure(new StepInterruptedException(existing), attempt + 1);
67+
} else {
68+
// AT_LEAST_ONCE: re-execute the step
69+
executeStepLogic(attempt);
6870
}
69-
// Step is pending retry - Start polling for PENDING -> READY transition
70-
case PENDING ->
71-
pollReadyAndExecuteStepLogic(existing.stepDetails().attempt());
72-
// Execute with current attempt
73-
case READY -> executeStepLogic(existing.stepDetails().attempt());
74-
default ->
75-
terminateExecutionWithIllegalDurableOperationException(
76-
"Unexpected step status: " + existing.status());
7771
}
78-
} else {
79-
// First execution
80-
executeStepLogic(0);
72+
// Step is pending retry - Start polling for PENDING -> READY transition
73+
case PENDING -> pollReadyAndExecuteStepLogic(existing.stepDetails().attempt());
74+
// Execute with current attempt
75+
case READY -> executeStepLogic(existing.stepDetails().attempt());
76+
default ->
77+
terminateExecutionWithIllegalDurableOperationException("Unexpected step status: " + existing.status());
8178
}
8279
}
8380

sdk/src/main/java/software/amazon/lambda/durable/operation/WaitOperation.java

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.time.Instant;
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
9+
import software.amazon.awssdk.services.lambda.model.Operation;
910
import software.amazon.awssdk.services.lambda.model.OperationAction;
1011
import software.amazon.awssdk.services.lambda.model.OperationStatus;
1112
import software.amazon.awssdk.services.lambda.model.OperationType;
@@ -30,36 +31,40 @@ public WaitOperation(String operationId, String name, Duration duration, Durable
3031
this.duration = duration;
3132
}
3233

34+
/** Starts the operation. */
3335
@Override
34-
public void execute() {
35-
var existing = getOperation();
36+
protected void start() {
3637
Duration remainingWaitTime = duration;
3738

38-
if (existing != null) {
39-
validateReplay(existing);
40-
if (existing.status() == OperationStatus.SUCCEEDED) {
41-
// Wait already completed
42-
markAlreadyCompleted();
43-
return;
44-
}
45-
// Replay - calculate remaining time from scheduledEndTimestamp
46-
// TODO: if the checkpoint is slow remaining wait time might be off. Track
47-
// endTimestamp instead and move calculation in front of polling start.
48-
if (existing.waitDetails() != null && existing.waitDetails().scheduledEndTimestamp() != null) {
49-
remainingWaitTime =
50-
Duration.between(Instant.now(), existing.waitDetails().scheduledEndTimestamp());
51-
}
52-
} else {
53-
// First execution - checkpoint with full duration
54-
var update = OperationUpdate.builder()
55-
.action(OperationAction.START)
56-
.waitOptions(WaitOptions.builder()
57-
.waitSeconds((int) duration.toSeconds())
58-
.build());
39+
// First execution - checkpoint with full duration
40+
var update = OperationUpdate.builder()
41+
.action(OperationAction.START)
42+
.waitOptions(WaitOptions.builder()
43+
.waitSeconds((int) duration.toSeconds())
44+
.build());
5945

60-
sendOperationUpdate(update);
61-
}
46+
sendOperationUpdate(update);
47+
logger.debug("Remaining wait time: {} seconds", remainingWaitTime.getSeconds());
48+
pollForOperationUpdates(remainingWaitTime);
49+
}
6250

51+
/** Replays the operation. */
52+
@Override
53+
protected void replay(Operation existing) {
54+
Duration remainingWaitTime = duration;
55+
56+
if (existing.status() == OperationStatus.SUCCEEDED) {
57+
// Wait already completed
58+
markAlreadyCompleted();
59+
return;
60+
}
61+
// Replay - calculate remaining time from scheduledEndTimestamp
62+
// TODO: if the checkpoint is slow remaining wait time might be off. Track
63+
// endTimestamp instead and move calculation in front of polling start.
64+
if (existing.waitDetails() != null && existing.waitDetails().scheduledEndTimestamp() != null) {
65+
remainingWaitTime =
66+
Duration.between(Instant.now(), existing.waitDetails().scheduledEndTimestamp());
67+
}
6368
logger.debug("Remaining wait time: {} seconds", remainingWaitTime.getSeconds());
6469
pollForOperationUpdates(remainingWaitTime);
6570
}

0 commit comments

Comments
 (0)