
Commit ed2944b

fix(sdk): Deprecate StepConfig.semantics, add StepConfig.semanticsPerRetry with correct AT_MOST_ONCE behaviour
1 parent d8fd15d commit ed2944b

9 files changed

Lines changed: 255 additions & 22 deletions

docs/advanced/error-handling.md

Lines changed: 2 additions & 2 deletions
@@ -18,7 +18,7 @@ RuntimeException
 └── DurableOperationException - General operation exception
     ├── StepException - General Step exception
     │   ├── StepFailedException - Step exhausted all retry attempts. Catch to implement fallback logic or let execution fail.
-    │   └── StepInterruptedException - `AT_MOST_ONCE` step was interrupted before completion. Implement manual recovery (check if operation completed externally)
+    │   └── StepInterruptedException - `AT_MOST_ONCE` step was interrupted before completion and retries exhausted. Implement manual recovery (check if operation completed externally)
     ├── InvokeException - General chained invocation exception
     │   ├── InvokeFailedException - Chained invocation failed. Handle the error or propagate failure.
     │   ├── InvokeTimedOutException - Chained invocation timed out. Handle the error or propagate failure.
@@ -38,7 +38,7 @@ try {
     var result = ctx.step("charge-payment", Payment.class,
         stepCtx -> paymentService.charge(amount),
         StepConfig.builder()
-            .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
+            .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
             .build());
 } catch (StepInterruptedException e) {
     // Step started but we don't know if it completed

docs/core/steps.md

Lines changed: 6 additions & 6 deletions
@@ -11,7 +11,7 @@ var result = ctx.step("call-api", Response.class,
     stepCtx -> externalApi.call(request),
     StepConfig.builder()
         .retryStrategy(...)
-        .semantics(...)
+        .semanticsPerRetry(...)
         .build());
 ```

@@ -42,7 +42,7 @@ Configure step behavior with `StepConfig`:
 ctx.step("my-step", Result.class, stepCtx -> doWork(),
     StepConfig.builder()
         .retryStrategy(...) // How to handle failures
-        .semantics(...) // At-least-once vs at-most-once
+        .semanticsPerRetry(...) // At-least-once vs at-most-once
         .serDes(...) // Custom serialization
         .build());
 ```
@@ -73,7 +73,7 @@ Control how steps behave when interrupted mid-execution:
 | Semantic | Behavior | Use Case |
 |----------|----------|----------|
 | `AT_LEAST_ONCE_PER_RETRY` (default) | Re-executes step if interrupted before completion | Idempotent operations (database upserts, API calls with idempotency keys) |
-| `AT_MOST_ONCE_PER_RETRY` | Never re-executes; throws `StepInterruptedException` if interrupted | Non-idempotent operations (sending emails, charging payments) |
+| `AT_MOST_ONCE_PER_RETRY` | Re-executes step once per retry if interrupted. Throws `StepInterruptedException` if retries exhausted | Non-idempotent operations (sending emails, charging payments) |

 ```java
 // Default: at-least-once per retry (step may re-run if interrupted)
@@ -84,14 +84,14 @@ var result = ctx.step("idempotent-update", Result.class,
 var result = ctx.step("send-email", Result.class,
     stepCtx -> emailService.send(notification),
     StepConfig.builder()
-        .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
+        .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
         .build());
 ```

 **Important**: These semantics apply *per retry attempt*, not per overall execution:

 - **AT_LEAST_ONCE_PER_RETRY**: The step executes at least once per retry. If the step succeeds but checkpointing fails (e.g., sandbox crash), the step re-executes on replay.
-- **AT_MOST_ONCE_PER_RETRY**: A checkpoint is created before execution. If failure occurs after checkpoint but before completion, the step is skipped on replay and `StepInterruptedException` is thrown.
+- **AT_MOST_ONCE_PER_RETRY**: A checkpoint is created before execution. If failure occurs after checkpoint but before completion, the step is re-executed on a new retry attempt. `StepInterruptedException` is thrown if retries are exhausted.

 To achieve step-level at-most-once semantics, combine with a no-retry strategy:

@@ -100,7 +100,7 @@ To achieve step-level at-most-once semantics, combine with a no-retry strategy:
 var result = ctx.step("charge-payment", Result.class,
     stepCtx -> paymentService.charge(amount),
     StepConfig.builder()
-        .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
+        .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
         .retryStrategy(RetryStrategies.Presets.NO_RETRY)
         .build());
 ```
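
The replay rules stated in the updated `docs/core/steps.md` text can be modelled as a small decision function. The sketch below is a toy model only: `ReplayDecisionSketch`, `Decision`, and `onInterruptedStep` are hypothetical names that do not appear in the SDK, and it assumes an interrupted `AT_MOST_ONCE_PER_RETRY` attempt counts against the retry budget, which the docs imply but do not spell out.

```java
/**
 * Toy model of the documented replay rules for an interrupted step.
 * All names are hypothetical; this is not the SDK's implementation.
 */
final class ReplayDecisionSketch {

    enum Semantics { AT_LEAST_ONCE_PER_RETRY, AT_MOST_ONCE_PER_RETRY }

    enum Decision { RE_EXECUTE, RETRY_AS_NEW_ATTEMPT, THROW_STEP_INTERRUPTED }

    /**
     * Decides what happens on replay when a step was started but never completed.
     *
     * @param attemptsUsed attempts already started, including the interrupted one (assumption)
     * @param maxAttempts  total attempts the retry strategy allows; 1 models a no-retry strategy
     */
    static Decision onInterruptedStep(Semantics semantics, int attemptsUsed, int maxAttempts) {
        if (semantics == Semantics.AT_LEAST_ONCE_PER_RETRY) {
            // At-least-once: the step simply runs again.
            return Decision.RE_EXECUTE;
        }
        // At-most-once per retry: the interrupted attempt is not repeated;
        // a new attempt starts only while the retry budget allows it.
        return attemptsUsed < maxAttempts
                ? Decision.RETRY_AS_NEW_ATTEMPT
                : Decision.THROW_STEP_INTERRUPTED;
    }

    public static void main(String[] args) {
        System.out.println(onInterruptedStep(Semantics.AT_LEAST_ONCE_PER_RETRY, 1, 1));
        System.out.println(onInterruptedStep(Semantics.AT_MOST_ONCE_PER_RETRY, 1, 3));
        System.out.println(onInterruptedStep(Semantics.AT_MOST_ONCE_PER_RETRY, 1, 1));
    }
}
```

With `maxAttempts` set to 1 the model reduces to step-level at-most-once, which matches the docs' advice to pair `AT_MOST_ONCE_PER_RETRY` with `RetryStrategies.Presets.NO_RETRY`.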

docs/design.md

Lines changed: 1 addition & 1 deletion
@@ -199,7 +199,7 @@ context.step("name", Type.class, stepCtx -> doWork(),
     StepConfig.builder()
         .serDes(stepSpecificSerDes)
         .retryStrategy(RetryStrategies.exponentialBackoff(3, Duration.ofSeconds(1)))
-        .semantics(AT_MOST_ONCE_PER_RETRY)
+        .semanticsPerRetry(AT_MOST_ONCE_PER_RETRY)
         .build());
 ```

examples/src/main/java/software/amazon/lambda/durable/examples/general/ErrorHandlingExample.java

Lines changed: 3 additions & 2 deletions
@@ -73,7 +73,7 @@ public String handleRequest(Object input, DurableContext context) {

         // Example 2: Handling StepInterruptedException for AT_MOST_ONCE operations
         // StepInterruptedException is thrown when an AT_MOST_ONCE step was started
-        // but the function was interrupted before the step completed.
+        // but the function was interrupted before the step completed on every attempt.
         // In normal execution, this step succeeds. The catch block handles the
         // interruption scenario that occurs during replay after an unexpected termination.
         String paymentResult;
@@ -83,7 +83,8 @@ public String handleRequest(Object input, DurableContext context) {
                     String.class,
                     stepCtx -> "payment-" + input,
                     StepConfig.builder()
-                            .semantics(StepSemantics.AT_MOST_ONCE_PER_RETRY)
+                            .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
+                            .retryStrategy(RetryStrategies.Presets.NO_RETRY)
                             .build());
         } catch (StepInterruptedException e) {
             logger.warn(

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/StepSemanticsIntegrationTest.java

Lines changed: 157 additions & 1 deletion
@@ -37,6 +37,29 @@ void testAtLeastOnceCompletesSuccessfully() {
         assertEquals(1, executionCount.get());
     }

+    @Test
+    void testSemanticsPerRetry_atLeastOnceCompletesSuccessfully() {
+        var executionCount = new AtomicInteger(0);
+
+        var runner = LocalDurableTestRunner.create(
+                String.class,
+                (input, ctx) -> ctx.step(
+                        "my-step",
+                        String.class,
+                        stepCtx -> {
+                            executionCount.incrementAndGet();
+                            return "result";
+                        },
+                        StepConfig.builder()
+                                .semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
+                                .build()));
+
+        var result = runner.run("test-input");
+
+        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
+        assertEquals(1, executionCount.get());
+    }
+
     @Test
     void testAtMostOnceCompletesSuccessfully() {
         var executionCount = new AtomicInteger(0);
@@ -60,6 +83,29 @@ void testAtMostOnceCompletesSuccessfully() {
         assertEquals(1, executionCount.get());
     }

+    @Test
+    void testSemanticsPerRetry_atMostOnceCompletesSuccessfully() {
+        var executionCount = new AtomicInteger(0);
+
+        var runner = LocalDurableTestRunner.create(
+                String.class,
+                (input, ctx) -> ctx.step(
+                        "my-step",
+                        String.class,
+                        stepCtx -> {
+                            executionCount.incrementAndGet();
+                            return "result";
+                        },
+                        StepConfig.builder()
+                                .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
+                                .build()));
+
+        var result = runner.run("test-input");
+
+        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
+        assertEquals(1, executionCount.get());
+    }
+
     @Test
     void testAtMostOnceNoRetryFailsImmediately() {
         var executionCount = new AtomicInteger(0);
@@ -84,6 +130,30 @@ void testAtMostOnceNoRetryFailsImmediately() {
         assertEquals(1, executionCount.get());
     }

+    @Test
+    void testSemanticsPerRetry_atMostOnceNoRetryFailsImmediately() {
+        var executionCount = new AtomicInteger(0);
+
+        var runner = LocalDurableTestRunner.create(
+                String.class,
+                (input, ctx) -> ctx.step(
+                        "my-step",
+                        String.class,
+                        stepCtx -> {
+                            executionCount.incrementAndGet();
+                            throw new RuntimeException("Always fails");
+                        },
+                        StepConfig.builder()
+                                .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
+                                .retryStrategy(RetryStrategies.Presets.NO_RETRY)
+                                .build()));
+
+        var result = runner.run("test-input");
+
+        assertEquals(ExecutionStatus.FAILED, result.getStatus());
+        assertEquals(1, executionCount.get());
+    }
+
     @Test
     void testDefaultSemanticsIsAtLeastOnce() {
         var executionCount = new AtomicInteger(0);
@@ -129,6 +199,34 @@ void testAtLeastOnceReExecutesAfterCheckpointLoss() {
         assertEquals(2, executionCount.get());
     }

+    @Test
+    void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointLoss() {
+        var executionCount = new AtomicInteger(0);
+
+        var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
+            return context.step(
+                    "step",
+                    String.class,
+                    stepCtx -> {
+                        var count = executionCount.incrementAndGet();
+                        return "Executed " + count + " times";
+                    },
+                    StepConfig.builder()
+                            .semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
+                            .build());
+        });
+
+        runner.run("test");
+        assertEquals(1, executionCount.get());
+
+        runner.simulateFireAndForgetCheckpointLoss("step");
+
+        var result = runner.run("test");
+
+        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
+        assertEquals(2, executionCount.get());
+    }
+
     @Test
     void testAtLeastOnceReExecutesAfterCheckpointFailure() {
         var executionCount = new AtomicInteger(0);
@@ -157,7 +255,37 @@ void testAtLeastOnceReExecutesAfterCheckpointFailure() {
     }

     @Test
-    void testAtMostOnceThrowsExceptionAfterCheckpointFailure() {
+    void testSemanticsPerRetry_atLeastOnceReExecutesAfterCheckpointFailure() {
+        var executionCount = new AtomicInteger(0);
+
+        var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
+            return context.step(
+                    "step",
+                    String.class,
+                    stepCtx -> {
+                        var count = executionCount.incrementAndGet();
+                        return "Executed " + count + " times";
+                    },
+                    StepConfig.builder()
+                            .semanticsPerRetry(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
+                            .build());
+        });
+
+        runner.run("test");
+        assertEquals(1, executionCount.get());
+
+        runner.resetCheckpointToStarted("step");
+        var result = runner.runUntilComplete("test");
+
+        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
+        assertEquals(2, executionCount.get());
+    }
+
+    // This behavior is incorrect (the step should retry after interruption), but is kept for backward
+    // compatibility. The deprecated StepConfig.semantics() method preserves this behavior.
+    // Use StepConfig.semanticsPerRetry() for the corrected behavior (see below test).
+    @Test
+    void testAtMostOnceThrowsExceptionAfterCheckpointFailure_deprecatedBackwardCompatibility() {
         var executionCount = new AtomicInteger(0);

         var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
@@ -183,4 +311,32 @@ void testAtMostOnceThrowsExceptionAfterCheckpointFailure() {
         assertEquals(ExecutionStatus.FAILED, result.getStatus());
         assertEquals(1, executionCount.get());
     }
+
+    @Test
+    void testSemanticsPerRetry_atMostOnceRetriesAfterInterruption() {
+        var executionCount = new AtomicInteger(0);
+
+        var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
+            return context.step(
+                    "step",
+                    String.class,
+                    stepCtx -> {
+                        executionCount.incrementAndGet();
+                        return "result";
+                    },
+                    StepConfig.builder()
+                            .semanticsPerRetry(StepSemantics.AT_MOST_ONCE_PER_RETRY)
+                            .build());
+        });
+
+        runner.run("test");
+        assertEquals(1, executionCount.get());
+
+        runner.resetCheckpointToStarted("step");
+
+        var result = runner.runUntilComplete("test");
+
+        assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
+        assertEquals(2, executionCount.get());
+    }
 }

sdk/src/main/java/software/amazon/lambda/durable/config/StepConfig.java

Lines changed: 36 additions & 4 deletions
@@ -15,11 +15,13 @@
 public class StepConfig {
     private final RetryStrategy retryStrategy;
     private final StepSemantics semantics;
+    private final StepSemantics semanticsPerRetry;
     private final SerDes serDes;

     private StepConfig(Builder builder) {
         this.retryStrategy = builder.retryStrategy;
         this.semantics = builder.semantics;
+        this.semanticsPerRetry = builder.semanticsPerRetry;
         this.serDes = builder.serDes;
     }

@@ -28,18 +30,29 @@ public RetryStrategy retryStrategy() {
         return retryStrategy != null ? retryStrategy : RetryStrategies.Presets.DEFAULT;
     }

-    /** Returns the delivery semantics for this step, defaults to AT_LEAST_ONCE_PER_RETRY if not specified. */
+    /**
+     * Returns the delivery semantics for this step, defaults to AT_LEAST_ONCE_PER_RETRY if not specified.
+     *
+     * @deprecated Use {@link #semanticsPerRetry()} instead. This method has incorrect behavior where
+     * AT_MOST_ONCE_PER_RETRY does not retry after step interruption.
+     */
+    @Deprecated(forRemoval = true)
     public StepSemantics semantics() {
         return semantics != null ? semantics : StepSemantics.AT_LEAST_ONCE_PER_RETRY;
     }

+    /** Returns the delivery semantics per retry for this step, or null if not specified. */
+    public StepSemantics semanticsPerRetry() {
+        return semanticsPerRetry;
+    }
+
     /** Returns the custom serializer for this step, or null if not specified (uses default SerDes). */
     public SerDes serDes() {
         return serDes;
     }

     public Builder toBuilder() {
-        return new Builder(retryStrategy, semantics, serDes);
+        return new Builder(retryStrategy, semantics, semanticsPerRetry, serDes);
     }

     /**
@@ -48,18 +61,21 @@ public Builder toBuilder() {
      * @return a new Builder instance
      */
     public static Builder builder() {
-        return new Builder(null, null, null);
+        return new Builder(null, null, null, null);
     }

     /** Builder for creating StepConfig instances. */
     public static class Builder {
         private RetryStrategy retryStrategy;
         private StepSemantics semantics;
+        private StepSemantics semanticsPerRetry;
         private SerDes serDes;

-        public Builder(RetryStrategy retryStrategy, StepSemantics semantics, SerDes serDes) {
+        public Builder(
+                RetryStrategy retryStrategy, StepSemantics semantics, StepSemantics semanticsPerRetry, SerDes serDes) {
             this.retryStrategy = retryStrategy;
             this.semantics = semantics;
+            this.semanticsPerRetry = semanticsPerRetry;
             this.serDes = serDes;
         }

@@ -79,12 +95,28 @@ public Builder retryStrategy(RetryStrategy retryStrategy) {
          *
          * @param semantics the delivery semantics to use, defaults to AT_LEAST_ONCE_PER_RETRY if not specified
          * @return this builder for method chaining
+         * @deprecated Use {@link #semanticsPerRetry(StepSemantics)} instead. This method has incorrect behavior where
+         * AT_MOST_ONCE_PER_RETRY does not retry after step interruption.
          */
+        @Deprecated(forRemoval = true)
         public Builder semantics(StepSemantics semantics) {
             this.semantics = semantics;
             return this;
         }

+        /**
+         * Sets the delivery semantics per retry for the step.
+         *
+         * <p>If set, this takes precedence over {@link #semantics(StepSemantics)}.
+         *
+         * @param semanticsPerRetry the delivery semantics to use
+         * @return this builder for method chaining
+         */
+        public Builder semanticsPerRetry(StepSemantics semanticsPerRetry) {
+            this.semanticsPerRetry = semanticsPerRetry;
+            return this;
+        }
+
         /**
          * Sets a custom serializer for the step.
          *
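
The Javadoc above says `semanticsPerRetry` takes precedence over the deprecated `semantics` when set, and the new getter returns `null` when it is not. The code that consumes these two values is not shown in this part of the diff, so the following is only a guess at how a caller might resolve them. `SemanticsPrecedenceSketch`, `effectiveSemantics`, and `reRunsAfterInterruption` are hypothetical names, and the enum is a local stand-in for the SDK's `StepSemantics`.

```java
/**
 * Hypothetical sketch of resolving the two StepConfig fields.
 * The enum mirrors the SDK's StepSemantics constants; everything else is assumed.
 */
final class SemanticsPrecedenceSketch {

    enum StepSemantics { AT_LEAST_ONCE_PER_RETRY, AT_MOST_ONCE_PER_RETRY }

    /** semanticsPerRetry wins when set; otherwise fall back to the legacy value or its default. */
    static StepSemantics effectiveSemantics(StepSemantics semanticsPerRetry, StepSemantics legacySemantics) {
        if (semanticsPerRetry != null) {
            return semanticsPerRetry;
        }
        return legacySemantics != null ? legacySemantics : StepSemantics.AT_LEAST_ONCE_PER_RETRY;
    }

    /**
     * Whether an interrupted step runs again on replay (retry budget permitting).
     * Only legacy AT_MOST_ONCE_PER_RETRY keeps the old throw-immediately behaviour.
     */
    static boolean reRunsAfterInterruption(StepSemantics semanticsPerRetry, StepSemantics legacySemantics) {
        if (semanticsPerRetry != null) {
            return true; // corrected path: both semantics run again
        }
        return effectiveSemantics(null, legacySemantics) == StepSemantics.AT_LEAST_ONCE_PER_RETRY;
    }

    public static void main(String[] args) {
        var atMost = StepSemantics.AT_MOST_ONCE_PER_RETRY;
        System.out.println(effectiveSemantics(null, null));
        System.out.println(reRunsAfterInterruption(null, atMost));
        System.out.println(reRunsAfterInterruption(atMost, null));
    }
}
```

Keeping two separate fields rather than overwriting the old one lets existing callers of `semantics(...)` keep their current behaviour until the deprecated method is removed.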
