Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ void testStepInterruptedExceptionForAtMostOnceAfterCheckpointLoss() {
return "result";
},
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build());
});

Expand Down Expand Up @@ -199,7 +200,8 @@ void testStepInterruptedExceptionCanBeCaughtForRecovery() {
return "payment-success";
},
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build());
} catch (StepInterruptedException e) {
// Recovery: check external status and return verified result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
Expand All @@ -18,29 +18,6 @@ class StepSemanticsIntegrationTest {
void testAtLeastOnceCompletesSuccessfully() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(
String.class,
(input, ctx) -> ctx.step(
"my-step",
String.class,
stepCtx -> {
executionCount.incrementAndGet();
return "result";
},
StepConfig.builder()
.semantics(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build()));

var result = runner.run("test-input");

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(1, executionCount.get());
}

@Test
void testSemanticsPerRetry_atLeastOnceCompletesSuccessfully() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(
String.class,
(input, ctx) -> ctx.step(
Expand All @@ -64,29 +41,6 @@ void testSemanticsPerRetry_atLeastOnceCompletesSuccessfully() {
void testAtMostOnceCompletesSuccessfully() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(
String.class,
(input, ctx) -> ctx.step(
"my-step",
String.class,
stepCtx -> {
executionCount.incrementAndGet();
return "result";
},
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.build()));

var result = runner.run("test-input");

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(1, executionCount.get());
}

@Test
void testSemanticsPerRetry_atMostOnceCompletesSuccessfully() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(
String.class,
(input, ctx) -> ctx.step(
Expand All @@ -110,30 +64,6 @@ void testSemanticsPerRetry_atMostOnceCompletesSuccessfully() {
void testAtMostOnceNoRetryFailsImmediately() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(
String.class,
(input, ctx) -> ctx.step(
"my-step",
String.class,
stepCtx -> {
executionCount.incrementAndGet();
throw new RuntimeException("Always fails");
},
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
.build()));

var result = runner.run("test-input");

assertEquals(ExecutionStatus.FAILED, result.getStatus());
assertEquals(1, executionCount.get());
}

@Test
void testSemanticsPerRetry_atMostOnceNoRetryFailsImmediately() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(
String.class,
(input, ctx) -> ctx.step(
Expand All @@ -155,7 +85,7 @@ void testSemanticsPerRetry_atMostOnceNoRetryFailsImmediately() {
}

@Test
void testDefaultSemanticsIsAtLeastOnce() {
void testDefaultSemanticsPerRetryIsAtLeastOnce() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(
Expand All @@ -175,46 +105,18 @@ void testDefaultSemanticsIsAtLeastOnce() {
void testAtLeastOnceReExecutesAfterCheckpointLoss() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
return context.step(
"step",
String.class,
stepCtx -> {
var count = executionCount.incrementAndGet();
return "Executed " + count + " times";
},
StepConfig.builder()
.semantics(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build());
});

runner.run("test");
assertEquals(1, executionCount.get());

runner.simulateFireAndForgetCheckpointLoss("step");

var result = runner.run("test");

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(2, executionCount.get());
}

@Test
void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointLoss() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
return context.step(
"step",
String.class,
stepCtx -> {
var count = executionCount.incrementAndGet();
return "Executed " + count + " times";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build());
});
var runner = LocalDurableTestRunner.create(
String.class,
(input, context) -> context.step(
"step",
String.class,
stepCtx -> {
var count = executionCount.incrementAndGet();
return "Executed " + count + " times";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build()));

runner.run("test");
assertEquals(1, executionCount.get());
Expand All @@ -231,45 +133,18 @@ void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointLoss() {
void testAtLeastOnceReExecutesAfterCheckpointFailure() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
return context.step(
"step",
String.class,
stepCtx -> {
var count = executionCount.incrementAndGet();
return "Executed " + count + " times";
},
StepConfig.builder()
.semantics(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build());
});

runner.run("test");
assertEquals(1, executionCount.get());

runner.resetCheckpointToStarted("step");
var result = runner.runUntilComplete("test");

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
assertEquals(2, executionCount.get());
}

@Test
void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointFailure() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
return context.step(
"step",
String.class,
stepCtx -> {
var count = executionCount.incrementAndGet();
return "Executed " + count + " times";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build());
});
var runner = LocalDurableTestRunner.create(
String.class,
(input, context) -> context.step(
"step",
String.class,
stepCtx -> {
var count = executionCount.incrementAndGet();
return "Executed " + count + " times";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build()));

runner.run("test");
assertEquals(1, executionCount.get());
Expand All @@ -281,53 +156,22 @@ void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointFailure() {
assertEquals(2, executionCount.get());
}

// This behavior is incorrect (the step should retry after interruption), but is kept for backward
// compatibility. The deprecated StepConfig.semantics() method preserves this behavior.
// Use StepConfig.semanticsPerRetry() for the corrected behavior (see below test).
@Test
void testAtMostOnceThrowsExceptionAfterCheckpointFailure_deprecatedBackwardCompatibility() {
void testAtMostOnceRetriesAfterInterruption() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
return context.step(
"step",
String.class,
stepCtx -> {
executionCount.incrementAndGet();
return "Should not re-execute";
},
StepConfig.builder()
.semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.build());
});

runner.run("test");
assertEquals(1, executionCount.get());

runner.resetCheckpointToStarted("step");

var result = runner.run("test");

assertEquals(ExecutionStatus.FAILED, result.getStatus());
assertEquals(1, executionCount.get());
}

@Test
void testSemanticsPerRetry_atMostOnceRetriesAfterInterruption() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
return context.step(
"step",
String.class,
stepCtx -> {
executionCount.incrementAndGet();
return "result";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.build());
});
var runner = LocalDurableTestRunner.create(
String.class,
(input, context) -> context.step(
"step",
String.class,
stepCtx -> {
executionCount.incrementAndGet();
return "result";
},
StepConfig.builder()
.semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
.build()));

runner.run("test");
assertEquals(1, executionCount.get());
Expand Down
Loading
Loading