Skip to content

Commit c1fcea6

Browse files
authored
feat(sdk): Classify Durable API errors as retryable or non-retryable (e.g. customer KMS errors)
feat(sdk): Classify Durable API errors as retryable or non-retryable (e.g. customer KMS errors) The Java SDK currently fails the execution when encountering any error from CheckpointDurableExecution and GetDurableExecutionState API calls, including retryable errors like throttling (429) and service errors (5xx). KMS exceptions (e.g., KMSAccessDeniedException, KMSDisabledException) also arrive as 502 errors from these APIs but, unlike other 5xx errors, would never self-resolve on retry. Add DurableApiErrorClassifier in the util package with a NON_RETRYABLE_ERROR_CODES set and classifyException method. Use it in CheckpointManager to wrap both checkpoint() and getExecutionState() calls. Non-retryable errors (KMS, 4xx non-429) return UnrecoverableDurableExecutionException with retryable=false, terminating the execution immediately with status FAILED. Retryable errors (429, 5xx, invalid checkpoint token) return UnrecoverableDurableExecutionException with retryable=true, integrating with the retryable exception support from #350 so the backend can retry the invocation. The classifier also matches the JS SDK's classifyCheckpointError logic for 4xx and invalid checkpoint token handling. Testing — parameterized unit tests for all four KMS exceptions with realistic error messages, verifying retryable=false. Tests for retryable cases (429, 5xx, invalid checkpoint token, non-KMS 502) verify retryable=true. Additional tests for non-token InvalidParameterValueException, error detail preservation, and two wiring tests verifying CheckpointManager delegates to the classifier for both checkpoint and getExecutionState API paths. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent f4212dc commit c1fcea6

4 files changed

Lines changed: 347 additions & 34 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/execution/CheckpointManager.java

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
import java.util.function.Consumer;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
17+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
1718
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
1819
import software.amazon.awssdk.services.lambda.model.Operation;
1920
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
2021
import software.amazon.lambda.durable.DurableConfig;
2122
import software.amazon.lambda.durable.retry.PollingStrategies;
2223
import software.amazon.lambda.durable.retry.PollingStrategy;
24+
import software.amazon.lambda.durable.util.DurableApiErrorClassifier;
2325

