Skip to content

Commit 91cb6ba

Browse files
committed
fix(sdk): Deprecate StepSemantics, add StepSemanticsPerRetry with correct AT_MOST_ONCE behaviour
1 parent d8fd15d commit 91cb6ba

5 files changed

Lines changed: 236 additions & 8 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/StepSemanticsIntegrationTest.java

Lines changed: 158 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.junit.jupiter.api.Test;
99
import software.amazon.lambda.durable.config.StepConfig;
1010
import software.amazon.lambda.durable.config.StepSemantics;
11+
import software.amazon.lambda.durable.config.StepSemanticsPerRetry;
1112
import software.amazon.lambda.durable.model.ExecutionStatus;
1213
import software.amazon.lambda.durable.retry.RetryStrategies;
1314
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
@@ -37,6 +38,29 @@ void testAtLeastOnceCompletesSuccessfully() {
3738
assertEquals(1, executionCount.get());
3839
}
3940

41+
@Test
42+
void testSemanticsPerRetry_atLeastOnceCompletesSuccessfully() {
43+
var executionCount = new AtomicInteger(0);
44+
45+
var runner = LocalDurableTestRunner.create(
46+
String.class,
47+
(input, ctx) -> ctx.step(
48+
"my-step",
49+
String.class,
50+
stepCtx -> {
51+
executionCount.incrementAndGet();
52+
return "result";
53+
},
54+
StepConfig.builder()
55+
.semanticsPerRetry(StepSemanticsPerRetry.AT_LEAST_ONCE_PER_RETRY)
56+
.build()));
57+
58+
var result = runner.run("test-input");
59+
60+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
61+
assertEquals(1, executionCount.get());
62+
}
63+
4064
@Test
4165
void testAtMostOnceCompletesSuccessfully() {
4266
var executionCount = new AtomicInteger(0);
@@ -60,6 +84,29 @@ void testAtMostOnceCompletesSuccessfully() {
6084
assertEquals(1, executionCount.get());
6185
}
6286

87+
@Test
88+
void testSemanticsPerRetry_atMostOnceCompletesSuccessfully() {
89+
var executionCount = new AtomicInteger(0);
90+
91+
var runner = LocalDurableTestRunner.create(
92+
String.class,
93+
(input, ctx) -> ctx.step(
94+
"my-step",
95+
String.class,
96+
stepCtx -> {
97+
executionCount.incrementAndGet();
98+
return "result";
99+
},
100+
StepConfig.builder()
101+
.semanticsPerRetry(StepSemanticsPerRetry.AT_MOST_ONCE_PER_RETRY)
102+
.build()));
103+
104+
var result = runner.run("test-input");
105+
106+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
107+
assertEquals(1, executionCount.get());
108+
}
109+
63110
@Test
64111
void testAtMostOnceNoRetryFailsImmediately() {
65112
var executionCount = new AtomicInteger(0);
@@ -84,6 +131,30 @@ void testAtMostOnceNoRetryFailsImmediately() {
84131
assertEquals(1, executionCount.get());
85132
}
86133

134+
@Test
135+
void testSemanticsPerRetry_atMostOnceNoRetryFailsImmediately() {
136+
var executionCount = new AtomicInteger(0);
137+
138+
var runner = LocalDurableTestRunner.create(
139+
String.class,
140+
(input, ctx) -> ctx.step(
141+
"my-step",
142+
String.class,
143+
stepCtx -> {
144+
executionCount.incrementAndGet();
145+
throw new RuntimeException("Always fails");
146+
},
147+
StepConfig.builder()
148+
.semanticsPerRetry(StepSemanticsPerRetry.AT_MOST_ONCE_PER_RETRY)
149+
.retryStrategy(RetryStrategies.Presets.NO_RETRY)
150+
.build()));
151+
152+
var result = runner.run("test-input");
153+
154+
assertEquals(ExecutionStatus.FAILED, result.getStatus());
155+
assertEquals(1, executionCount.get());
156+
}
157+
87158
@Test
88159
void testDefaultSemanticsIsAtLeastOnce() {
89160
var executionCount = new AtomicInteger(0);
@@ -129,6 +200,34 @@ void testAtLeastOnceReExecutesAfterCheckpointLoss() {
129200
assertEquals(2, executionCount.get());
130201
}
131202

203+
@Test
204+
void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointLoss() {
205+
var executionCount = new AtomicInteger(0);
206+
207+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
208+
return context.step(
209+
"step",
210+
String.class,
211+
stepCtx -> {
212+
var count = executionCount.incrementAndGet();
213+
return "Executed " + count + " times";
214+
},
215+
StepConfig.builder()
216+
.semanticsPerRetry(StepSemanticsPerRetry.AT_LEAST_ONCE_PER_RETRY)
217+
.build());
218+
});
219+
220+
runner.run("test");
221+
assertEquals(1, executionCount.get());
222+
223+
runner.simulateFireAndForgetCheckpointLoss("step");
224+
225+
var result = runner.run("test");
226+
227+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
228+
assertEquals(2, executionCount.get());
229+
}
230+
132231
@Test
133232
void testAtLeastOnceReExecutesAfterCheckpointFailure() {
134233
var executionCount = new AtomicInteger(0);
@@ -157,7 +256,37 @@ void testAtLeastOnceReExecutesAfterCheckpointFailure() {
157256
}
158257

159258
@Test
160-
void testAtMostOnceThrowsExceptionAfterCheckpointFailure() {
259+
void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointFailure() {
260+
var executionCount = new AtomicInteger(0);
261+
262+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
263+
return context.step(
264+
"step",
265+
String.class,
266+
stepCtx -> {
267+
var count = executionCount.incrementAndGet();
268+
return "Executed " + count + " times";
269+
},
270+
StepConfig.builder()
271+
.semanticsPerRetry(StepSemanticsPerRetry.AT_LEAST_ONCE_PER_RETRY)
272+
.build());
273+
});
274+
275+
runner.run("test");
276+
assertEquals(1, executionCount.get());
277+
278+
runner.resetCheckpointToStarted("step");
279+
var result = runner.runUntilComplete("test");
280+
281+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
282+
assertEquals(2, executionCount.get());
283+
}
284+
285+
// This behavior is incorrect (the step should retry after interruption), but is kept for backward
286+
// compatibility. The deprecated StepSemantics.AT_MOST_ONCE_PER_RETRY preserves this behavior.
287+
// Use StepSemanticsPerRetry.AT_MOST_ONCE_PER_RETRY for the corrected behavior (see below test).
288+
@Test
289+
void testAtMostOnceThrowsExceptionAfterCheckpointFailure_deprecatedBackwardCompatibility() {
161290
var executionCount = new AtomicInteger(0);
162291

163292
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
@@ -183,4 +312,32 @@ void testAtMostOnceThrowsExceptionAfterCheckpointFailure() {
183312
assertEquals(ExecutionStatus.FAILED, result.getStatus());
184313
assertEquals(1, executionCount.get());
185314
}
315+
316+
@Test
317+
void testSemanticsPerRetry_atMostOnceRetriesAfterInterruption() {
318+
var executionCount = new AtomicInteger(0);
319+
320+
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
321+
return context.step(
322+
"step",
323+
String.class,
324+
stepCtx -> {
325+
executionCount.incrementAndGet();
326+
return "result";
327+
},
328+
StepConfig.builder()
329+
.semanticsPerRetry(StepSemanticsPerRetry.AT_MOST_ONCE_PER_RETRY)
330+
.build());
331+
});
332+
333+
runner.run("test");
334+
assertEquals(1, executionCount.get());
335+
336+
runner.resetCheckpointToStarted("step");
337+
338+
var result = runner.runUntilComplete("test");
339+
340+
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
341+
assertEquals(2, executionCount.get());
342+
}
186343
}

