Skip to content

Commit ae4aa9f

Browse files
committed
fix: make sure serialized result is deserialize-able
1 parent e5c8860 commit ae4aa9f

12 files changed

Lines changed: 378 additions & 32 deletions

File tree

examples/src/main/java/software/amazon/lambda/durable/examples/general/CustomConfigExample.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
* <li>Automatic region detection with fallback to us-east-1 for testing environments
3232
* <li>Environment variable credentials provider
3333
* <li>Custom SerDes with snake_case property naming
34+
* <li>Optional round-trip validation toggle for performance-sensitive workloads
3435
* </ul>
3536
*/
3637
public class CustomConfigExample extends DurableHandler<String, String> {
@@ -68,6 +69,8 @@ protected DurableConfig createConfiguration() {
6869
return DurableConfig.builder()
6970
.withDurableExecutionClient(durableClient)
7071
.withSerDes(customSerDes)
72+
// Disable the extra deserialize pass if your workload is sensitive to the added validation cost.
73+
.withSerializationRoundTripValidation(false)
7174
.build();
7275
}
7376

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public final class DurableConfig {
9898
private final LoggerConfig loggerConfig;
9999
private final PollingStrategy pollingStrategy;
100100
private final Duration checkpointDelay;
101+
private final boolean validateSerializationRoundTrip;
101102
private final PluginRunner pluginRunner;
102103

103104
private DurableConfig(Builder builder) {
@@ -109,6 +110,7 @@ private DurableConfig(Builder builder) {
109110
this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults);
110111
this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT);
111112
this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0));
113+
this.validateSerializationRoundTrip = builder.validateSerializationRoundTrip;
112114
this.pluginRunner = builder.plugins.isEmpty() ? PluginRunner.noOp() : new PluginRunner(builder.plugins);
113115

114116
validateConfiguration();
@@ -186,6 +188,19 @@ public Duration getCheckpointDelay() {
186188
return checkpointDelay;
187189
}
188190

