Skip to content

Commit 4d9d3c7

Browse files
committed
fix: make sure serialized result is deserialize-able
1 parent e5c8860 commit 4d9d3c7

8 files changed

Lines changed: 288 additions & 8 deletions

File tree

examples/src/main/java/software/amazon/lambda/durable/examples/general/CustomConfigExample.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
* <li>Automatic region detection with fallback to us-east-1 for testing environments
3232
* <li>Environment variable credentials provider
3333
* <li>Custom SerDes with snake_case property naming
34+
* <li>Optional round-trip validation toggle for performance-sensitive workloads
3435
* </ul>
3536
*/
3637
public class CustomConfigExample extends DurableHandler<String, String> {
@@ -68,6 +69,8 @@ protected DurableConfig createConfiguration() {
6869
return DurableConfig.builder()
6970
.withDurableExecutionClient(durableClient)
7071
.withSerDes(customSerDes)
72+
// Disable the extra deserialize pass if your workload is sensitive to the added validation cost.
73+
.withSerializationRoundTripValidation(false)
7174
.build();
7275
}
7376

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public final class DurableConfig {
9898
private final LoggerConfig loggerConfig;
9999
private final PollingStrategy pollingStrategy;
100100
private final Duration checkpointDelay;
101+
private final boolean validateSerializationRoundTrip;
101102
private final PluginRunner pluginRunner;
102103

103104
private DurableConfig(Builder builder) {
@@ -109,6 +110,7 @@ private DurableConfig(Builder builder) {
109110
this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults);
110111
this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT);
111112
this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0));
113+
this.validateSerializationRoundTrip = builder.validateSerializationRoundTrip;
112114
this.pluginRunner = builder.plugins.isEmpty() ? PluginRunner.noOp() : new PluginRunner(builder.plugins);
113115

114116
validateConfiguration();
@@ -186,6 +188,19 @@ public Duration getCheckpointDelay() {
186188
return checkpointDelay;
187189
}
188190

191+
/**
192+
* Gets whether serialized operation data should be immediately deserialized to verify round-trip compatibility.
193+
*
194+
* <p>When enabled, the SDK validates serialized operation results and exceptions before checkpointing them. This
195+
* catches incompatible SerDes behavior early at the cost of an extra deserialize pass. Defaults to true, and custom
196+
* SerDes implementations are still expected to be round-trip safe even if this validation is disabled.
197+
*
198+
* @return true when round-trip serialization validation is enabled
199+
*/
200+
public boolean shouldValidateSerializationRoundTrip() {
201+
return validateSerializationRoundTrip;
202+
}
203+
189204
/**
190205
* Gets the plugin runner that dispatches lifecycle events to registered plugins.
191206
*
@@ -293,6 +308,7 @@ public static final class Builder {
293308
private LoggerConfig loggerConfig;
294309
private PollingStrategy pollingStrategy;
295310
private Duration checkpointDelay;
311+
private boolean validateSerializationRoundTrip = true;
296312
private List<DurableExecutionPlugin> plugins = new ArrayList<>();
297313

298314
public Builder() {}
@@ -403,6 +419,22 @@ public Builder withCheckpointDelay(Duration duration) {
403419
return this;
404420
}
405421

422+
/**
423+
* Controls whether the SDK immediately deserializes serialized results and exceptions to verify they can be
424+
* read back before checkpointing.
425+
*
426+
* <p>This validation is enabled by default. Disable it only to avoid the extra deserialize pass when the
427+
* additional safety check is too expensive for your workload. Custom SerDes implementations are still expected
428+
* to round-trip SDK-managed values correctly.
429+
*
430+
* @param validateSerializationRoundTrip true to validate serialized data with an immediate deserialize pass
431+
* @return This builder
432+
*/
433+
public Builder withSerializationRoundTripValidation(boolean validateSerializationRoundTrip) {
434+
this.validateSerializationRoundTrip = validateSerializationRoundTrip;
435+
return this;
436+
}
437+
406438
/**
407439
* Registers one or more plugins for lifecycle event instrumentation.
408440
*

sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public DurableConfig getConfiguration() {
117117
* .withDurableExecutionClient(durableClient)
118118
* .withSerDes(customSerDes) // Optional: custom SerDes for user data
119119
* .withExecutorService(customExecutor) // Optional: custom thread pool
120+
* .withSerializationRoundTripValidation(false) // Optional: skip extra validation deserialize pass
120121
* .build();
121122
* }
122123
* }</pre>

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ private void executeChildContext() {
149149
}
150150

151151
private void handleChildContextSuccess(T result) {
152+
var serialized = serializeResult(result);
153+
152154
if (replayChildren.get() || isVirtual || parentOperation != null && parentOperation.isOperationCompleted()) {
153155
// Skip checkpointing if
154156
// - parent ConcurrencyOperation has already completed, preventing race conditions where a child finishes
@@ -159,13 +161,11 @@ private void handleChildContextSuccess(T result) {
159161
cachedOperationResult.set(DeserializedOperationResult.succeeded(result));
160162
markAlreadyCompleted();
161163
} else {
162-
checkpointSuccess(result);
164+
checkpointSuccess(result, serialized);
163165
}
164166
}
165167

166-
private void checkpointSuccess(T result) {
167-
var serialized = serializeResult(result);
168-
168+
private void checkpointSuccess(T result, String serialized) {
169169
if (serialized == null || serialized.getBytes(StandardCharsets.UTF_8).length < LARGE_RESULT_THRESHOLD) {
170170
sendOperationUpdate(
171171
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serialized));

sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ protected T deserializeResult(String result) {
101101
* @return the serialized string
102102
*/
103103
protected String serializeResult(T result) {
104-
return resultSerDes.serialize(result);
104+
var serialized = resultSerDes.serialize(result);
105+
if (shouldValidateSerializationRoundTrip()) {
106+
deserializeResult(serialized);
107+
}
108+
return serialized;
105109
}
106110