2426
/**
2527
* Package-private checkpoint manager for batching and queueing checkpoint API calls.
@@ -163,14 +165,18 @@ List<Operation> fetchAllPages(CheckpointUpdatedExecutionState checkpointUpdatedE
163165
var nextMarker = checkpointUpdatedExecutionState.nextMarker();
164166
while (nextMarker != null && !nextMarker.isEmpty()) {
165167
var startTime = System.nanoTime();
166-
var response = config.getDurableExecutionClient()
167-
.getExecutionState(durableExecutionArn, checkpointToken, nextMarker);
168-
logger.debug(
169-
"Durable getExecutionState API called (latency={}ns): {}.",
170-
System.nanoTime() - startTime,
171-
response);
172-
operations.addAll(response.operations());
173-
nextMarker = response.nextMarker();
168+
try {
169+
var response = config.getDurableExecutionClient()
170+
.getExecutionState(durableExecutionArn, checkpointToken, nextMarker);
171+
logger.debug(
172+
"Durable getExecutionState API called (latency={}ns): {}.",
173+
System.nanoTime() - startTime,
174+
response);
175+
operations.addAll(response.operations());
176+
nextMarker = response.nextMarker();
177+
} catch (AwsServiceException e) {
178+
throw DurableApiErrorClassifier.classifyException(e);
179+
}
174180
}
175181
return operations;
176182
}
@@ -187,35 +193,41 @@ private void checkpointBatch(List<OperationUpdate> updates) {
187193

188194
var startTime = System.nanoTime();
189195
logger.debug("Calling durable checkpoint API with {} updates: {}", updates.size(), request);
190-
var response = config.getDurableExecutionClient().checkpoint(durableExecutionArn, checkpointToken, request);
191-
logger.debug("Durable checkpoint API called (latency={}ns): {}.", System.nanoTime() - startTime, response);
192-
193-
// Notify callback of completion
194-
checkpointToken = response.checkpointToken();
195-
if (response.newExecutionState() != null) {
196-
// fetch all pages of operations
197-
var operations = fetchAllPages(response.newExecutionState());
198-
199-
var processStartTime = System.nanoTime();
200-
int completedFutures = 0;
196+
try {
197+
var response =
198+
config.getDurableExecutionClient().checkpoint(durableExecutionArn, checkpointToken, request);
201199
logger.debug(
202-
"Processing {} operations. ({} pending pollers)", operations.size(), pollingFutures.size());
203-
// call the callback
204-
callback.accept(operations);
205-
206-
// complete the registered pollingFutures
207-
for (var operation : operations) {
208-
var pollers = pollingFutures.remove(operation.id());
209-
if (pollers != null) {
210-
completedFutures += pollers.size();
211-
pollers.forEach(poller -> poller.complete(operation));
200+
"Durable checkpoint API called (latency={}ns): {}.", System.nanoTime() - startTime, response);
201+
202+
// Notify callback of completion
203+
checkpointToken = response.checkpointToken();
204+
if (response.newExecutionState() != null) {
205+
// fetch all pages of operations
206+
var operations = fetchAllPages(response.newExecutionState());
207+
208+
var processStartTime = System.nanoTime();
209+
int completedFutures = 0;
210+
logger.debug(
211+
"Processing {} operations. ({} pending pollers)", operations.size(), pollingFutures.size());
212+
// call the callback
213+
callback.accept(operations);
214+
215+
// complete the registered pollingFutures
216+
for (var operation : operations) {
217+
var pollers = pollingFutures.remove(operation.id());
218+
if (pollers != null) {
219+
completedFutures += pollers.size();
220+
pollers.forEach(poller -> poller.complete(operation));
221+
}
212222
}
223+
logger.debug(
224+
"{} operations processed and {} pollers completed (latency={}ns). ",
225+
operations.size(),
226+
completedFutures,
227+
System.nanoTime() - processStartTime);
213228
}
214-
logger.debug(
215-
"{} operations processed and {} pollers completed (latency={}ns). ",
216-
operations.size(),
217-
completedFutures,
218-
System.nanoTime() - processStartTime);
229+
} catch (AwsServiceException e) {
230+
throw DurableApiErrorClassifier.classifyException(e);
219231
}
220232
}
221233
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.util;
4+
5+
import java.util.Set;
6+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
7+
import software.amazon.awssdk.services.lambda.model.ErrorObject;
8+
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
9+
10+
/**
11+
* Classifies AWS service exceptions from Durable Execution API calls as non-retryable or retryable.
12+
*
13+
* <p>Returns {@link UnrecoverableDurableExecutionException} with {@code retryable=false} for non-retryable customer
14+
* errors (e.g., KMS key misconfiguration), or {@code retryable=true} for retryable errors (throttling, server errors,
15+
* stale checkpoint tokens).
16+
*
17+
* <p>To add a new non-retryable error, add its error code to {@link #NON_RETRYABLE_ERROR_CODES}.
18+
*/
19+
public final class DurableApiErrorClassifier {
20+
21+
/**
22+
* Error codes that represent non-retryable customer errors. When a Durable Execution API call fails with one of
23+
* these error codes, the execution is terminated immediately.
24+
*
25+
* <p>These error codes are documented under the Lambda Invoke API but also apply to other Lambda APIs such as
26+
* {@code CheckpointDurableExecution} and {@code GetDurableExecutionState}.
27+
*
28+
* @see <a href="https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html">Lambda Invoke API — Errors
29+
* (reference for KMS exception names)</a>
30+
*/
31+
static final Set<String> NON_RETRYABLE_ERROR_CODES = Set.of(
32+
"KMSAccessDeniedException", "KMSDisabledException", "KMSInvalidStateException", "KMSNotFoundException");
33+
34+
/** HTTP 429 (Too Many Requests) indicates throttling — a transient condition that resolves on retry. */
35+
private static final int THROTTLING_STATUS_CODE = 429;
36+
37+
/**
38+
* Error code for invalid checkpoint token errors. These occur when the SDK uses a stale checkpoint token that has
39+
* been superseded by a newer invocation. Retrying with a fresh invocation resolves this.
40+
*
41+
* @see <a
42+
* href="https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts">JS
43+
* SDK classifyCheckpointError</a>
44+
*/
45+
private static final String INVALID_CHECKPOINT_TOKEN_ERROR_CODE = "InvalidParameterValueException";
46+
47+
/**
48+
* Message prefix that distinguishes invalid checkpoint token errors from other
49+
* {@code InvalidParameterValueException} errors.
50+
*/
51+
private static final String INVALID_CHECKPOINT_TOKEN_MESSAGE_PREFIX = "Invalid Checkpoint Token";
52+
53+
private DurableApiErrorClassifier() {}
54+
55+
/**
56+
* Classifies the given exception and returns the appropriate exception to throw.
57+
*
58+
* <p>Returns {@link UnrecoverableDurableExecutionException} with {@code retryable=false} for non-retryable customer
59+
* errors, or {@code retryable=true} for retryable errors.
60+
*
61+
* <p>Classification rules:
62+
*
63+
* <ul>
64+
* <li>Error code in {@link #NON_RETRYABLE_ERROR_CODES} → non-retryable ({@code retryable=false})
65+
* <li>4xx + "Invalid Checkpoint Token" → retryable ({@code retryable=true}, stale token resolves on retry)
66+
* <li>4xx (non-429) → non-retryable ({@code retryable=false}, customer error)
67+
* <li>429, 5xx, unknown → retryable ({@code retryable=true})
68+
* </ul>
69+
*
70+
* @param e the AWS service exception from a Durable Execution API call
71+
* @return an {@link UnrecoverableDurableExecutionException} for all cases, with the retryable flag set accordingly
72+
*/
73+
public static UnrecoverableDurableExecutionException classifyException(AwsServiceException e) {
74+
var errorCode = e.awsErrorDetails().errorCode();
75+
76+
// Non-retryable customer errors: execution is terminally broken (e.g., KMS key misconfiguration)
77+
if (NON_RETRYABLE_ERROR_CODES.contains(errorCode)) {
78+
return buildUnrecoverableDurableExecutionException(e, false);
79+
}
80+
81+
var statusCode = e.awsErrorDetails().sdkHttpResponse().statusCode();
82+
var message = e.getMessage();
83+
84+
// 4xx errors (excluding throttling) are non-retryable customer errors
85+
if (statusCode >= 400 && statusCode < 500 && statusCode != THROTTLING_STATUS_CODE) {
86+
// Stale checkpoint token: occurs when a newer invocation has superseded this one.
87+
// Retrying with a fresh invocation resolves this, so treat as retryable.
88+
if (INVALID_CHECKPOINT_TOKEN_ERROR_CODE.equals(errorCode)
89+
&& message != null
90+
&& message.startsWith(INVALID_CHECKPOINT_TOKEN_MESSAGE_PREFIX)) {
91+
return buildUnrecoverableDurableExecutionException(e, true);
92+
}
93+
return buildUnrecoverableDurableExecutionException(e, false);
94+
}
95+
96+
// 429 (throttling), 5xx (service errors), unknown — transient, retryable
97+
return buildUnrecoverableDurableExecutionException(e, true);
98+
}
99+
100+
private static UnrecoverableDurableExecutionException buildUnrecoverableDurableExecutionException(
101+
AwsServiceException e, boolean retryable) {
102+
return new UnrecoverableDurableExecutionException(
103+
ErrorObject.builder()
104+
.errorType(e.awsErrorDetails().errorCode())
105+
.errorMessage(e.getMessage())
106+
.build(),
107+
retryable);
108+
}
109+
}

