Skip to content

Commit f3aa4ed

Browse files
authored
[bugfix]: fix polling (#302)
* fix polling * fix exception handling
1 parent 0454942 commit f3aa4ed

3 files changed

Lines changed: 24 additions & 7 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/execution/ExecutionManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,14 @@ public void suspendExecution() {
293293
throw ex;
294294
}
295295

296+
/**
297+
* returns {@code true} if the execution is terminated exceptionally (with a {@link SuspendExecutionException} or an
298+
* unrecoverable error).
299+
*/
300+
public boolean isExecutionCompletedExceptionally() {
301+
return executionExceptionFuture.isCompletedExceptionally();
302+
}
303+
296304
private void stopAllOperations(Exception cause) {
297305
registeredOperations.values().forEach(op -> op.getCompletionFuture().completeExceptionally(cause));
298306
}

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,15 @@ protected void runUserHandler(Runnable runnable, String contextId, ThreadType th
229229
executionManager.setCurrentThreadContext(new ThreadContext(contextId, threadType));
230230
try {
231231
runnable.run();
232+
} catch (Throwable throwable) {
233+
// Operations always wrap the user's function and handles all possible exceptions except for
234+
// SuspendExecutionException.
235+
if (!executionManager.isExecutionCompletedExceptionally()
236+
&& !(throwable instanceof SuspendExecutionException)) {
237+
logger.error("An unhandled exception is thrown from user function: ", throwable);
238+
throw terminateExecutionWithIllegalDurableOperationException(
239+
"An unhandled exception is thrown from user function: " + throwable);
240+
}
232241
} finally {
233242
if (contextId != null) {
234243
try {

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33
package software.amazon.lambda.durable.operation;
44

5+
import java.time.Instant;
56
import java.util.concurrent.CompletableFuture;
67
import java.util.function.Function;
78
import software.amazon.awssdk.services.lambda.model.ErrorObject;
@@ -74,7 +75,7 @@ protected void replay(Operation existing) {
7475
}
7576
}
7677
// Step is pending retry - Start polling for PENDING -> READY transition
77-
case PENDING -> pollReadyAndExecuteStepLogic(existing, attempt);
78+
case PENDING -> pollReadyAndExecuteStepLogic(existing.stepDetails().nextAttemptTimestamp(), attempt);
7879
// Execute with current attempt
7980
case READY -> executeStepLogic(attempt);
8081
default ->
@@ -83,9 +84,8 @@ protected void replay(Operation existing) {
8384
}
8485
}
8586

86-
private CompletableFuture<Void> pollReadyAndExecuteStepLogic(Operation existing, int attempt) {
87-
var nextAttemptInstant = existing.stepDetails().nextAttemptTimestamp();
88-
return pollForOperationUpdates(nextAttemptInstant)
87+
private void pollReadyAndExecuteStepLogic(Instant nextAttemptInstant, int attempt) {
88+
pollForOperationUpdates(nextAttemptInstant)
8989
.thenCompose(op -> op.status() == OperationStatus.READY
9090
? CompletableFuture.completedFuture(op)
9191
: pollForOperationUpdates(nextAttemptInstant))
@@ -163,19 +163,19 @@ private void handleStepFailure(Throwable exception, int attempt) {
163163

164164
if (isRetryable && retryDecision.shouldRetry()) {
165165
// Send RETRY
166+
var retryDelayInSeconds = Math.toIntExact(retryDecision.delay().toSeconds());
166167
var retryUpdate = OperationUpdate.builder()
167168
.action(OperationAction.RETRY)
168169
.error(errorObject)
169170
.stepOptions(StepOptions.builder()
170171
// RetryDecisions always produce integer number of seconds greater or equals to
171172
// 1 (no sub-second numbers)
172-
.nextAttemptDelaySeconds(
173-
Math.toIntExact(retryDecision.delay().toSeconds()))
173+
.nextAttemptDelaySeconds(retryDelayInSeconds)
174174
.build());
175175
sendOperationUpdate(retryUpdate);
176176

177177
// Poll for READY status and then execute the step again
178-
pollReadyAndExecuteStepLogic(getOperation(), attempt + 1);
178+
pollReadyAndExecuteStepLogic(Instant.now().plusSeconds(retryDelayInSeconds), attempt + 1);
179179
} else {
180180
// Send FAIL - retries exhausted
181181
var failUpdate =

0 commit comments

Comments
 (0)