22// SPDX-License-Identifier: Apache-2.0
33package software .amazon .lambda .durable .operation ;
44
5+ import java .time .Duration ;
6+ import java .time .Instant ;
57import java .util .concurrent .CompletableFuture ;
68import java .util .concurrent .ExecutorService ;
79import java .util .function .Function ;
2527import software .amazon .lambda .durable .util .ExceptionHelper ;
2628
2729public class StepOperation <T > extends BaseDurableOperation <T > {
30+ private static final Integer FIRST_ATTEMPT = 0 ;
2831
2932 private final Function <StepContext , T > function ;
3033 private final StepConfig config ;
@@ -46,37 +49,38 @@ public StepOperation(
4649 /** Starts the operation. */
4750 @ Override
4851 protected void start () {
49- executeStepLogic (0 );
52+ executeStepLogic (FIRST_ATTEMPT );
5053 }
5154
5255 /** Replays the operation. */
5356 @ Override
5457 protected void replay (Operation existing ) {
58+ var attempt = existing .stepDetails () != null && existing .stepDetails ().attempt () != null
59+ ? existing .stepDetails ().attempt ()
60+ : FIRST_ATTEMPT ;
5561 switch (existing .status ()) {
5662 case SUCCEEDED , FAILED -> markAlreadyCompleted ();
5763 case STARTED -> {
58- var attempt = existing .stepDetails ().attempt () != null
59- ? existing .stepDetails ().attempt ()
60- : 0 ;
6164 if (config .semantics () == StepSemantics .AT_MOST_ONCE_PER_RETRY ) {
6265 // AT_MOST_ONCE: treat as interrupted, go through retry logic
63- handleStepFailure (new StepInterruptedException (existing ), attempt + 1 );
66+ handleStepFailure (new StepInterruptedException (existing ), attempt );
6467 } else {
6568 // AT_LEAST_ONCE: re-execute the step
6669 executeStepLogic (attempt );
6770 }
6871 }
6972 // Step is pending retry - Start polling for PENDING -> READY transition
70- case PENDING -> pollReadyAndExecuteStepLogic (existing . stepDetails (). attempt () );
73+ case PENDING -> pollReadyAndExecuteStepLogic (existing , attempt );
7174 // Execute with current attempt
72- case READY -> executeStepLogic (existing . stepDetails (). attempt () );
75+ case READY -> executeStepLogic (attempt );
7376 default ->
7477 terminateExecutionWithIllegalDurableOperationException ("Unexpected step status: " + existing .status ());
7578 }
7679 }
7780
78- private CompletableFuture <Void > pollReadyAndExecuteStepLogic (int attempt ) {
79- return pollForOperationUpdates ()
81+ private CompletableFuture <Void > pollReadyAndExecuteStepLogic (Operation existing , int attempt ) {
82+ var nextAttemptInstant = existing .stepDetails ().nextAttemptTimestamp ();
83+ return pollForOperationUpdates (Duration .between (Instant .now (), nextAttemptInstant ))
8084 .thenCompose (op -> op .status () == OperationStatus .READY
8185 ? CompletableFuture .completedFuture (op )
8286 : pollForOperationUpdates ())
@@ -169,7 +173,8 @@ private void handleStepFailure(Throwable exception, int attempt) {
169173 .build ());
170174 sendOperationUpdate (retryUpdate );
171175
172- pollReadyAndExecuteStepLogic (attempt + 1 );
176+ // Poll for READY status and then execute the step again
177+ pollReadyAndExecuteStepLogic (getOperation (), attempt + 1 );
173178 } else {
174179 // Send FAIL - retries exhausted
175180 var failUpdate =
0 commit comments