sdk/src/test/java/software/amazon/lambda/durable/execution/CheckpointManagerTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
import java.util.concurrent.atomic.AtomicInteger;
1616
import org.junit.jupiter.api.BeforeEach;
1717
import org.junit.jupiter.api.Test;
18+
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
19+
import software.amazon.awssdk.awscore.exception.AwsServiceException;
20+
import software.amazon.awssdk.http.SdkHttpResponse;
1821
import software.amazon.awssdk.services.lambda.model.CheckpointDurableExecutionResponse;
1922
import software.amazon.awssdk.services.lambda.model.CheckpointUpdatedExecutionState;
2023
import software.amazon.awssdk.services.lambda.model.GetDurableExecutionStateResponse;
@@ -25,6 +28,7 @@
2528
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
2629
import software.amazon.lambda.durable.DurableConfig;
2730
import software.amazon.lambda.durable.client.DurableExecutionClient;
31+
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
2832
import software.amazon.lambda.durable.retry.JitterStrategy;
2933
import software.amazon.lambda.durable.retry.PollingStrategies;
3034

@@ -613,4 +617,54 @@ void pollForUpdate_withFixedDelay_intervalsAreConsistent() throws Exception {
613617
+ "ms, intervals=" + intervals);
614618
}
615619
}
620+
621+
// --- Checkpoint error classification wiring tests ---
622+
623+
@Test
624+
void checkpointBatch_nonRetryableError_throwsUnrecoverable() {
625+
when(client.checkpoint(anyString(), anyString(), anyList()))
626+
.thenThrow(AwsServiceException.builder()
627+
.message("KMSAccessDeniedException: Lambda was unable to decrypt the environment variables")
628+
.awsErrorDetails(AwsErrorDetails.builder()
629+
.errorCode("KMSAccessDeniedException")
630+
.errorMessage("Lambda was unable to decrypt the environment variables")
631+
.sdkHttpResponse(SdkHttpResponse.builder()
632+
.statusCode(502)
633+
.build())
634+
.build())
635+
.statusCode(502)
636+
.build());
637+
638+
var future = batcher.checkpoint(OperationUpdate.builder()
639+
.id("op-1")
640+
.type(OperationType.STEP)
641+
.action(OperationAction.START)
642+
.build());
643+
644+
var ex = assertThrows(Exception.class, () -> future.get(200, TimeUnit.MILLISECONDS));
645+
assertInstanceOf(UnrecoverableDurableExecutionException.class, ex.getCause());
646+
}
647+
648+
@Test
649+
void fetchAllPages_nonRetryableError_throwsUnrecoverable() {
650+
when(client.getExecutionState(eq("arn:test"), eq("token-1"), eq("marker-1")))
651+
.thenThrow(AwsServiceException.builder()
652+
.message("KMSAccessDeniedException: Lambda was unable to decrypt the environment variables")
653+
.awsErrorDetails(AwsErrorDetails.builder()
654+
.errorCode("KMSAccessDeniedException")
655+
.errorMessage("Lambda was unable to decrypt the environment variables")
656+
.sdkHttpResponse(SdkHttpResponse.builder()
657+
.statusCode(502)
658+
.build())
659+
.build())
660+
.statusCode(502)
661+
.build());
662+
663+
var state = CheckpointUpdatedExecutionState.builder()
664+
.operations(List.of(Operation.builder().id("op-1").build()))
665+
.nextMarker("marker-1")
666+
.build();
667+
668+
assertThrows(UnrecoverableDurableExecutionException.class, () -> batcher.fetchAllPages(state));
669+
}
616670
}

0 commit comments

Comments
 (0)