sdk/src/main/java/software/amazon/lambda/durable/config/StepConfig.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
public class StepConfig {
1616
private final RetryStrategy retryStrategy;
1717
private final StepSemantics semantics;
18+
private final StepSemanticsPerRetry semanticsPerRetry;
1819
private final SerDes serDes;
1920

2021
private StepConfig(Builder builder) {
2122
this.retryStrategy = builder.retryStrategy;
2223
this.semantics = builder.semantics;
24+
this.semanticsPerRetry = builder.semanticsPerRetry;
2325
this.serDes = builder.serDes;
2426
}
2527

@@ -28,18 +30,28 @@ public RetryStrategy retryStrategy() {
2830
return retryStrategy != null ? retryStrategy : RetryStrategies.Presets.DEFAULT;
2931
}
3032

31-
/** Returns the delivery semantics for this step, defaults to AT_LEAST_ONCE_PER_RETRY if not specified. */
33+
/**
34+
* Returns the delivery semantics for this step, defaults to AT_LEAST_ONCE_PER_RETRY if not specified.
35+
*
36+
* @deprecated Use {@link #semanticsPerRetry()} instead.
37+
*/
38+
@Deprecated(forRemoval = true)
3239
public StepSemantics semantics() {
3340
return semantics != null ? semantics : StepSemantics.AT_LEAST_ONCE_PER_RETRY;
3441
}
3542

43+
/** Returns the delivery semantics per retry for this step, or null if not specified. */
44+
public StepSemanticsPerRetry semanticsPerRetry() {
45+
return semanticsPerRetry;
46+
}
47+
3648
/** Returns the custom serializer for this step, or null if not specified (uses default SerDes). */
3749
public SerDes serDes() {
3850
return serDes;
3951
}
4052

4153
public Builder toBuilder() {
42-
return new Builder(retryStrategy, semantics, serDes);
54+
return new Builder(retryStrategy, semantics, semanticsPerRetry, serDes);
4355
}
4456

4557
/**
@@ -48,18 +60,24 @@ public Builder toBuilder() {
4860
* @return a new Builder instance
4961
*/
5062
public static Builder builder() {
51-
return new Builder(null, null, null);
63+
return new Builder(null, null, null, null);
5264
}
5365

5466
/** Builder for creating StepConfig instances. */
5567
public static class Builder {
5668
private RetryStrategy retryStrategy;
5769
private StepSemantics semantics;
70+
private StepSemanticsPerRetry semanticsPerRetry;
5871
private SerDes serDes;
5972

60-
public Builder(RetryStrategy retryStrategy, StepSemantics semantics, SerDes serDes) {
73+
public Builder(
74+
RetryStrategy retryStrategy,
75+
StepSemantics semantics,
76+
StepSemanticsPerRetry semanticsPerRetry,
77+
SerDes serDes) {
6178
this.retryStrategy = retryStrategy;
6279
this.semantics = semantics;
80+
this.semanticsPerRetry = semanticsPerRetry;
6381
this.serDes = serDes;
6482
}
6583

@@ -79,12 +97,27 @@ public Builder retryStrategy(RetryStrategy retryStrategy) {
7997
*
8098
* @param semantics the delivery semantics to use, defaults to AT_LEAST_ONCE_PER_RETRY if not specified
8199
* @return this builder for method chaining
100+
* @deprecated Use {@link #semanticsPerRetry(StepSemanticsPerRetry)} instead.
82101
*/
102+
@Deprecated(forRemoval = true)
83103
public Builder semantics(StepSemantics semantics) {
84104
this.semantics = semantics;
85105
return this;
86106
}
87107

108+
/**
109+
* Sets the delivery semantics per retry for the step.
110+
*
111+
* <p>If set, this takes precedence over {@link #semantics(StepSemantics)}.
112+
*
113+
* @param semanticsPerRetry the delivery semantics to use
114+
* @return this builder for method chaining
115+
*/
116+
public Builder semanticsPerRetry(StepSemanticsPerRetry semanticsPerRetry) {
117+
this.semanticsPerRetry = semanticsPerRetry;
118+
return this;
119+
}
120+
88121
/**
89122
* Sets a custom serializer for the step.
90123
*

sdk/src/main/java/software/amazon/lambda/durable/config/StepSemantics.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66
* Delivery semantics for step operations.
77
*
88
* <p>Controls how the SDK handles step execution and interruption recovery.
9+
*
10+
* @deprecated Use {@link StepSemanticsPerRetry} instead. This enum will be removed in a future major version.
911
*/
12+
@Deprecated(forRemoval = true)
1013
public enum StepSemantics {
1114
/**
1215
* At-least-once delivery (default). The step may be re-executed if interrupted. START checkpoint is
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.config;
4+
5+
/**
6+
* Delivery semantics for step operations.
7+
*
8+
* <p>Controls how the SDK handles step execution and interruption recovery.
9+
*
10+
* @see StepConfig.Builder#semanticsPerRetry(StepSemanticsPerRetry)
11+
*/
12+
public enum StepSemanticsPerRetry {
13+
/**
14+
* At-least-once delivery (default). The step may be re-executed if interrupted. START checkpoint is
15+
* fire-and-forget.
16+
*/
17+
AT_LEAST_ONCE_PER_RETRY,
18+
19+
/**
20+
* At-most-once delivery per retry attempt. The step will not be re-executed if interrupted. START checkpoint is
21+
* awaited before user code runs. If interrupted, throws
22+
* {@link software.amazon.lambda.durable.exception.StepInterruptedException}.
23+
*/
24+
AT_MOST_ONCE_PER_RETRY
25+
}

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import software.amazon.lambda.durable.TypeToken;
1616
import software.amazon.lambda.durable.config.StepConfig;
1717
import software.amazon.lambda.durable.config.StepSemantics;
18+
import software.amazon.lambda.durable.config.StepSemanticsPerRetry;
1819
import software.amazon.lambda.durable.context.DurableContextImpl;
1920
import software.amazon.lambda.durable.exception.DurableOperationException;
2021
import software.amazon.lambda.durable.exception.StepFailedException;
@@ -66,7 +67,7 @@ protected void replay(Operation existing) {
6667
switch (existing.status()) {
6768
case SUCCEEDED, FAILED -> markAlreadyCompleted();
6869
case STARTED -> {
69-
if (config.semantics() == StepSemantics.AT_MOST_ONCE_PER_RETRY) {
70+
if (isAtMostOnce()) {
7071
// AT_MOST_ONCE: treat as interrupted, go through retry logic
7172
handleStepFailure(new StepInterruptedException(existing), attempt);
7273
} else {
@@ -128,7 +129,7 @@ private void checkpointStarted() {
128129
if (existing == null || existing.status() != OperationStatus.STARTED) {
129130
var startUpdate = OperationUpdate.builder().action(OperationAction.START);
130131

131-
if (config.semantics() == StepSemantics.AT_MOST_ONCE_PER_RETRY) {
132+
if (isAtMostOnce()) {
132133
// AT_MOST_ONCE: await START checkpoint before executing user code
133134
sendOperationUpdate(startUpdate);
134135
} else {
@@ -165,7 +166,8 @@ private void handleStepFailure(Throwable exception, int attempt) {
165166
errorObject = serializeException(exception);
166167
}
167168

168-
var isRetryable = !(exception instanceof StepInterruptedException);
169+
var isRetryable = !(exception instanceof StepInterruptedException)
170+
|| config.semanticsPerRetry() == StepSemanticsPerRetry.AT_MOST_ONCE_PER_RETRY;
169171
var retryDecision = config.retryStrategy().makeRetryDecision(exception, attempt);
170172

171173
if (isRetryable && retryDecision.shouldRetry()) {
@@ -217,4 +219,12 @@ public T get() {
217219
throw new StepFailedException(op);
218220
}
219221
}
222+
223+
@SuppressWarnings("deprecation")
224+
private boolean isAtMostOnce() {
225+
if (config.semanticsPerRetry() != null) {
226+
return config.semanticsPerRetry() == StepSemanticsPerRetry.AT_MOST_ONCE_PER_RETRY;
227+
}
228+
return config.semantics() == StepSemantics.AT_MOST_ONCE_PER_RETRY;
229+
}
220230
}

0 commit comments

Comments
 (0)