107111
/**
@@ -110,8 +114,18 @@ protected String serializeResult(T result) {
110114
* @param throwable the exception to serialize
111115
* @return the serialized error object
112116
*/
117+
@SuppressWarnings("ThrowableNotThrown")
113118
protected ErrorObject serializeException(Throwable throwable) {
114-
return ExceptionHelper.buildErrorObject(throwable, resultSerDes);
119+
var error = ExceptionHelper.buildErrorObject(throwable, resultSerDes);
120+
if (shouldValidateSerializationRoundTrip()) {
121+
deserializeException(error);
122+
}
123+
return error;
124+
}
125+
126+
private boolean shouldValidateSerializationRoundTrip() {
127+
var config = getContext().getDurableConfig();
128+
return config == null || config.shouldValidateSerializationRoundTrip();
115129
}
116130

117131
/**

sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,24 @@ void testBuilder_WithCustomExecutorService() {
8787
assertNotNull(config.getSerDes());
8888
}
8989

90+
@Test
91+
void testBuilder_SerializationRoundTripValidationDefaultsToTrue() {
92+
var config =
93+
DurableConfig.builder().withDurableExecutionClient(mockClient).build();
94+
95+
assertTrue(config.shouldValidateSerializationRoundTrip());
96+
}
97+
98+
@Test
99+
void testBuilder_WithSerializationRoundTripValidationDisabled() {
100+
var config = DurableConfig.builder()
101+
.withDurableExecutionClient(mockClient)
102+
.withSerializationRoundTripValidation(false)
103+
.build();
104+
105+
assertFalse(config.shouldValidateSerializationRoundTrip());
106+
}
107+
90108
@Test
91109
void testBuilder_WithAllCustomComponents() {
92110
var config = DurableConfig.builder()
@@ -131,6 +149,7 @@ void testBuilder_FluentAPI() {
131149
assertSame(builder, builder.withDurableExecutionClient(mockClient));
132150
assertSame(builder, builder.withSerDes(mockSerDes));
133151
assertSame(builder, builder.withExecutorService(mockExecutor));
152+
assertSame(builder, builder.withSerializationRoundTripValidation(false));
134153
}
135154

136155
@Test

sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,30 @@
2424
import software.amazon.lambda.durable.context.DurableContextImpl;
2525
import software.amazon.lambda.durable.exception.ChildContextFailedException;
2626
import software.amazon.lambda.durable.exception.NonDeterministicExecutionException;
27+
import software.amazon.lambda.durable.exception.SerDesException;
2728
import software.amazon.lambda.durable.execution.ExecutionManager;
2829
import software.amazon.lambda.durable.execution.ThreadContext;
2930
import software.amazon.lambda.durable.execution.ThreadType;
3031
import software.amazon.lambda.durable.model.OperationIdentifier;
3132
import software.amazon.lambda.durable.model.OperationSubType;
3233
import software.amazon.lambda.durable.serde.JacksonSerDes;
34+
import software.amazon.lambda.durable.serde.SerDes;
3335

3436
/** Unit tests for ChildContextOperation. */
3537
class ChildContextOperationTest {
3638

39+
private static final class SerializationOnlySerDes implements SerDes {
40+
@Override
41+
public String serialize(Object value) {
42+
return "\"serialized\"";
43+
}
44+
45+
@Override
46+
public <T> T deserialize(String data, TypeToken<T> typeToken) {
47+
throw new SerDesException("cannot deserialize");
48+
}
49+
}
50+
3751
private static final JacksonSerDes SERDES = new JacksonSerDes();
3852

3953
private DurableContextImpl durableContext;
@@ -49,29 +63,42 @@ void setUp() {
4963
}
5064

5165
private DurableConfig createConfig() {
66+
return createConfig(true);
67+
}
68+
69+
private DurableConfig createConfig(boolean validateSerializationRoundTrip) {
5270
return DurableConfig.builder()
5371
.withExecutorService(Executors.newCachedThreadPool())
72+
.withSerializationRoundTripValidation(validateSerializationRoundTrip)
5473
.build();
5574
}
5675

5776
private static final OperationIdentifier OPERATION_IDENTIFIER =
5877
OperationIdentifier.of("1", "test-context", OperationSubType.RUN_IN_CHILD_CONTEXT);
5978

6079
private ChildContextOperation<String> createOperation(Function<DurableContext, String> func) {
80+
return createOperation(func, SERDES);
81+
}
82+
83+
private ChildContextOperation<String> createOperation(Function<DurableContext, String> func, SerDes serDes) {
6184
return new ChildContextOperation<>(
6285
OPERATION_IDENTIFIER,
6386
func,
6487
TypeToken.get(String.class),
65-
RunInChildContextConfig.builder().serDes(SERDES).build(),
88+
RunInChildContextConfig.builder().serDes(serDes).build(),
6689
durableContext);
6790
}
6891

6992
private ChildContextOperation<String> createVirtualOperation(Function<DurableContext, String> func) {
93+
return createVirtualOperation(func, SERDES);
94+
}
95+
96+
private ChildContextOperation<String> createVirtualOperation(Function<DurableContext, String> func, SerDes serDes) {
7097
return new ChildContextOperation<>(
7198
OPERATION_IDENTIFIER,
7299
func,
73100
TypeToken.get(String.class),
74-
RunInChildContextConfig.builder().serDes(SERDES).isVirtual(true).build(),
101+
RunInChildContextConfig.builder().serDes(serDes).isVirtual(true).build(),
75102
durableContext);
76103
}
77104

@@ -311,6 +338,40 @@ void childSkipsSuccessCheckpointWhenParentAlreadyCompleted() throws Exception {
311338
.sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED));
312339
}
313340