191+
/**
192+
* Gets whether serialized operation data should be immediately deserialized to verify round-trip compatibility.
193+
*
194+
* <p>When enabled, the SDK validates serialized operation results and exceptions before checkpointing them. This
195+
* catches incompatible SerDes behavior early at the cost of an extra deserialize pass. Defaults to true, and custom
196+
* SerDes implementations are still expected to be round-trip safe even if this validation is disabled.
197+
*
198+
* @return true when round-trip serialization validation is enabled
199+
*/
200+
public boolean shouldValidateSerializationRoundTrip() {
201+
return validateSerializationRoundTrip;
202+
}
203+
189204
/**
190205
* Gets the plugin runner that dispatches lifecycle events to registered plugins.
191206
*
@@ -293,6 +308,7 @@ public static final class Builder {
293308
private LoggerConfig loggerConfig;
294309
private PollingStrategy pollingStrategy;
295310
private Duration checkpointDelay;
311+
private boolean validateSerializationRoundTrip = true;
296312
private List<DurableExecutionPlugin> plugins = new ArrayList<>();
297313

298314
public Builder() {}
@@ -403,6 +419,22 @@ public Builder withCheckpointDelay(Duration duration) {
403419
return this;
404420
}
405421

422+
/**
423+
* Controls whether the SDK immediately deserializes serialized results and exceptions to verify they can be
424+
* read back before checkpointing.
425+
*
426+
* <p>This validation is enabled by default. Disable it only to avoid the extra deserialize pass when the
427+
* additional safety check is too expensive for your workload. Custom SerDes implementations are still expected
428+
* to round-trip SDK-managed values correctly.
429+
*
430+
* @param validateSerializationRoundTrip true to validate serialized data with an immediate deserialize pass
431+
* @return This builder
432+
*/
433+
public Builder withSerializationRoundTripValidation(boolean validateSerializationRoundTrip) {
434+
this.validateSerializationRoundTrip = validateSerializationRoundTrip;
435+
return this;
436+
}
437+
406438
/**
407439
* Registers one or more plugins for lifecycle event instrumentation.
408440
*

sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public DurableConfig getConfiguration() {
117117
* .withDurableExecutionClient(durableClient)
118118
* .withSerDes(customSerDes) // Optional: custom SerDes for user data
119119
* .withExecutorService(customExecutor) // Optional: custom thread pool
120+
* .withSerializationRoundTripValidation(false) // Optional: skip extra validation deserialize pass
120121
* .build();
121122
* }
122123
* }</pre>

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,30 +149,31 @@ private void executeChildContext() {
149149
}
150150

151151
private void handleChildContextSuccess(T result) {
152+
var serializedResult = serializeResultWithDeserializedValue(result);
153+
152154
if (replayChildren.get() || isVirtual || parentOperation != null && parentOperation.isOperationCompleted()) {
153155
// Skip checkpointing if
154156
// - parent ConcurrencyOperation has already completed, preventing race conditions where a child finishes
155157
// after the parent has already completed.
156158
// - replaying a SUCCEEDED child with replayChildren=true — skip checkpointing.
157159
// - nestingType is FLAT
158160
// Mark the completableFuture completed so get() doesn't block waiting for a checkpoint response.
159-
cachedOperationResult.set(DeserializedOperationResult.succeeded(result));
161+
cachedOperationResult.set(DeserializedOperationResult.succeeded(serializedResult.deserialized()));
160162
markAlreadyCompleted();
161163
} else {
162-
checkpointSuccess(result);
164+
checkpointSuccess(serializedResult);
163165
}
164166
}
165167

166-
private void checkpointSuccess(T result) {
167-
var serialized = serializeResult(result);
168-
168+
private void checkpointSuccess(SerializedResult<T> serializedResult) {
169+
var serialized = serializedResult.serialized();
169170
if (serialized == null || serialized.getBytes(StandardCharsets.UTF_8).length < LARGE_RESULT_THRESHOLD) {
170171
sendOperationUpdate(
171172
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serialized));
172173
} else {
173174
// Large result: checkpoint with empty payload + ReplayChildren flag.
174175
// Store the result so get() can return it directly without deserializing the empty payload.
175-
cachedOperationResult.set(DeserializedOperationResult.succeeded(result));
176+
cachedOperationResult.set(DeserializedOperationResult.succeeded(serializedResult.deserialized()));
176177
sendOperationUpdate(OperationUpdate.builder()
177178
.action(OperationAction.SUCCEED)
178179
.payload("")

sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,22 +177,22 @@ protected void replay(Operation existing) {
177177

178178
@Override
179179
protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletionStatus) {
180-
this.cachedResult = constructMapResult(concurrencyCompletionStatus);
181-
var serialized = serializeResult(cachedResult);
182-
var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8);
180+
var serializedResult = serializeResultWithDeserializedValue(constructMapResult(concurrencyCompletionStatus));
181+
this.cachedResult = serializedResult.deserialized();
182+
var serializedBytes = serializedResult.serialized().getBytes(StandardCharsets.UTF_8);
183183

184184
if (serializedBytes.length < LARGE_RESULT_THRESHOLD) {
185185
sendOperationUpdate(OperationUpdate.builder()
186186
.action(OperationAction.SUCCEED)
187187
.subType(getSubType().getValue())
188-
.payload(serialized));
188+
.payload(serializedResult.serialized()));
189189
} else {
190190
// Large result: checkpoint with stripped payload + replayChildren flag
191-
var strippedResult = serializeResult(stripMapResult(cachedResult));
191+
var strippedResult = serializeResultWithDeserializedValue(stripMapResult(cachedResult));
192192
sendOperationUpdate(OperationUpdate.builder()
193193
.action(OperationAction.SUCCEED)
194194
.subType(getSubType().getValue())
195-
.payload(strippedResult)
195+
.payload(strippedResult.serialized())
196196
.contextOptions(
197197
ContextOptions.builder().replayChildren(true).build()));
198198
}

sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,17 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio
7676
int failedCount = Math.toIntExact(
7777
statuses.stream().filter(s -> s == ParallelResult.Status.FAILED).count());
7878
int skippedCount = items.size() - succeededCount - failedCount;
79-
cachedResult = new ParallelResult(
79+
var result = new ParallelResult(
8080
items.size(), succeededCount, failedCount, skippedCount, concurrencyCompletionStatus, statuses);
81+
var serializedResult = serializeResultWithDeserializedValue(result);
82+
cachedResult = serializedResult.deserialized();
8183

8284
// Branches added after checkpoint will not exist in the checkpointed result, but they'll be in the returned
8385
// value from the get() method.
8486
sendOperationUpdate(OperationUpdate.builder()
8587
.action(OperationAction.SUCCEED)
8688
.subType(getSubType().getValue())
87-
.payload(serializeResult(cachedResult))
89+
.payload(serializedResult.serialized())
8890
.contextOptions(ContextOptions.builder().replayChildren(true).build()));
8991
}
9092

sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
public abstract class SerializableDurableOperation<T> extends BaseDurableOperation implements DurableFuture<T> {
3535
private static final Logger logger = LoggerFactory.getLogger(SerializableDurableOperation.class);
3636

37+
protected record SerializedResult<T>(String serialized, T deserialized) {}
38+
3739
private final TypeToken<T> resultTypeToken;
3840
private final SerDes resultSerDes;
3941

@@ -94,14 +96,13 @@ protected T deserializeResult(String result) {
9496
}
9597
}
9698

97-
/**
98-
* Serializes the result to a string.
99-
*
100-
* @param result the result to serialize
101-
* @return the serialized string
102-
*/
103-
protected String serializeResult(T result) {
104-
return resultSerDes.serialize(result);
99+
protected SerializedResult<T> serializeResultWithDeserializedValue(T result) {
100+
var serialized = resultSerDes.serialize(result);
101+
T deserialized = result;
102+
if (shouldValidateSerializationRoundTrip()) {
103+
deserialized = deserializeResult(serialized);
104+
}
105+
return new SerializedResult<>(serialized, deserialized);
105106
}
106107

107108
/**
@@ -110,8 +111,18 @@ protected String serializeResult(T result) {
110111
* @param throwable the exception to serialize
111112
* @return the serialized error object
112113
*/
114+
@SuppressWarnings("ThrowableNotThrown")
113115
protected ErrorObject serializeException(Throwable throwable) {
114-
return ExceptionHelper.buildErrorObject(throwable, resultSerDes);
116+
var error = ExceptionHelper.buildErrorObject(throwable, resultSerDes);
117+
if (shouldValidateSerializationRoundTrip()) {
118+
deserializeException(error);
119+
}
120+
return error;
121+
}
122+
123+
private boolean shouldValidateSerializationRoundTrip() {
124+
var config = getContext().getDurableConfig();
125+
return config == null || config.shouldValidateSerializationRoundTrip();
115126
}
116127

117128
/**

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,9 @@ private void checkpointStarted() {
145145

146146
private void handleStepSucceeded(T result) {
147147
// Send SUCCEED
148+
var serializedResult = serializeResultWithDeserializedValue(result);
148149
var successUpdate =
149-
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializeResult(result));
150+
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializedResult.serialized());
150151

151152
// sendOperationUpdate must be synchronous here. When waiting for the return of this call,
152153
// the context threads waiting for the result of this step operation will be woken up and registered.

sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,15 @@ private void executeCheckLogic(T currentState, int attempt) {
128128
// Execute check function in user executor
129129
WaitForConditionResult<T> result = checkFunc.apply(currentState, stepContext);
130130

131-
// Serialize/deserialize round-trip on the value to ensure state is checkpoint-safe
132-
var serializedState = serializeResult(result.value());
133-
T deserializedValue = deserializeResult(serializedState);
131+
// Continue with the checkpoint-shaped state so first execution matches replay.
132+
var serializedState = serializeResultWithDeserializedValue(result.value());
133+
T deserializedValue = serializedState.deserialized();
134134

135135
if (result.isDone()) {
136136
// Condition met — checkpoint SUCCEED
137137
var successUpdate = OperationUpdate.builder()
138138
.action(OperationAction.SUCCEED)
139-
.payload(serializedState);
139+
.payload(serializedState.serialized());
140140
sendOperationUpdate(successUpdate);
141141
} else {
142142
// Compute delay from strategy
@@ -145,7 +145,7 @@ private void executeCheckLogic(T currentState, int attempt) {
145145
// Checkpoint RETRY with delay
146146
var retryUpdate = OperationUpdate.builder()
147147
.action(OperationAction.RETRY)
148-
.payload(serializedState)
148+
.payload(serializedState.serialized())
149149
.stepOptions(StepOptions.builder()
150150
.nextAttemptDelaySeconds(Math.toIntExact(delay.toSeconds()))
151151
.build());

sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,24 @@ void testBuilder_WithCustomExecutorService() {
8787
assertNotNull(config.getSerDes());
8888
}
8989

90+
@Test
91+
void testBuilder_SerializationRoundTripValidationDefaultsToTrue() {
92+
var config =
93+
DurableConfig.builder().withDurableExecutionClient(mockClient).build();
94+
95+
assertTrue(config.shouldValidateSerializationRoundTrip());
96+
}
97+
98+
@Test
99+
void testBuilder_WithSerializationRoundTripValidationDisabled() {
100+
var config = DurableConfig.builder()
101+
.withDurableExecutionClient(mockClient)
102+
.withSerializationRoundTripValidation(false)
103+
.build();
104+
105+
assertFalse(config.shouldValidateSerializationRoundTrip());
106+
}
107+
90108
@Test
91109
void testBuilder_WithAllCustomComponents() {
92110
var config = DurableConfig.builder()
@@ -131,6 +149,7 @@ void testBuilder_FluentAPI() {
131149
assertSame(builder, builder.withDurableExecutionClient(mockClient));
132150
assertSame(builder, builder.withSerDes(mockSerDes));
133151
assertSame(builder, builder.withExecutorService(mockExecutor));
152+
assertSame(builder, builder.withSerializationRoundTripValidation(false));
134153
}
135154

136155
@Test

0 commit comments

Comments
 (0)