Skip to content

Commit 39c518b

Browse files
committed
feat(sdk): Treat KMS exceptions as non-retryable customer errors
KMS exceptions from Lambda (e.g., KMSAccessDeniedException, KMSDisabledException) arrive as 502 errors during CheckpointDurableExecution and GetDurableExecutionState API calls. Unlike other 5xx errors, we do not want Durable Functions to retry invocations for these errors, as they would never self-resolve. Add DurableApiErrorClassifier in the exception package with a NON_RETRYABLE_ERROR_CODES set and classifyException method. Use it in CheckpointManager to wrap both checkpoint() and getExecutionState() calls so that KMS errors return UnrecoverableDurableExecutionException with status FAILED immediately. The classifier also handles 4xx (non-429) as non-retryable and Invalid Checkpoint Token as retryable, matching the JS SDK's classifyCheckpointError logic. The classifyException method could be further updated to return a new UnrecoverableDurableInvocationException for retryable Durable API errors (e.g., 429, 5xx), allowing the SDK to distinguish them from user code errors and crash the invocation for retry instead of misclassifying them as step failures. Testing — parameterized unit tests for all four KMS exceptions with realistic error messages, plus tests for 4xx, Invalid Checkpoint Token, non-token InvalidParameterValueException, 429, 500, non-KMS 502, error detail preservation, and two wiring tests verifying CheckpointManager delegates to the classifier for both API paths. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent 1f7de8a commit 39c518b

4 files changed

Lines changed: 344 additions & 34 deletions

File tree

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.exception;
4+
5+
import java.util.Set;
6+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
7+
import software.amazon.awssdk.services.lambda.model.ErrorObject;
8+
9+
/**
10+
* Classifies AWS service exceptions from Durable Execution API calls as execution-level (non-retryable) or
11+
* invocation-level (retryable).
12+
*
13+
* <p>Execution-level errors throw {@link UnrecoverableDurableExecutionException} to terminate the execution
14+
* immediately. These represent permanent customer-side issues (e.g., KMS key misconfiguration) that will not
15+
* self-resolve on retry.
16+
*
17+
* <p>Invocation-level errors are allowed to propagate, crashing the current Lambda invocation so the backend can retry
18+
* with a fresh invocation.
19+
*
20+
* <p>To add a new non-retryable error, add its error code to {@link #NON_RETRYABLE_ERROR_CODES}.
21+
*/
22+
public final class DurableApiErrorClassifier {
23+
24+
/**
25+
* Error codes that represent non-retryable customer errors. When a Durable Execution API call fails with one of
26+
* these error codes, the execution is terminated immediately.
27+
*
28+
* <p>These error codes are documented under the Lambda Invoke API but also apply to other Lambda APIs such as
29+
* {@code CheckpointDurableExecution} and {@code GetDurableExecutionState}.
30+
*
31+
* @see <a href="https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html">Lambda Invoke API — Errors
32+
* (reference for KMS exception names)</a>
33+
*/
34+
static final Set<String> NON_RETRYABLE_ERROR_CODES = Set.of(
35+
"KMSAccessDeniedException", "KMSDisabledException", "KMSInvalidStateException", "KMSNotFoundException");
36+
37+
/** HTTP 429 (Too Many Requests) indicates throttling — a transient condition that resolves on retry. */
38+
private static final int THROTTLING_STATUS_CODE = 429;
39+
40+
/**
41+
* Error code for invalid checkpoint token errors. These occur when the SDK uses a stale checkpoint token that has
42+
* been superseded by a newer invocation. Retrying with a fresh invocation resolves this.
43+
*
44+
* @see <a
45+
* href="https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts">JS
46+
* SDK classifyCheckpointError</a>
47+
*/
48+
private static final String INVALID_CHECKPOINT_TOKEN_ERROR_CODE = "InvalidParameterValueException";
49+
50+
/**
51+
* Message prefix that distinguishes invalid checkpoint token errors from other
52+
* {@code InvalidParameterValueException} errors.
53+
*/
54+
private static final String INVALID_CHECKPOINT_TOKEN_MESSAGE_PREFIX = "Invalid Checkpoint Token";
55+
56+
private DurableApiErrorClassifier() {}
57+
58+
/**
59+
* Classifies the given exception and returns the appropriate exception to throw.
60+
*
61+
* <p>Returns {@link UnrecoverableDurableExecutionException} for non-retryable customer errors, or the original
62+
* exception for retryable errors.
63+
*
64+
* <p>Classification rules:
65+
*
66+
* <ul>
67+
* <li>Error code in {@link #NON_RETRYABLE_ERROR_CODES} → execution error (non-retryable)
68+
* <li>4xx + "Invalid Checkpoint Token" → invocation error (retryable, stale token resolves on retry)
69+
* <li>4xx (non-429) → execution error (non-retryable customer error)
70+
* <li>429, 5xx, unknown → invocation error (retryable)
71+
* </ul>
72+
*
73+
* @param e the AWS service exception from a Durable Execution API call
74+
* @return an {@link UnrecoverableDurableExecutionException} if non-retryable, or the original exception if
75+
* retryable
76+
*/
77+
public static RuntimeException classifyException(AwsServiceException e) {
78+
var errorCode = e.awsErrorDetails().errorCode();
79+
80+
// Non-retryable customer errors: execution is terminally broken (e.g., KMS key misconfiguration)
81+
if (NON_RETRYABLE_ERROR_CODES.contains(errorCode)) {
82+
return buildUnrecoverableDurableExecutionException(e);
83+
}
84+
85+
var statusCode = e.awsErrorDetails().sdkHttpResponse().statusCode();
86+
var message = e.getMessage();
87+
88+
// 4xx errors (excluding throttling) are non-retryable customer errors
89+
if (statusCode >= 400 && statusCode < 500 && statusCode != THROTTLING_STATUS_CODE) {
90+
// Stale checkpoint token: occurs when a newer invocation has superseded this one.
91+
// Retrying with a fresh invocation resolves this, so treat as retryable.
92+
if (INVALID_CHECKPOINT_TOKEN_ERROR_CODE.equals(errorCode)
93+
&& message != null
94+
&& message.startsWith(INVALID_CHECKPOINT_TOKEN_MESSAGE_PREFIX)) {
95+
return e;
96+
}
97+
return buildUnrecoverableDurableExecutionException(e);
98+
}
99+
100+
// 429 (throttling), 5xx (service errors), unknown — transient, retryable
101+
return e;
102+
}
103+
104+
private static UnrecoverableDurableExecutionException buildUnrecoverableDurableExecutionException(
105+
AwsServiceException e) {
106+
return new UnrecoverableDurableExecutionException(ErrorObject.builder()
107+
.errorType(e.awsErrorDetails().errorCode())
108+
.errorMessage(e.getMessage())
109+
.build());
110+
}
111+
}

