Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import static org.junit.jupiter.api.Assertions.*;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.config.StepConfig;
Expand Down Expand Up @@ -157,7 +158,7 @@ void testAtLeastOnceReExecutesAfterCheckpointFailure() {
}

@Test
void testAtMostOnceThrowsExceptionAfterCheckpointFailure() {
void testAtMostOnceNoRetryFailsAfterCheckpointFailure() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
Expand All @@ -170,6 +171,7 @@ void testAtMostOnceThrowsExceptionAfterCheckpointFailure() {
},
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build());
});

Expand All @@ -183,4 +185,33 @@ void testAtMostOnceThrowsExceptionAfterCheckpointFailure() {
assertEquals(ExecutionStatus.FAILED, result.getStatus());
assertEquals(1, executionCount.get());
}

@Test
void testAtMostOnceRetriesAfterCheckpointFailure() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
return context.step(
"step",
String.class,
stepCtx -> {
var count = executionCount.incrementAndGet();
return "Executed " + count + " times";
},
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.retryStrategy(RetryStrategies.fixedDelay(3, Duration.ofSeconds(1)))
.build());
});

runner.run("test");
assertEquals(1, executionCount.get());

runner.resetCheckpointToStarted("step");
var result = runner.runUntilComplete("test");

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(2, executionCount.get());
assertEquals("Executed 2 times", result.getResult(String.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,9 @@ private void handleStepFailure(Throwable exception, int attempt) {
errorObject = serializeException(exception);
}

var isRetryable = !(exception instanceof StepInterruptedException);
var retryDecision = config.retryStrategy().makeRetryDecision(exception, attempt);

if (isRetryable && retryDecision.shouldRetry()) {
if (retryDecision.shouldRetry()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this change leading to retry and break the at-most-once semantic?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this keeps the at-most-once guarantee scoped to each retry attempt.

For a replayed STARTED at-most-once step, we still do not call executeStepLogic for that same attempt. The interrupted attempt is converted through the retry strategy into either RETRY/PENDING or FAIL. User code only runs again after the operation becomes READY, which is the next retry attempt.

That is what the two tests are intended to pin down: testAtMostOnceNoRetryFailsAfterCheckpointFailure proves the started attempt is not re-executed when retries are disabled, and testAtMostOnceRetriesAfterCheckpointFailure proves a configured retry creates exactly one next-attempt execution.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a breaking change to users who rely on the existing behavior.

// Send RETRY
var retryDelayInSeconds = Math.toIntExact(retryDecision.delay().toSeconds());
var retryUpdate = OperationUpdate.builder()
Expand Down
Loading