341+
/** Virtual child still validates result round-trip before skipping a success checkpoint. */
342+
@Test
343+
void virtualChildFailsWhenResultCannotBeDeserialized() throws Exception {
344+
when(executionManager.getOperationAndUpdateReplayState("1")).thenReturn(null);
345+
346+
var operation = createVirtualOperation(ctx -> "result", new SerializationOnlySerDes());
347+
operation.execute();
348+
Thread.sleep(200);
349+
350+
var thrown = assertThrows(ChildContextFailedException.class, operation::get);
351+
assertTrue(thrown.getMessage().contains(SerDesException.class.getName()));
352+
verify(executionManager, never())
353+
.sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED));
354+
verify(executionManager, never())
355+
.sendOperationUpdate(argThat(update -> update.action() == OperationAction.FAIL));
356+
}
357+
358+
/** Virtual child can skip round-trip validation when disabled in DurableConfig. */
359+
@Test
360+
void virtualChildSucceedsWhenResultValidationDisabled() throws Exception {
361+
when(executionManager.getOperationAndUpdateReplayState("1")).thenReturn(null);
362+
when(durableContext.getDurableConfig()).thenReturn(createConfig(false));
363+
364+
var operation = createVirtualOperation(ctx -> "result", new SerializationOnlySerDes());
365+
operation.execute();
366+
Thread.sleep(200);
367+
368+
assertEquals("result", operation.get());
369+
verify(executionManager, never())
370+
.sendOperationUpdate(argThat(update -> update.action() == OperationAction.SUCCEED));
371+
verify(executionManager, never())
372+
.sendOperationUpdate(argThat(update -> update.action() == OperationAction.FAIL));
373+
}
374+
314375
/** Child skips failure checkpoint when parent operation has already completed. */
315376
@Test
316377
void childSkipsFailureCheckpointWhenParentAlreadyCompleted() throws Exception {

0 commit comments

Comments
 (0)