sdk/src/main/java/software/amazon/lambda/durable/execution/CheckpointManager.java

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
import java.util.function.Consumer;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
17+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
1718
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
1819
import software.amazon.awssdk.services.lambda.model.Operation;
1920
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
2021
import software.amazon.lambda.durable.DurableConfig;
22+
import software.amazon.lambda.durable.exception.DurableApiErrorClassifier;
2123
import software.amazon.lambda.durable.retry.PollingStrategies;
2224
import software.amazon.lambda.durable.retry.PollingStrategy;
2325

@@ -163,14 +165,18 @@ List<Operation> fetchAllPages(CheckpointUpdatedExecutionState checkpointUpdatedE
163165
var nextMarker = checkpointUpdatedExecutionState.nextMarker();
164166
while (nextMarker != null && !nextMarker.isEmpty()) {
165167
var startTime = System.nanoTime();
166-
var response = config.getDurableExecutionClient()
167-
.getExecutionState(durableExecutionArn, checkpointToken, nextMarker);
168-
logger.debug(
169-
"Durable getExecutionState API called (latency={}ns): {}.",
170-
System.nanoTime() - startTime,
171-
response);
172-
operations.addAll(response.operations());
173-
nextMarker = response.nextMarker();
168+
try {
169+
var response = config.getDurableExecutionClient()
170+
.getExecutionState(durableExecutionArn, checkpointToken, nextMarker);
171+
logger.debug(
172+
"Durable getExecutionState API called (latency={}ns): {}.",
173+
System.nanoTime() - startTime,
174+
response);
175+
operations.addAll(response.operations());
176+
nextMarker = response.nextMarker();
177+
} catch (AwsServiceException e) {
178+
throw DurableApiErrorClassifier.classifyException(e);
179+
}
174180
}
175181
return operations;
176182
}
@@ -187,35 +193,41 @@ private void checkpointBatch(List<OperationUpdate> updates) {
187193

188194
var startTime = System.nanoTime();
189195
logger.debug("Calling durable checkpoint API with {} updates: {}", updates.size(), request);
190-
var response = config.getDurableExecutionClient().checkpoint(durableExecutionArn, checkpointToken, request);
191-
logger.debug("Durable checkpoint API called (latency={}ns): {}.", System.nanoTime() - startTime, response);
192-
193-
// Notify callback of completion
194-
checkpointToken = response.checkpointToken();
195-
if (response.newExecutionState() != null) {
196-
// fetch all pages of operations
197-
var operations = fetchAllPages(response.newExecutionState());
198-
199-
var processStartTime = System.nanoTime();
200-
int completedFutures = 0;
196+
try {
197+
var response =
198+
config.getDurableExecutionClient().checkpoint(durableExecutionArn, checkpointToken, request);
201199
logger.debug(
202-
"Processing {} operations. ({} pending pollers)", operations.size(), pollingFutures.size());
203-
// call the callback
204-
callback.accept(operations);
205-
206-
// complete the registered pollingFutures
207-
for (var operation : operations) {
208-
var pollers = pollingFutures.remove(operation.id());
209-
if (pollers != null) {
210-
completedFutures += pollers.size();
211-
pollers.forEach(poller -> poller.complete(operation));
200+
"Durable checkpoint API called (latency={}ns): {}.", System.nanoTime() - startTime, response);
201+
202+
// Notify callback of completion
203+
checkpointToken = response.checkpointToken();
204+
if (response.newExecutionState() != null) {
205+
// fetch all pages of operations
206+
var operations = fetchAllPages(response.newExecutionState());
207+
208+
var processStartTime = System.nanoTime();
209+
int completedFutures = 0;
210+
logger.debug(
211+
"Processing {} operations. ({} pending pollers)", operations.size(), pollingFutures.size());
212+
// call the callback
213+
callback.accept(operations);
214+
215+
// complete the registered pollingFutures
216+
for (var operation : operations) {
217+
var pollers = pollingFutures.remove(operation.id());
218+
if (pollers != null) {
219+
completedFutures += pollers.size();
220+
pollers.forEach(poller -> poller.complete(operation));
221+
}
212222
}
223+
logger.debug(
224+
"{} operations processed and {} pollers completed (latency={}ns). ",
225+
operations.size(),
226+
completedFutures,
227+
System.nanoTime() - processStartTime);
213228
}
214-
logger.debug(
215-
"{} operations processed and {} pollers completed (latency={}ns). ",
216-
operations.size(),
217-
completedFutures,
218-
System.nanoTime() - processStartTime);
229+
} catch (AwsServiceException e) {
230+
throw DurableApiErrorClassifier.classifyException(e);
219231
}
220232
}
221233
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.exception;
4+
5+
import static org.junit.jupiter.api.Assertions.*;
6+
7+
import java.util.stream.Stream;
8+
import org.junit.jupiter.api.Test;
9+
import org.junit.jupiter.params.ParameterizedTest;
10+
import org.junit.jupiter.params.provider.Arguments;
11+
import org.junit.jupiter.params.provider.MethodSource;
12+
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
13+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
14+
import software.amazon.awssdk.http.SdkHttpResponse;
15+
16+
class DurableApiErrorClassifierTest {
17+
18+
@ParameterizedTest
19+
@MethodSource("kmsExceptions")
20+
void classifyException_kmsError_returnsUnrecoverableDurableExecutionException(String errorCode, String message) {
21+
var error = awsError(502, errorCode, message);
22+
var result = DurableApiErrorClassifier.classifyException(error);
23+
assertInstanceOf(UnrecoverableDurableExecutionException.class, result);
24+
var unrecoverable = (UnrecoverableDurableExecutionException) result;
25+
assertEquals(errorCode, unrecoverable.getErrorObject().errorType());
26+
assertTrue(unrecoverable.getErrorObject().errorMessage().contains(message));
27+
}
28+
29+
static Stream<Arguments> kmsExceptions() {
30+
return Stream.of(
31+
Arguments.of(
32+
"KMSAccessDeniedException",
33+
"Lambda was unable to decrypt the environment variables because KMS access was denied."
34+
+ " Please check the function's KMS key settings."
35+
+ " KMS Exception: AccessDeniedException"),
36+
Arguments.of(
37+
"KMSDisabledException",
38+
"Lambda was unable to decrypt the environment variables because the KMS key used is disabled."
39+
+ " Please check the function's KMS key settings."),
40+
Arguments.of(
41+
"KMSInvalidStateException",
42+
"Lambda was unable to decrypt the environment variables because the KMS key is in an invalid"
43+
+ " state for Decrypt. Please check the function's KMS key settings."),
44+
Arguments.of(
45+
"KMSNotFoundException",
46+
"Lambda was unable to decrypt the environment variables because the KMS key was not found."
47+
+ " Please check the function's KMS key settings."));
48+
}
49+
50+
@Test
51+
void classifyException_clientError4xx_returnsUnrecoverableDurableExecutionException() {
52+
var error = awsError(403, "AccessDeniedException", "User is not authorized");
53+
54+
var result = DurableApiErrorClassifier.classifyException(error);
55+
assertInstanceOf(UnrecoverableDurableExecutionException.class, result);
56+
}
57+
58+
@Test
59+
void classifyException_invalidCheckpointToken_returnsOriginalException() {
60+
var error = awsError(400, "InvalidParameterValueException", "Invalid Checkpoint Token: token expired");
61+
62+
var result = DurableApiErrorClassifier.classifyException(error);
63+
assertSame(error, result);
64+
}
65+
66+
@Test
67+
void classifyException_invalidParameterValueNonToken_returnsUnrecoverableDurableExecutionException() {
68+
var error = awsError(
69+
400,
70+
"InvalidParameterValueException",
71+
"The runtime parameter of python3.8 is no longer" + " supported for creating or updating functions.");
72+
73+
var result = DurableApiErrorClassifier.classifyException(error);
74+
assertInstanceOf(UnrecoverableDurableExecutionException.class, result);
75+
}
76+
77+
@Test
78+
void classifyException_throttled429_returnsOriginalException() {
79+
var error = awsError(429, "TooManyRequestsException", "Rate exceeded");
80+
81+
var result = DurableApiErrorClassifier.classifyException(error);
82+
assertSame(error, result);
83+
}
84+
85+
@Test
86+
void classifyException_serverError500_returnsOriginalException() {
87+
var error = awsError(500, "ServiceException", "Service encountered an error");
88+
89+
var result = DurableApiErrorClassifier.classifyException(error);
90+
assertSame(error, result);
91+
}
92+
93+
@Test
94+
void classifyException_nonMatchingErrorCode502_returnsOriginalException() {
95+
var error = awsError(502, "ServiceException", "Service unavailable");
96+
97+
var result = DurableApiErrorClassifier.classifyException(error);
98+
assertSame(error, result);
99+
}
100+
101+
@Test
102+
void classifyException_errorDetailsPreserved() {
103+
var error = awsError(
104+
502,
105+
"KMSAccessDeniedException",
106+
"Lambda was unable to decrypt the environment variables because KMS access was denied."
107+
+ " Please check the function's KMS key settings."
108+
+ " KMS Exception: AccessDeniedException");
109+
110+
var result = DurableApiErrorClassifier.classifyException(error);
111+
assertInstanceOf(UnrecoverableDurableExecutionException.class, result);
112+
113+
var unrecoverable = (UnrecoverableDurableExecutionException) result;
114+
assertEquals("KMSAccessDeniedException", unrecoverable.getErrorObject().errorType());
115+
assertTrue(unrecoverable
116+
.getErrorObject()
117+
.errorMessage()
118+
.contains("Lambda was unable to decrypt the environment variables"));
119+
}
120+
121+
private static AwsServiceException awsError(int statusCode, String errorCode, String message) {
122+
return AwsServiceException.builder()
123+
.message(message)
124+
.awsErrorDetails(AwsErrorDetails.builder()
125+
.errorCode(errorCode)
126+
.errorMessage(message)
127+
.sdkHttpResponse(
128+
SdkHttpResponse.builder().statusCode(statusCode).build())
129+
.build())
130+
.statusCode(statusCode)
131+
.build();
132+
}
133+
}

0 commit comments

Comments
 (0)