diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/callback/RetryWaitForCallbackExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/callback/RetryWaitForCallbackExample.java new file mode 100644 index 000000000..1698e9a0c --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/callback/RetryWaitForCallbackExample.java @@ -0,0 +1,50 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.callback; + +import java.time.Duration; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.config.WithRetryConfig; +import software.amazon.lambda.durable.examples.types.ApprovalRequest; +import software.amazon.lambda.durable.retry.RetryDecision; + +/** + * Example demonstrating {@code context.withRetry} with {@code context.waitForCallback}. + * + *

Submits an approval request to an external system via a callback. If the callback fails (e.g., the external system + * rejects the request), {@code withRetry} re-runs the entire {@code waitForCallback} cycle, creating a fresh callback + * with a new ID on each attempt. At most {@code MAX_ATTEMPTS} attempts are made, with a fixed 2-second backoff between + * them. + * + *

Each attempt uses a unique callback name ({@code "approval-1"}, {@code "approval-2"}, etc.) so the execution + * history stays clean and replay-safe. The {@code withRetry} call itself is given a {@code null} name, so the attempts + * are grouped under a default-named child context. + */ +public class RetryWaitForCallbackExample extends DurableHandler { + + private static final int MAX_ATTEMPTS = 3; + + @Override + public String handleRequest(ApprovalRequest input, DurableContext context) { + // Step 1: Prepare the approval request + var prepared = context.step( + "prepare", + String.class, + stepCtx -> "Approval for: " + input.description() + " ($" + input.amount() + ")"); + + // Step 2: waitForCallback with retry — if the external system fails, try again with a fresh callback + var approvalResult = context.withRetry( + null, + (attempt, ctx) -> ctx.waitForCallback( + "approval-" + attempt, String.class, (callbackId, stepCtx) -> stepCtx.getLogger() + .info("Attempt {}: sending callback {} to approval system", attempt, callbackId)), + WithRetryConfig.builder() + .retryStrategy((error, attempt) -> attempt < MAX_ATTEMPTS + ? RetryDecision.retry(Duration.ofSeconds(2)) + : RetryDecision.fail()) + .build()); + + // Step 3: Process the result + return context.step("process-result", String.class, stepCtx -> prepared + " - Result: " + approvalResult); + } +} diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/invoke/RetryInvokeExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/invoke/RetryInvokeExample.java new file mode 100644 index 000000000..28cec8dc0 --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/invoke/RetryInvokeExample.java @@ -0,0 +1,40 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.invoke; + +import java.time.Duration; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.config.WithRetryConfig; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.retry.RetryDecision; + +/** + * Example demonstrating {@code context.withRetry} with {@code context.invoke}. + * + *

Attempts a chained Lambda invocation up to 3 times in total (the initial call plus at most two retries), with a + * fixed 2-second backoff between attempts. Each attempt uses a unique operation name ({@code "call-greeting-1"}, + * {@code "call-greeting-2"}, etc.) so the execution history stays clean and replay-safe. + * + *

A {@code null} name is used, so attempts are grouped under a default-named child context. + */ +public class RetryInvokeExample extends DurableHandler { + + private static final int MAX_ATTEMPTS = 3; + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + return context.withRetry( + null, + (attempt, ctx) -> ctx.invoke( + "call-greeting-" + attempt, + "simple-step-example" + input.getName() + ":$LATEST", + input, + String.class), + WithRetryConfig.builder() + .retryStrategy((error, attempt) -> attempt < MAX_ATTEMPTS + ? RetryDecision.retry(Duration.ofSeconds(2)) + : RetryDecision.fail()) + .build()); + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java index a83109e89..d9df8bc67 100644 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java @@ -710,6 +710,89 @@ void testComplexFlatMapExample() { assertTrue(output.contains("reason=MIN_SUCCESSFUL_REACHED")); } + @Test + void testRetryInvokeExample() { + var runner = CloudDurableTestRunner.create( + arn("retry-invoke-example"), GreetingRequest.class, String.class, lambdaClient); + // The handler invokes "simple-step-example" + input.getName() + ":$LATEST", + // so passing the functionNameSuffix as the name targets the deployed simple-step-example function + var result = runner.run(new GreetingRequest(functionNameSuffix)); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertNotNull(result.getResult()); + } + + @Test + void testRetryWaitForCallbackExampleSucceeds() { + var runner = CloudDurableTestRunner.create( + arn("retry-wait-for-callback-example"), ApprovalRequest.class, String.class, lambdaClient); + + var execution = runner.startAsync(new 
ApprovalRequest("Server upgrade", 5000.0)); + + // Wait for the first callback from attempt 1 + execution.pollUntil(exec -> exec.hasCallback("approval-1-callback")); + var callbackId = execution.getCallbackId("approval-1-callback"); + assertNotNull(callbackId); + + // Complete the callback on the first attempt + execution.completeCallback(callbackId, "\"approved\""); + + var result = execution.pollUntilComplete(); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var finalResult = result.getResult(); + assertNotNull(finalResult); + assertTrue(finalResult.contains("Approval for: Server upgrade")); + assertTrue(finalResult.contains("5000")); + assertTrue(finalResult.contains("approved")); + + // Verify operations + assertNotNull(execution.getOperation("prepare")); + assertNotNull(execution.getOperation("process-result")); + } + + @Test + void testRetryWaitForCallbackExampleRetriesAfterFailure() { + var runner = CloudDurableTestRunner.create( + arn("retry-wait-for-callback-example"), ApprovalRequest.class, String.class, lambdaClient); + + var execution = runner.startAsync(new ApprovalRequest("Expensive item", 10000.0)); + + // Wait for the first callback from attempt 1 + execution.pollUntil(exec -> exec.hasCallback("approval-1-callback")); + var callbackId1 = execution.getCallbackId("approval-1-callback"); + assertNotNull(callbackId1); + + // Fail the first attempt + execution.failCallback( + callbackId1, + ErrorObject.builder() + .errorType("Rejected") + .errorMessage("denied by reviewer") + .build()); + + // Wait for the second callback from attempt 2 (after backoff) + execution.pollUntil(exec -> exec.hasCallback("approval-2-callback")); + var callbackId2 = execution.getCallbackId("approval-2-callback"); + assertNotNull(callbackId2); + + // Complete the second attempt + execution.completeCallback(callbackId2, "\"approved on retry\""); + + var result = execution.pollUntilComplete(); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + 
var finalResult = result.getResult(); + assertNotNull(finalResult); + assertTrue(finalResult.contains("Approval for: Expensive item")); + assertTrue(finalResult.contains("10000")); + assertTrue(finalResult.contains("approved on retry")); + + // Verify operations + assertNotNull(execution.getOperation("prepare")); + assertNotNull(execution.getOperation("process-result")); + } + @Test void testWaitForConditionExample() { var runner = CloudDurableTestRunner.create( diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/callback/RetryWaitForCallbackExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/callback/RetryWaitForCallbackExampleTest.java new file mode 100644 index 000000000..ea5e8a4a8 --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/callback/RetryWaitForCallbackExampleTest.java @@ -0,0 +1,148 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.callback; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.lambda.model.ErrorObject; +import software.amazon.lambda.durable.examples.types.ApprovalRequest; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class RetryWaitForCallbackExampleTest { + + @Test + void succeedsOnFirstAttempt() { + var handler = new RetryWaitForCallbackExample(); + var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler); + var input = new ApprovalRequest("New laptop", 1500.00); + + // First run — prepares request, starts waitForCallback, suspends + var result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete the callback (waitForCallback names it "approval-1-callback" internally) + var callbackId = 
runner.getCallbackId("approval-1-callback"); + assertNotNull(callbackId, "Callback 'approval-1-callback' should have been created"); + runner.completeCallback(callbackId, "\"Approved by manager\""); + + // Run to completion + result = runner.runUntilComplete(input); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals( + "Approval for: New laptop ($1500.0) - Result: Approved by manager", result.getResult(String.class)); + } + + @Test + void retriesAfterFirstCallbackFails() { + var handler = new RetryWaitForCallbackExample(); + var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler); + var input = new ApprovalRequest("Server upgrade", 5000.00); + + // First run — prepares, starts waitForCallback attempt 1, suspends + var result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Fail the first callback + var callbackId1 = runner.getCallbackId("approval-1-callback"); + assertNotNull(callbackId1); + runner.failCallback( + callbackId1, + ErrorObject.builder() + .errorType("RejectedError") + .errorMessage("Rejected by first reviewer") + .build()); + + // Run — processes failure, hits backoff wait, suspends + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past the backoff wait + runner.advanceTime(); + + // Run — starts waitForCallback attempt 2, suspends + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete the second callback + var callbackId2 = runner.getCallbackId("approval-2-callback"); + assertNotNull(callbackId2, "Callback 'approval-2-callback' should have been created after retry"); + runner.completeCallback(callbackId2, "\"Approved on second try\""); + + // Run to completion + result = runner.runUntilComplete(input); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals( + "Approval for: Server upgrade ($5000.0) - Result: Approved on second try", 
+ result.getResult(String.class)); + } + + @Test + void failsAfterAllRetriesExhausted() { + var handler = new RetryWaitForCallbackExample(); + var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler); + var input = new ApprovalRequest("Expensive item", 10000.00); + + // First run — starts waitForCallback attempt 1 + var result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Fail callback attempt 1 + var callbackId1 = runner.getCallbackId("approval-1-callback"); + runner.failCallback( + callbackId1, + ErrorObject.builder() + .errorType("Rejected") + .errorMessage("fail 1") + .build()); + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past backoff 1, run to start attempt 2 + runner.advanceTime(); + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Fail callback attempt 2 + var callbackId2 = runner.getCallbackId("approval-2-callback"); + runner.failCallback( + callbackId2, + ErrorObject.builder() + .errorType("Rejected") + .errorMessage("fail 2") + .build()); + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past backoff 2, run to start attempt 3 + runner.advanceTime(); + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Fail callback attempt 3 — last attempt, retryStrategy returns fail() + var callbackId3 = runner.getCallbackId("approval-3-callback"); + runner.failCallback( + callbackId3, + ErrorObject.builder() + .errorType("Rejected") + .errorMessage("fail 3") + .build()); + result = runner.run(input); + + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + } + + @Test + void suspendsOnFirstRun() { + var handler = new RetryWaitForCallbackExample(); + var runner = LocalDurableTestRunner.create(ApprovalRequest.class, handler); + var input = new ApprovalRequest("Test item", 100.00); + + var result = 
runner.run(input); + + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/invoke/RetryInvokeExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/invoke/RetryInvokeExampleTest.java new file mode 100644 index 000000000..7d8e66104 --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/invoke/RetryInvokeExampleTest.java @@ -0,0 +1,130 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.invoke; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.lambda.model.ErrorObject; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class RetryInvokeExampleTest { + + @Test + void succeedsOnFirstAttempt() { + var handler = new RetryInvokeExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + var input = new GreetingRequest("world"); + + // First run — starts the invoke, suspends waiting for result + var result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete the first invoke attempt + runner.completeChainedInvoke("call-greeting-1", "\"hello world\""); + result = runner.run(input); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("hello world", result.getResult(String.class)); + } + + @Test + void retriesAfterFirstAttemptFails() { + var handler = new RetryInvokeExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + var input = new GreetingRequest("world"); + + // First run — starts invoke attempt 1 + var result = runner.run(input); + 
assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Fail the first invoke attempt + runner.failChainedInvoke( + "call-greeting-1", + ErrorObject.builder() + .errorType("TransientError") + .errorMessage("Service unavailable") + .build()); + + // Second run — processes the failure, hits the backoff wait, and suspends + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past the backoff wait; the next run starts invoke attempt 2 and suspends + runner.advanceTime(); + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete the second invoke attempt + runner.completeChainedInvoke("call-greeting-2", "\"hello on retry\""); + result = runner.run(input); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("hello on retry", result.getResult(String.class)); + } + + @Test + void failsAfterAllRetriesExhausted() { + var handler = new RetryInvokeExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + var input = new GreetingRequest("world"); + + // First run — starts invoke attempt 1 + var result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Fail attempt 1 + runner.failChainedInvoke( + "call-greeting-1", + ErrorObject.builder() + .errorType("TransientError") + .errorMessage("fail 1") + .build()); + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past backoff wait 1 + runner.advanceTime(); + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Fail attempt 2 + runner.failChainedInvoke( + "call-greeting-2", + ErrorObject.builder() + .errorType("TransientError") + .errorMessage("fail 2") + .build()); + result = runner.run(input); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past backoff wait 2 + runner.advanceTime(); + result = runner.run(input); + 
assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Fail attempt 3 — this is the last attempt, retryStrategy returns fail() + runner.failChainedInvoke( + "call-greeting-3", + ErrorObject.builder() + .errorType("TransientError") + .errorMessage("fail 3") + .build()); + result = runner.run(input); + + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + } + + @Test + void suspendsOnFirstRun() { + var handler = new RetryInvokeExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + var input = new GreetingRequest("test"); + + var result = runner.run(input); + + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + } +} diff --git a/examples/template.yaml b/examples/template.yaml index c5a5049b5..1a256c5c3 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -432,6 +432,186 @@ Resources: - lambda:GetDurableExecutionState Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:concurrent-wait-for-condition-example-${JavaVersion}-runtime" + RetryWaitForCallbackExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'retry-wait-for-callback-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.callback.RetryWaitForCallbackExample" + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:retry-wait-for-callback-example-${JavaVersion}-runtime" + + WaitForCallbackFailedExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'wait-for-callback-failed-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.callback.WaitForCallbackFailedExample" + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + 
Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:wait-for-callback-failed-example-${JavaVersion}-runtime" + + CustomPollingExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'custom-polling-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.general.CustomPollingExample" + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + - lambda:InvokeFunction + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:custom-polling-example-${JavaVersion}-runtime" + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: '*' + + RetryInvokeExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'retry-invoke-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.invoke.RetryInvokeExample" + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + - lambda:InvokeFunction + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:retry-invoke-example-${JavaVersion}-runtime" + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: '*' + + DeserializationFailedMapExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'deserialization-failed-map-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.map.DeserializationFailedMapExample" + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:deserialization-failed-map-example-${JavaVersion}-runtime" + + ParallelExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 
'parallel-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.parallel.ParallelExample" + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:parallel-example-${JavaVersion}-runtime" + + ParallelFailureToleranceExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'parallel-failure-tolerance-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.parallel.ParallelFailureToleranceExample" + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:parallel-failure-tolerance-example-${JavaVersion}-runtime" + + ParallelWithWaitExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'parallel-with-wait-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.parallel.ParallelWithWaitExample" + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:parallel-with-wait-example-${JavaVersion}-runtime" + + DeserializationFailedParallelExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'deserialization-failed-parallel-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.parallel.DeserializationFailedParallelExample" + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub 
"arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:deserialization-failed-parallel-example-${JavaVersion}-runtime" + + DeserializationFailureExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'deserialization-failure-example' + - !Ref JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.step.DeserializationFailureExample" + Policies: + - Statement: + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:deserialization-failure-example-${JavaVersion}-runtime" + ManyAsyncStepsVirtualThreadPoolExampleFunction: Type: AWS::Serverless::Function Condition: IsJava21OrLater @@ -515,6 +695,10 @@ Outputs: Description: Child Context Example Function ARN Value: !GetAtt ChildContextExampleFunction.Arn + VirtualChildContextExampleFunction: + Description: Virtual Child Context Example Function ARN + Value: !GetAtt VirtualChildContextExampleFunction.Arn + WaitAsyncExampleFunction: Description: Wait Async Example Function ARN Value: !GetAtt WaitAsyncExampleFunction.Arn @@ -543,6 +727,46 @@ Outputs: Description: Concurrent Wait For Condition Example Function ARN Value: !GetAtt ConcurrentWaitForConditionExampleFunction.Arn + RetryWaitForCallbackExampleFunction: + Description: Retry Wait For Callback Example Function ARN + Value: !GetAtt RetryWaitForCallbackExampleFunction.Arn + + WaitForCallbackFailedExampleFunction: + Description: Wait For Callback Failed Example Function ARN + Value: !GetAtt WaitForCallbackFailedExampleFunction.Arn + + CustomPollingExampleFunction: + Description: Custom Polling Example Function ARN + Value: !GetAtt CustomPollingExampleFunction.Arn + + RetryInvokeExampleFunction: + Description: Retry Invoke Example Function ARN + Value: !GetAtt RetryInvokeExampleFunction.Arn + + DeserializationFailedMapExampleFunction: + Description: Deserialization Failed Map Example 
Function ARN + Value: !GetAtt DeserializationFailedMapExampleFunction.Arn + + ParallelExampleFunction: + Description: Parallel Example Function ARN + Value: !GetAtt ParallelExampleFunction.Arn + + ParallelFailureToleranceExampleFunction: + Description: Parallel Failure Tolerance Example Function ARN + Value: !GetAtt ParallelFailureToleranceExampleFunction.Arn + + ParallelWithWaitExampleFunction: + Description: Parallel With Wait Example Function ARN + Value: !GetAtt ParallelWithWaitExampleFunction.Arn + + DeserializationFailedParallelExampleFunction: + Description: Deserialization Failed Parallel Example Function ARN + Value: !GetAtt DeserializationFailedParallelExampleFunction.Arn + + DeserializationFailureExampleFunction: + Description: Deserialization Failure Example Function ARN + Value: !GetAtt DeserializationFailureExampleFunction.Arn + ManyAsyncStepsVirtualThreadPoolExampleFunction: Condition: IsJava21OrLater Description: Many Async Steps Virtual Thread Pool Example Function ARN diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/RetryInvokeIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/RetryInvokeIntegrationTest.java new file mode 100644 index 000000000..7acacc988 --- /dev/null +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/RetryInvokeIntegrationTest.java @@ -0,0 +1,197 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import static org.junit.jupiter.api.Assertions.*; + +import java.time.Duration; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.lambda.model.ErrorObject; +import software.amazon.lambda.durable.config.WithRetryConfig; +import software.amazon.lambda.durable.exception.InvokeFailedException; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.retry.RetryDecision; +import software.amazon.lambda.durable.retry.RetryStrategies; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class RetryInvokeIntegrationTest { + + @Test + void invokeSucceedsOnFirstAttempt() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.withRetry( + null, + (attempt, ctx) -> ctx.invoke("invoke-" + attempt, "target-fn", "{}", String.class), + WithRetryConfig.builder() + .retryStrategy(RetryStrategies.fixedDelay(3, Duration.ofSeconds(2))) + .build())); + + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + runner.completeChainedInvoke("invoke-1", "\"success\""); + result = runner.run("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("success", result.getResult(String.class)); + } + + @Test + void invokeRetriesAfterFailure() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.withRetry( + null, + (attempt, ctx) -> ctx.invoke("invoke-" + attempt, "target-fn", "{}", String.class), + WithRetryConfig.builder() + .retryStrategy(RetryStrategies.fixedDelay(3, Duration.ofSeconds(2))) + .build())); + + // First run — invoke attempt 1 starts + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Fail attempt 1 + runner.failChainedInvoke( + "invoke-1", + ErrorObject.builder() + .errorType("TransientError") + 
.errorMessage("service unavailable") + .build()); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past backoff wait + runner.advanceTime(); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete attempt 2 + runner.completeChainedInvoke("invoke-2", "\"recovered\""); + result = runner.run("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("recovered", result.getResult(String.class)); + } + + @Test + void invokeFailsAfterAllRetriesExhausted() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.withRetry( + null, + (attempt, ctx) -> ctx.invoke("invoke-" + attempt, "target-fn", "{}", String.class), + WithRetryConfig.builder() + .retryStrategy((error, attempt) -> + attempt < 2 ? RetryDecision.retry(Duration.ofSeconds(1)) : RetryDecision.fail()) + .build())); + + // Attempt 1 + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + runner.failChainedInvoke( + "invoke-1", ErrorObject.builder().errorMessage("fail 1").build()); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past backoff + runner.advanceTime(); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Attempt 2 — last attempt + runner.failChainedInvoke( + "invoke-2", ErrorObject.builder().errorMessage("fail 2").build()); + result = runner.run("test"); + + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + } + + @Test + void invokeRetryWithCustomBackoffDelay() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.withRetry( + null, + (attempt, ctx) -> ctx.invoke("invoke-" + attempt, "target-fn", "{}", String.class), + WithRetryConfig.builder() + .retryStrategy((error, attempt) -> attempt < 3 + ? 
RetryDecision.retry(Duration.ofSeconds(attempt * 5L)) + : RetryDecision.fail()) + .build())); + + // Attempt 1 fails + var result = runner.run("test"); + runner.failChainedInvoke( + "invoke-1", ErrorObject.builder().errorMessage("fail").build()); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past first backoff (5s) + runner.advanceTime(); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Attempt 2 succeeds + runner.completeChainedInvoke("invoke-2", "\"ok\""); + result = runner.run("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("ok", result.getResult(String.class)); + } + + @Test + void invokeRetryWithStepsBeforeAndAfter() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var prefix = context.step("prepare", String.class, stepCtx -> "prepared"); + + var invokeResult = context.withRetry( + null, + (attempt, ctx) -> ctx.invoke("invoke-" + attempt, "target-fn", "{}", String.class), + WithRetryConfig.builder() + .retryStrategy(RetryStrategies.fixedDelay(3, Duration.ofSeconds(1))) + .build()); + + return context.step("finalize", String.class, stepCtx -> prefix + " -> " + invokeResult + " -> done"); + }); + + // First run — prepare step completes, invoke starts + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete invoke + runner.completeChainedInvoke("invoke-1", "\"invoked\""); + result = runner.runUntilComplete("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("prepared -> invoked -> done", result.getResult(String.class)); + } + + @Test + void invokeRetryPreservesOriginalExceptionType() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + try { + return context.withRetry( + null, + (attempt, ctx) -> ctx.invoke("invoke-" + attempt, "target-fn", "{}", 
String.class), + WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build()); + } catch (InvokeFailedException e) { + assertEquals("invoke failed", e.getMessage()); + throw e; + } + }); + + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + runner.failChainedInvoke( + "invoke-1", ErrorObject.builder().errorMessage("invoke failed").build()); + result = runner.run("test"); + + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + } +} diff --git a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/RetryWaitForCallbackIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/RetryWaitForCallbackIntegrationTest.java new file mode 100644 index 000000000..d6ad39c0c --- /dev/null +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/RetryWaitForCallbackIntegrationTest.java @@ -0,0 +1,252 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import static org.junit.jupiter.api.Assertions.*; + +import java.time.Duration; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.lambda.model.ErrorObject; +import software.amazon.lambda.durable.config.WithRetryConfig; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.retry.RetryDecision; +import software.amazon.lambda.durable.retry.RetryStrategies; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class RetryWaitForCallbackIntegrationTest { + + @Test + void waitForCallbackSucceedsOnFirstAttempt() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.withRetry( + null, + (attempt, ctx) -> ctx.waitForCallback( + "approval-" + attempt, String.class, (callbackId, stepCtx) -> stepCtx.getLogger() + .info("Submitting callback {}", callbackId)), + WithRetryConfig.builder() + .retryStrategy(RetryStrategies.fixedDelay(3, Duration.ofSeconds(2))) + .build())); + + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // waitForCallback("approval-1", ...) 
creates "approval-1-callback" internally + var callbackId = runner.getCallbackId("approval-1-callback"); + assertNotNull(callbackId); + runner.completeCallback(callbackId, "\"approved\""); + + result = runner.runUntilComplete("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("approved", result.getResult(String.class)); + } + + @Test + void waitForCallbackRetriesAfterFailure() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.withRetry( + null, + (attempt, ctx) -> ctx.waitForCallback( + "approval-" + attempt, String.class, (callbackId, stepCtx) -> stepCtx.getLogger() + .info("Attempt {} callback {}", attempt, callbackId)), + WithRetryConfig.builder() + .retryStrategy(RetryStrategies.fixedDelay(3, Duration.ofSeconds(2))) + .build())); + + // First run — starts waitForCallback attempt 1 + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Fail attempt 1 + var callbackId1 = runner.getCallbackId("approval-1-callback"); + assertNotNull(callbackId1); + runner.failCallback( + callbackId1, + ErrorObject.builder() + .errorType("Rejected") + .errorMessage("denied by reviewer") + .build()); + + // Run — processes failure, hits backoff wait + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past backoff + runner.advanceTime(); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete attempt 2 + var callbackId2 = runner.getCallbackId("approval-2-callback"); + assertNotNull(callbackId2); + runner.completeCallback(callbackId2, "\"approved on retry\""); + + result = runner.runUntilComplete("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("approved on retry", result.getResult(String.class)); + } + + @Test + void waitForCallbackFailsAfterAllRetriesExhausted() { + var runner = LocalDurableTestRunner.create( + 
String.class, + (input, context) -> context.withRetry( + null, + (attempt, ctx) -> + ctx.waitForCallback("approval-" + attempt, String.class, (callbackId, stepCtx) -> {}), + WithRetryConfig.builder() + .retryStrategy((error, attempt) -> + attempt < 2 ? RetryDecision.retry(Duration.ofSeconds(1)) : RetryDecision.fail()) + .build())); + + // Attempt 1 + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + runner.failCallback( + runner.getCallbackId("approval-1-callback"), + ErrorObject.builder().errorMessage("fail 1").build()); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past backoff + runner.advanceTime(); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Attempt 2 — last attempt + runner.failCallback( + runner.getCallbackId("approval-2-callback"), + ErrorObject.builder().errorMessage("fail 2").build()); + result = runner.run("test"); + + assertEquals(ExecutionStatus.FAILED, result.getStatus()); + } + + @Test + void waitForCallbackRetryWithStepsBeforeAndAfter() { + var runner = LocalDurableTestRunner.create(String.class, (input, context) -> { + var prefix = context.step("prepare", String.class, stepCtx -> "prepared"); + + var callbackResult = context.withRetry( + null, + (attempt, ctx) -> + ctx.waitForCallback("approval-" + attempt, String.class, (callbackId, stepCtx) -> {}), + WithRetryConfig.builder() + .retryStrategy(RetryStrategies.fixedDelay(3, Duration.ofSeconds(1))) + .build()); + + return context.step("finalize", String.class, stepCtx -> prefix + " -> " + callbackResult + " -> done"); + }); + + // First run — prepare completes, waitForCallback starts + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Complete callback + var callbackId = runner.getCallbackId("approval-1-callback"); + runner.completeCallback(callbackId, "\"approved\""); + + result = 
runner.runUntilComplete("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("prepared -> approved -> done", result.getResult(String.class)); + } + + @Test + void waitForCallbackRetryMultipleFailuresThenSuccess() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.withRetry( + null, + (attempt, ctx) -> + ctx.waitForCallback("cb-" + attempt, String.class, (callbackId, stepCtx) -> {}), + WithRetryConfig.builder() + .retryStrategy(RetryStrategies.fixedDelay(4, Duration.ofSeconds(1))) + .build())); + + // Attempt 1 — fail + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + runner.failCallback( + runner.getCallbackId("cb-1-callback"), + ErrorObject.builder().errorMessage("fail 1").build()); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + runner.advanceTime(); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Attempt 2 — fail + runner.failCallback( + runner.getCallbackId("cb-2-callback"), + ErrorObject.builder().errorMessage("fail 2").build()); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + runner.advanceTime(); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Attempt 3 — succeed + runner.completeCallback(runner.getCallbackId("cb-3-callback"), "\"third time's the charm\""); + result = runner.runUntilComplete("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("third time's the charm", result.getResult(String.class)); + } + + @Test + void waitForCallbackRetryWithSubmitterLogic() { + // Verify the submitter runs on each retry attempt + var runner = LocalDurableTestRunner.create( + String.class, + (input, context) -> context.withRetry( + null, + (attempt, ctx) -> + ctx.waitForCallback("approval-" + attempt, String.class, 
(callbackId, stepCtx) -> { + // Submitter runs each attempt — in a real scenario this would + // send the callbackId to an external system + stepCtx.getLogger().info("Attempt {} submitting {}", attempt, callbackId); + }), + WithRetryConfig.builder() + .retryStrategy(RetryStrategies.fixedDelay(3, Duration.ofSeconds(2))) + .build())); + + // Attempt 1 + var result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Verify submitter step was created for attempt 1 + var submitterOp = runner.getOperation("approval-1-submitter"); + assertNotNull(submitterOp, "Submitter step should exist for attempt 1"); + + // Fail attempt 1 + runner.failCallback( + runner.getCallbackId("approval-1-callback"), + ErrorObject.builder().errorMessage("rejected").build()); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Advance past backoff, start attempt 2 + runner.advanceTime(); + result = runner.run("test"); + assertEquals(ExecutionStatus.PENDING, result.getStatus()); + + // Verify submitter step was created for attempt 2 + var submitterOp2 = runner.getOperation("approval-2-submitter"); + assertNotNull(submitterOp2, "Submitter step should exist for attempt 2"); + + // Complete attempt 2 + runner.completeCallback(runner.getCallbackId("approval-2-callback"), "\"approved\""); + result = runner.runUntilComplete("test"); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("approved", result.getResult(String.class)); + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java index 615ce70e8..38065b749 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/DurableContext.java @@ -16,6 +16,7 @@ import software.amazon.lambda.durable.config.StepConfig; import 
software.amazon.lambda.durable.config.WaitForCallbackConfig; import software.amazon.lambda.durable.config.WaitForConditionConfig; +import software.amazon.lambda.durable.config.WithRetryConfig; import software.amazon.lambda.durable.context.BaseContext; import software.amazon.lambda.durable.model.MapResult; import software.amazon.lambda.durable.model.WaitForConditionResult; @@ -727,6 +728,87 @@ DurableFuture waitForConditionAsync( BiFunction> checkFunc, WaitForConditionConfig config); + // =============== withRetry ================ + + /** + * Replay-safe retry loop for any durable operation (sync) with default configuration. + * + *

Uses {@link WithRetryConfig} defaults: + * {@link software.amazon.lambda.durable.retry.RetryStrategies.Presets#DEFAULT} retry strategy and no child context + * wrapping. + * + * @param the result type + * @param name operation name (used for backoff wait names, and as the child context name when wrapping); pass + * {@code null} for an anonymous retry whose backoff waits use default names + * @param operation the retryable operation — receives the 1-based attempt number and the durable context + * @return the operation result + * @see #withRetry(String, BiFunction, WithRetryConfig) + */ + default T withRetry(String name, BiFunction operation) { + return withRetry(name, operation, WithRetryConfig.builder().build()); + } + + /** + * Replay-safe retry loop for any durable operation (sync). + * + *

Provides the same retry-with-backoff pattern that {@code step()} has built in, but for operations that cannot + * live inside a step ({@code waitForCallback}, {@code invoke}, {@code waitForCondition}, etc.). + * + *

Every side-effect in the loop is a durable operation, so the loop is replay-safe by construction. On replay, + * completed operations return cached results instantly and the loop fast-forwards to the current attempt. + * + *

The retry loop always runs in a child context to provide an isolated operation ID namespace. If + * {@link WithRetryConfig#wrapInChildContext()} is enabled, the child context is checkpointed (persisted) so all + * attempts are grouped under a single named operation in execution history. Otherwise, a virtual child context is + * used — no checkpointing overhead, but the child re-executes on replay. + * + * @param the result type + * @param name operation name (used for backoff wait names, and as the child context name when wrapping); pass + * {@code null} for an anonymous retry whose backoff waits use default names + * @param operation the retryable operation — receives the 1-based attempt number and the durable context + * @param config retry configuration including the retry strategy and child context wrapping + * @return the operation result + */ + default T withRetry(String name, BiFunction operation, WithRetryConfig config) { + return withRetryAsync(name, operation, config).get(); + } + + /** + * Replay-safe retry loop for any durable operation (async) with default configuration. + * + *

Uses {@link WithRetryConfig} defaults: + * {@link software.amazon.lambda.durable.retry.RetryStrategies.Presets#DEFAULT} retry strategy and no child context + * wrapping. + * + * @param the result type + * @param name operation name (used for child context and backoff wait names); pass {@code null} for an anonymous + * retry whose backoff waits use default names + * @param operation the retryable operation — receives the 1-based attempt number and the durable context + * @return a future representing the operation result + * @see #withRetryAsync(String, BiFunction, WithRetryConfig) + */ + default DurableFuture withRetryAsync(String name, BiFunction operation) { + return withRetryAsync(name, operation, WithRetryConfig.builder().build()); + } + + /** + * Replay-safe retry loop for any durable operation (async). + * + *

The retry loop always runs in a child context to provide an isolated operation ID namespace. If + * {@link WithRetryConfig#wrapInChildContext()} is enabled, the child context is checkpointed (persisted) so all + * attempts are grouped under a single named operation in execution history. Otherwise, a virtual child context is + * used — no checkpointing overhead, but the child re-executes on replay. + * + * @param the result type + * @param name operation name (used for child context and backoff wait names); pass {@code null} for an anonymous + * retry whose backoff waits use default names + * @param operation the retryable operation — receives the 1-based attempt number and the durable context + * @param config retry configuration including the retry strategy + * @return a future representing the operation result + */ + DurableFuture withRetryAsync( + String name, BiFunction operation, WithRetryConfig config); + /** * Function applied to each item in a map operation. * diff --git a/sdk/src/main/java/software/amazon/lambda/durable/config/WithRetryConfig.java b/sdk/src/main/java/software/amazon/lambda/durable/config/WithRetryConfig.java new file mode 100644 index 000000000..23ff43830 --- /dev/null +++ b/sdk/src/main/java/software/amazon/lambda/durable/config/WithRetryConfig.java @@ -0,0 +1,105 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.config; + +import software.amazon.lambda.durable.retry.RetryStrategies; +import software.amazon.lambda.durable.retry.RetryStrategy; + +/** + * Configuration for {@link software.amazon.lambda.durable.DurableContext#withRetry}. + * + *

Uses the same {@link RetryStrategy} shape that developers already know from {@link StepConfig}, so there are zero + * new retry concepts to learn. If no retry strategy is specified, {@link RetryStrategies.Presets#DEFAULT} is used. + */ +public class WithRetryConfig { + private final RetryStrategy retryStrategy; + private final boolean wrapInChildContext; + + private WithRetryConfig(Builder builder) { + this.retryStrategy = builder.retryStrategy; + this.wrapInChildContext = builder.wrapInChildContext; + } + + /** + * Returns the retry strategy, or the default strategy if not specified. Same type as + * {@link StepConfig#retryStrategy()}. + * + * @return the retry strategy, never null + */ + public RetryStrategy retryStrategy() { + return retryStrategy != null ? retryStrategy : RetryStrategies.Presets.DEFAULT; + } + + /** + * Returns whether the sync {@code withRetry} should wrap the retry loop in a child context. + * + *

When {@code true}, the retry loop's child context is checkpointed — all retry attempts are grouped under a single + * named child context in execution history. When {@code false} (the default), the retry loop runs in a virtual + * child context that is not checkpointed and is re-executed on replay. + * + *

This setting applies equally to the async {@code withRetryAsync} methods, which the sync form delegates to. + * + * @return {@code true} if the retry loop should be wrapped in a checkpointed child context + */ + public boolean wrapInChildContext() { + return wrapInChildContext; + } + + /** + * Creates a new builder for {@code WithRetryConfig}. + * + * @return a new builder instance + */ + public static Builder builder() { + return new Builder(); + } + + /** Builder for creating {@link WithRetryConfig} instances. */ + public static class Builder { + private RetryStrategy retryStrategy; + private boolean wrapInChildContext; + + private Builder() {} + + /** + * Sets the retry strategy. Optional — defaults to {@link RetryStrategies.Presets#DEFAULT} if not set. + * + *

Reuses the exact same {@link RetryStrategy} interface from {@link StepConfig}. All existing factory + * methods ({@link software.amazon.lambda.durable.retry.RetryStrategies#exponentialBackoff}, + * {@link software.amazon.lambda.durable.retry.RetryStrategies#fixedDelay}, presets, and custom lambdas) work + * without modification. + * + * @param retryStrategy the retry strategy to use + * @return this builder for method chaining + */ + public Builder retryStrategy(RetryStrategy retryStrategy) { + this.retryStrategy = retryStrategy; + return this; + } + + /** + * Sets whether the sync {@code withRetry} should wrap the retry loop in a child context. Optional — defaults to + * {@code false}. + * + *

When enabled, the sync form groups all retry attempts under a single named child context in execution + * history, matching the behavior of the async form. This is useful when you want operation isolation but don't + * need a {@link software.amazon.lambda.durable.DurableFuture}. + * + * @param wrapInChildContext {@code true} to wrap in a child context + * @return this builder for method chaining + */ + public Builder wrapInChildContext(boolean wrapInChildContext) { + this.wrapInChildContext = wrapInChildContext; + return this; + } + + /** + * Builds the {@link WithRetryConfig} instance. + * + * @return a new config with the configured options + */ + public WithRetryConfig build() { + return new WithRetryConfig(this); + } + } +} diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java index 33cbc21bb..62414aba9 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java @@ -27,8 +27,11 @@ import software.amazon.lambda.durable.config.StepConfig; import software.amazon.lambda.durable.config.WaitForCallbackConfig; import software.amazon.lambda.durable.config.WaitForConditionConfig; +import software.amazon.lambda.durable.config.WithRetryConfig; +import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.OperationIdGenerator; +import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.logging.DurableLogger; import software.amazon.lambda.durable.model.MapResult; @@ -43,6 +46,7 @@ import software.amazon.lambda.durable.operation.StepOperation; import 
software.amazon.lambda.durable.operation.WaitForConditionOperation; import software.amazon.lambda.durable.operation.WaitOperation; +import software.amazon.lambda.durable.retry.RetryDecision; import software.amazon.lambda.durable.util.ParameterValidator; /** @@ -366,6 +370,65 @@ public DurableFuture waitForConditionAsync( return operation; } + // =============== withRetry ================ + + private static final Duration DEFAULT_BACKOFF_DELAY = Duration.ofSeconds(1); + private static final String BACKOFF_SUFFIX = "-backoff-"; + private static final String ANONYMOUS_CHILD_CONTEXT_NAME = "retry"; + private static final String ANONYMOUS_BACKOFF_PREFIX = "retry-backoff-"; + + @Override + @SuppressWarnings("unchecked") + public DurableFuture withRetryAsync( + String name, BiFunction operation, WithRetryConfig config) { + Objects.requireNonNull(operation, "operation cannot be null"); + Objects.requireNonNull(config, "config cannot be null"); + + var childContextName = name != null ? name : ANONYMOUS_CHILD_CONTEXT_NAME; + + return (DurableFuture) runInChildContextAsync( + childContextName, + new TypeToken() {}, + childCtx -> executeRetryLoop(childCtx, name, operation, config), + RunInChildContextConfig.builder() + .isVirtual(!config.wrapInChildContext()) + .build(), + OperationSubType.WITH_RETRY); + } + + /** + * Core retry loop. Replay-safe because every side-effect is a durable operation: the user's operation calls durable + * primitives, and backoff uses {@code context.wait()}. + * + *

{@link SuspendExecutionException} and {@link UnrecoverableDurableExecutionException} are never retried — they + * are internal SDK control flow signals that must propagate immediately. + */ + private static T executeRetryLoop( + DurableContext context, + String name, + BiFunction operation, + WithRetryConfig config) { + var attempt = 1; + while (true) { + try { + return operation.apply(attempt, context); + } catch (SuspendExecutionException | UnrecoverableDurableExecutionException e) { + // Internal SDK control flow — never retry, always propagate + throw e; + } catch (Exception e) { + RetryDecision decision = config.retryStrategy().makeRetryDecision(e, attempt); + if (!decision.shouldRetry()) { + throw e; + } + + var delay = decision.delay().isZero() ? DEFAULT_BACKOFF_DELAY : decision.delay(); + var waitName = name != null ? name + BACKOFF_SUFFIX + attempt : ANONYMOUS_BACKOFF_PREFIX + attempt; + context.wait(waitName, delay); + attempt++; + } + } + } + // =============== accessors ================ @Override public DurableLogger getLogger() { diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java index 90c351487..416798f9f 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java @@ -15,7 +15,8 @@ public enum OperationSubType { PARALLEL("Parallel"), PARALLEL_BRANCH("ParallelBranch"), WAIT_FOR_CALLBACK("WaitForCallback"), - WAIT_FOR_CONDITION("WaitForCondition"); + WAIT_FOR_CONDITION("WaitForCondition"), + WITH_RETRY("WithRetry"); private final String value; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index 6f539ac73..1aba0f72b 100644 --- 
a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -245,7 +245,7 @@ private Throwable translateException(Operation op, ErrorObject errorObject) { case WAIT_FOR_CALLBACK -> handleWaitForCallbackFailure(); case MAP_ITERATION -> new MapIterationFailedException(op); case PARALLEL_BRANCH -> new ParallelBranchFailedException(op); - case RUN_IN_CHILD_CONTEXT -> new ChildContextFailedException(op); + case RUN_IN_CHILD_CONTEXT, WITH_RETRY -> new ChildContextFailedException(op); // the following subtypes should not be able to reach here case PARALLEL, MAP, WAIT_FOR_CONDITION -> new IllegalStateException("Unexpected sub-type: " + getSubType()); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/config/WithRetryConfigTest.java b/sdk/src/test/java/software/amazon/lambda/durable/config/WithRetryConfigTest.java new file mode 100644 index 000000000..b1c02db9a --- /dev/null +++ b/sdk/src/test/java/software/amazon/lambda/durable/config/WithRetryConfigTest.java @@ -0,0 +1,80 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.config; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.retry.RetryDecision; +import software.amazon.lambda.durable.retry.RetryStrategies; + +class WithRetryConfigTest { + + @Test + void builderWithRetryStrategy() { + var strategy = RetryStrategies.Presets.DEFAULT; + + var config = WithRetryConfig.builder().retryStrategy(strategy).build(); + + assertEquals(strategy, config.retryStrategy()); + } + + @Test + void builderWithoutRetryStrategy_usesDefault() { + var config = WithRetryConfig.builder().build(); + + assertEquals(RetryStrategies.Presets.DEFAULT, config.retryStrategy()); + } + + @Test + void builderChaining() { + var strategy = RetryStrategies.Presets.DEFAULT; + + var config = WithRetryConfig.builder().retryStrategy(strategy).build(); + + assertEquals(strategy, config.retryStrategy()); + } + + @Test + void builderWithCustomLambdaRetryStrategy() { + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> RetryDecision.fail()) + .build(); + + assertNotNull(config.retryStrategy()); + } + + @Test + void wrapInChildContext_defaultsFalse() { + var config = WithRetryConfig.builder().build(); + + assertFalse(config.wrapInChildContext()); + } + + @Test + void wrapInChildContext_canBeEnabled() { + var config = WithRetryConfig.builder().wrapInChildContext(true).build(); + + assertTrue(config.wrapInChildContext()); + } + + @Test + void wrapInChildContext_canBeExplicitlyDisabled() { + var config = WithRetryConfig.builder().wrapInChildContext(false).build(); + + assertFalse(config.wrapInChildContext()); + } + + @Test + void builderChaining_withWrapInChildContext() { + var strategy = RetryStrategies.Presets.DEFAULT; + + var config = WithRetryConfig.builder() + .retryStrategy(strategy) + .wrapInChildContext(true) + .build(); + + assertEquals(strategy, config.retryStrategy()); + 
assertTrue(config.wrapInChildContext()); + } +} diff --git a/sdk/src/test/java/software/amazon/lambda/durable/context/DurableContextWithRetryTest.java b/sdk/src/test/java/software/amazon/lambda/durable/context/DurableContextWithRetryTest.java new file mode 100644 index 000000000..5f0c2c9aa --- /dev/null +++ b/sdk/src/test/java/software/amazon/lambda/durable/context/DurableContextWithRetryTest.java @@ -0,0 +1,675 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.context; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableFuture; +import software.amazon.lambda.durable.TypeToken; +import software.amazon.lambda.durable.config.WithRetryConfig; +import software.amazon.lambda.durable.exception.SerDesException; +import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; +import software.amazon.lambda.durable.execution.SuspendExecutionException; +import software.amazon.lambda.durable.retry.RetryDecision; +import software.amazon.lambda.durable.retry.RetryStrategies; + +@SuppressWarnings("unchecked") +class DurableContextWithRetryTest { + + private DurableContext context; + private DurableContext childContext; + + @BeforeEach + void setUp() { + context = mock(DurableContext.class); + childContext = mock(DurableContext.class); + stubWithRetryMethods(context); + stubWithRetryMethods(childContext); + } + + /** + * Stubs the withRetry/withRetryAsync methods on a mock DurableContext so they 
execute a replica of the real retry loop logic. + * This is needed because mocks do not run interface method bodies (default or abstract); unstubbed calls return null. + * + *

All forms always run in a child context (matching the real implementation which uses a virtual child context + * when {@code wrapInChildContext} is false, and a checkpointed child context when true). + */ + private void stubWithRetryMethods(DurableContext mock) { + // Sync form with config — always runs in a child context + when(mock.withRetry(any(), nullable(BiFunction.class), nullable(WithRetryConfig.class))) + .thenAnswer(invocation -> { + String name = invocation.getArgument(0); + BiFunction operation = invocation.getArgument(1); + WithRetryConfig config = invocation.getArgument(2); + Objects.requireNonNull(operation, "operation cannot be null"); + Objects.requireNonNull(config, "config cannot be null"); + var childContextName = name != null ? name : "retry"; + return mock.runInChildContextAsync( + childContextName, + new TypeToken() {}, + childCtx -> executeRetryLoop(childCtx, name, operation, config)) + .get(); + }); + + // Sync form without config — delegates to the 3-arg form with default config + when(mock.withRetry(any(), nullable(BiFunction.class))).thenAnswer(invocation -> { + String name = invocation.getArgument(0); + BiFunction operation = invocation.getArgument(1); + return mock.withRetry(name, operation, WithRetryConfig.builder().build()); + }); + + // Async form with config + when(mock.withRetryAsync(any(), nullable(BiFunction.class), nullable(WithRetryConfig.class))) + .thenAnswer(invocation -> { + String name = invocation.getArgument(0); + BiFunction operation = invocation.getArgument(1); + WithRetryConfig config = invocation.getArgument(2); + Objects.requireNonNull(operation, "operation cannot be null"); + Objects.requireNonNull(config, "config cannot be null"); + var childContextName = name != null ? 
name : "retry"; + return mock.runInChildContextAsync( + childContextName, + new TypeToken() {}, + childCtx -> executeRetryLoop(childCtx, name, operation, config)); + }); + + // Async form without config — delegates to the 3-arg form with default config + when(mock.withRetryAsync(any(), nullable(BiFunction.class))).thenAnswer(invocation -> { + String name = invocation.getArgument(0); + BiFunction operation = invocation.getArgument(1); + return mock.withRetryAsync( + name, operation, WithRetryConfig.builder().build()); + }); + } + + /** Replicates the retry loop logic from DurableContextImpl for test stubbing. */ + private static T executeRetryLoop( + DurableContext context, + String name, + BiFunction operation, + WithRetryConfig config) { + var attempt = 1; + while (true) { + try { + return operation.apply(attempt, context); + } catch (SuspendExecutionException | UnrecoverableDurableExecutionException e) { + throw e; + } catch (Exception e) { + var decision = config.retryStrategy().makeRetryDecision(e, attempt); + if (!decision.shouldRetry()) { + throw e; + } + var delay = decision.delay().isZero() ? Duration.ofSeconds(1) : decision.delay(); + var waitName = name != null ? name + "-backoff-" + attempt : "retry-backoff-" + attempt; + context.wait(waitName, delay); + attempt++; + } + } + } + + /** Stubs runInChildContextAsync to immediately execute the function with childContext. */ + private void stubChildContext(String name) { + when(context.runInChildContextAsync(eq(name), any(TypeToken.class), any())) + .thenAnswer(invocation -> { + Function func = invocation.getArgument(2); + var result = func.apply(childContext); + return (DurableFuture) () -> result; + }); + } + + /** Stubs runInChildContextAsync for any name to immediately execute the function with childContext. 
*/ + private void stubChildContextAnyName() { + when(context.runInChildContextAsync(anyString(), any(TypeToken.class), any())) + .thenAnswer(invocation -> { + Function func = invocation.getArgument(2); + var result = func.apply(childContext); + return (DurableFuture) () -> result; + }); + } + + // --- Core retry logic (uses named form; retry behavior is identical for all forms) --- + + @Nested + class CoreRetryLogic { + + @BeforeEach + void setUpChildContext() { + stubChildContextAnyName(); + } + + @Test + void successOnFirstAttempt() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build(); + + var result = context.withRetry("my-op", (attempt, ctx) -> "success", config); + + assertEquals("success", result); + } + + @Test + void retriesWithBackoffWaits() { + var callCount = new int[] {0}; + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> + attempt < 3 ? RetryDecision.retry(Duration.ofSeconds(5)) : RetryDecision.fail()) + .build(); + + var result = context.withRetry( + "my-op", + (attempt, ctx) -> { + callCount[0]++; + if (attempt < 3) { + throw new RuntimeException("fail-" + attempt); + } + return "success-on-3"; + }, + config); + + assertEquals("success-on-3", result); + assertEquals(3, callCount[0]); + verify(childContext).wait("my-op-backoff-1", Duration.ofSeconds(5)); + verify(childContext).wait("my-op-backoff-2", Duration.ofSeconds(5)); + verify(childContext, times(2)).wait(anyString(), any(Duration.class)); + } + + @Test + void rethrowsWhenRetryStrategyReturnsFail() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build(); + + var exception = assertThrows( + RuntimeException.class, + () -> context.withRetry( + "my-op", + (attempt, ctx) -> { + throw new RuntimeException("terminal"); + }, + config)); + + assertEquals("terminal", exception.getMessage()); + verify(childContext, never()).wait(anyString(), any(Duration.class)); + } + + @Test + 
void rethrowsLastExceptionWhenAllRetriesExhausted() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.fixedDelay(3, Duration.ofSeconds(1))) + .build(); + + var thrown = assertThrows( + RuntimeException.class, + () -> context.withRetry( + "my-op", + (attempt, ctx) -> { + throw new RuntimeException("attempt-" + attempt); + }, + config)); + + assertEquals("attempt-3", thrown.getMessage()); + } + + @Test + void usesDefaultDelayWhenRetryDecisionDelayIsZero() { + var config = WithRetryConfig.builder() + .retryStrategy( + (error, attempt) -> attempt < 2 ? RetryDecision.retry(Duration.ZERO) : RetryDecision.fail()) + .build(); + + var callCount = new int[] {0}; + var result = context.withRetry( + "my-op", + (attempt, ctx) -> { + callCount[0]++; + if (attempt == 1) { + throw new RuntimeException("fail"); + } + return "ok"; + }, + config); + + assertEquals("ok", result); + verify(childContext).wait("my-op-backoff-1", Duration.ofSeconds(1)); + } + + @Test + void passesCorrectAttemptNumberToOperation() { + var attempts = new ArrayList(); + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> + attempt < 4 ? RetryDecision.retry(Duration.ofSeconds(1)) : RetryDecision.fail()) + .build(); + + context.withRetry( + "track", + (attempt, ctx) -> { + attempts.add(attempt); + if (attempt < 4) { + throw new RuntimeException("not yet"); + } + return "done"; + }, + config); + + assertEquals(4, attempts.size()); + assertEquals(1, attempts.get(0)); + assertEquals(2, attempts.get(1)); + assertEquals(3, attempts.get(2)); + assertEquals(4, attempts.get(3)); + } + + @Test + void passesErrorToRetryStrategy() { + var errors = new ArrayList(); + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> { + errors.add(error); + return attempt < 3 ? 
RetryDecision.retry(Duration.ofSeconds(1)) : RetryDecision.fail(); + }) + .build(); + + assertThrows( + RuntimeException.class, + () -> context.withRetry( + "my-op", + (attempt, ctx) -> { + throw new RuntimeException("error-" + attempt); + }, + config)); + + assertEquals(3, errors.size()); + assertEquals("error-1", errors.get(0).getMessage()); + assertEquals("error-2", errors.get(1).getMessage()); + assertEquals("error-3", errors.get(2).getMessage()); + } + + @Test + void respectsCustomDelayFromRetryDecision() { + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> RetryDecision.retry(Duration.ofSeconds(attempt * 10L))) + .build(); + + context.withRetry( + "my-op", + (attempt, ctx) -> { + if (attempt <= 2) { + throw new RuntimeException("fail"); + } + return "ok"; + }, + config); + + verify(childContext).wait("my-op-backoff-1", Duration.ofSeconds(10)); + verify(childContext).wait("my-op-backoff-2", Duration.ofSeconds(20)); + } + + @Test + void passesChildContextToOperation() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build(); + + context.withRetry( + "my-op", + (attempt, ctx) -> { + assertSame(childContext, ctx); + return "verified"; + }, + config); + } + + @Test + void operationReturnsNull() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build(); + + var result = context.withRetry( + "my-op", (BiFunction) (attempt, ctx) -> null, config); + + assertNull(result); + } + + @Test + void preservesCheckedExceptionSubclassType() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build(); + + var original = new SerDesException("deserialization failed", new RuntimeException("bad json")); + + var thrown = assertThrows( + SerDesException.class, + () -> context.withRetry( + "my-op", + (attempt, ctx) -> { + throw original; + }, + config)); + + assertSame(original, thrown); + assertEquals("deserialization 
failed", thrown.getMessage()); + } + } + + // --- Exception propagation (SuspendExecution, Unrecoverable) --- + + @Nested + class ExceptionPropagation { + + @BeforeEach + void setUpChildContext() { + stubChildContextAnyName(); + } + + @Test + void propagatesSuspendExecutionExceptionWithoutRetrying() { + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> RetryDecision.retry(Duration.ofSeconds(1))) + .build(); + + assertThrows( + SuspendExecutionException.class, + () -> context.withRetry( + "my-op", + (attempt, ctx) -> { + throw new SuspendExecutionException(); + }, + config)); + + verify(childContext, never()).wait(anyString(), any(Duration.class)); + } + + @Test + void propagatesUnrecoverableDurableExecutionExceptionWithoutRetrying() { + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> RetryDecision.retry(Duration.ofSeconds(1))) + .build(); + + assertThrows( + UnrecoverableDurableExecutionException.class, + () -> context.withRetry( + "my-op", + (attempt, ctx) -> { + throw new UnrecoverableDurableExecutionException( + software.amazon.awssdk.services.lambda.model.ErrorObject.builder() + .errorMessage("unrecoverable") + .build()); + }, + config)); + + verify(childContext, never()).wait(anyString(), any(Duration.class)); + } + + @Test + void propagatesSuspendExecutionExceptionOnLaterAttempt() { + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> RetryDecision.retry(Duration.ofSeconds(1))) + .build(); + + assertThrows( + SuspendExecutionException.class, + () -> context.withRetry( + "my-op", + (attempt, ctx) -> { + if (attempt == 1) { + throw new RuntimeException("transient"); + } + throw new SuspendExecutionException(); + }, + config)); + + verify(childContext, times(1)).wait(anyString(), any(Duration.class)); + } + + @Test + void propagatesUnrecoverableDurableExecutionExceptionOnLaterAttempt() { + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> 
RetryDecision.retry(Duration.ofSeconds(1))) + .build(); + + assertThrows( + UnrecoverableDurableExecutionException.class, + () -> context.withRetry( + "my-op", + (attempt, ctx) -> { + if (attempt == 1) { + throw new RuntimeException("transient"); + } + throw new UnrecoverableDurableExecutionException( + software.amazon.awssdk.services.lambda.model.ErrorObject.builder() + .errorMessage("unrecoverable on attempt 2") + .build()); + }, + config)); + + verify(childContext, times(1)).wait(anyString(), any(Duration.class)); + } + } + + // --- Naming: named form uses the provided name, null-name form defaults to "retry" --- + + @Nested + class Naming { + + @Test + void namedFormUsesProvidedNameForChildContextAndBackoff() { + stubChildContext("my-op"); + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> + attempt < 2 ? RetryDecision.retry(Duration.ofSeconds(5)) : RetryDecision.fail()) + .build(); + + context.withRetry( + "my-op", + (attempt, ctx) -> { + if (attempt < 2) { + throw new RuntimeException("fail"); + } + return "ok"; + }, + config); + + verify(context).runInChildContextAsync(eq("my-op"), any(TypeToken.class), any()); + verify(childContext).wait("my-op-backoff-1", Duration.ofSeconds(5)); + } + + @Test + void nullNameFormDefaultsToRetryForChildContextAndBackoff() { + stubChildContext("retry"); + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> + attempt < 2 ? 
RetryDecision.retry(Duration.ofSeconds(2)) : RetryDecision.fail()) + .build(); + + context.withRetry( + null, + (attempt, ctx) -> { + if (attempt < 2) { + throw new RuntimeException("fail"); + } + return "ok"; + }, + config); + + verify(context).runInChildContextAsync(eq("retry"), any(TypeToken.class), any()); + verify(childContext).wait("retry-backoff-1", Duration.ofSeconds(2)); + } + } + + // --- Sync vs async: sync returns value, async returns DurableFuture --- + + @Nested + class SyncVsAsync { + + @BeforeEach + void setUpChildContext() { + stubChildContextAnyName(); + } + + @Test + void syncReturnsValueDirectly() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build(); + + var result = context.withRetry("my-op", (attempt, ctx) -> "sync-value", config); + + assertEquals("sync-value", result); + } + + @Test + void asyncReturnsDurableFuture() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build(); + + DurableFuture future = context.withRetryAsync("my-op", (attempt, ctx) -> "async-value", config); + + assertNotNull(future); + assertEquals("async-value", future.get()); + } + + @Test + void syncAndAsyncProduceSameResult() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build(); + + var syncResult = context.withRetry("op", (attempt, ctx) -> "value", config); + var asyncResult = context.withRetryAsync("op", (attempt, ctx) -> "value", config) + .get(); + + assertEquals(syncResult, asyncResult); + } + + @Test + void asyncRetriesWithBackoff() { + var config = WithRetryConfig.builder() + .retryStrategy((error, attempt) -> + attempt < 3 ? 
RetryDecision.retry(Duration.ofSeconds(5)) : RetryDecision.fail()) + .build(); + + DurableFuture future = context.withRetryAsync( + "my-op", + (attempt, ctx) -> { + if (attempt < 3) { + throw new RuntimeException("fail-" + attempt); + } + return "success-on-3"; + }, + config); + + assertEquals("success-on-3", future.get()); + verify(childContext).wait("my-op-backoff-1", Duration.ofSeconds(5)); + verify(childContext).wait("my-op-backoff-2", Duration.ofSeconds(5)); + } + } + + // --- Null guards --- + + @Nested + class NullGuards { + + @Test + void syncNullOperationThrows() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build(); + + assertThrows(NullPointerException.class, () -> context.withRetry("name", null, config)); + } + + @Test + void syncNullConfigThrows() { + assertThrows(NullPointerException.class, () -> context.withRetry("name", (a, ctx) -> "x", null)); + } + + @Test + void asyncNullOperationThrows() { + var config = WithRetryConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build(); + + assertThrows(NullPointerException.class, () -> context.withRetryAsync("name", null, config)); + } + + @Test + void asyncNullConfigThrows() { + assertThrows(NullPointerException.class, () -> context.withRetryAsync("name", (a, ctx) -> "x", null)); + } + } + + // --- Default config overloads (no WithRetryConfig parameter) --- + + @Nested + class DefaultConfigOverloads { + + @BeforeEach + void setUpChildContext() { + stubChildContextAnyName(); + } + + @Test + void syncWithRetryWithoutConfigSucceedsOnFirstAttempt() { + var result = context.withRetry("my-op", (attempt, ctx) -> "default-config-result"); + + assertEquals("default-config-result", result); + } + + @Test + void syncWithRetryWithoutConfigRetriesOnFailure() { + var callCount = new int[] {0}; + + var result = context.withRetry("my-op", (attempt, ctx) -> { + callCount[0]++; + if (attempt == 1) { + throw new RuntimeException("transient"); + } + return 
"recovered"; + }); + + assertEquals("recovered", result); + assertEquals(2, callCount[0]); + verify(childContext).wait(eq("my-op-backoff-1"), any(Duration.class)); + } + + @Test + void asyncWithRetryWithoutConfigSucceedsOnFirstAttempt() { + DurableFuture future = context.withRetryAsync("my-op", (attempt, ctx) -> "async-default"); + + assertNotNull(future); + assertEquals("async-default", future.get()); + } + + @Test + void asyncWithRetryWithoutConfigRetriesOnFailure() { + var callCount = new int[] {0}; + + DurableFuture future = context.withRetryAsync("my-op", (attempt, ctx) -> { + callCount[0]++; + if (attempt == 1) { + throw new RuntimeException("transient"); + } + return "async-recovered"; + }); + + assertEquals("async-recovered", future.get()); + assertEquals(2, callCount[0]); + verify(childContext).wait(eq("my-op-backoff-1"), any(Duration.class)); + } + } +}