Skip to content

Commit 1dff92e

Browse files
committed
fix: make sure serialized result is deserialize-able
1 parent 1468236 commit 1dff92e

12 files changed

Lines changed: 328 additions & 8 deletions

File tree

examples/handler.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from aws_durable_execution_sdk_python.config import Duration
2+
from aws_durable_execution_sdk_python.context import DurableContext, StepContext, durable_step
3+
from aws_durable_execution_sdk_python.execution import durable_execution
4+
from aws_durable_execution_sdk_python.lambda_service import LambdaClient
5+
import boto3
6+
7+
@durable_step
8+
def my_step(step_context: StepContext, my_arg: string) -> str:
9+
step_context.logger.info("Hello from my_step")
10+
return f"from my_step: {my_arg}"
11+
12+
def durable_handler(event, context) -> dict:
13+
msg: str = context.step(my_step(raw_event))
14+
15+
context.wait(Duration.from_seconds(10))
16+
17+
context.logger.info("Waited for 10 seconds without consuming CPU.")
18+
19+
return {
20+
"statusCode": 200,
21+
"body": raw_event,
22+
}
23+
24+
raw_event = None
25+
26+
botoclient = boto3.client("lambda",
27+
endpoint_url="https://gamma.lambda.aws.a2z.com",
28+
region_name="us-east-1",
29+
)
30+
31+
def lambda_handler(event, context):
32+
global raw_event
33+
raw_event = str(event)
34+
print("event: " + str(event))
35+
36+
result = durable_execution(boto3_client=botoclient)(durable_handler)(event, context)
37+
result["Result"] = raw_event
38+
return result
39+

examples/handler.zip

670 Bytes
Binary file not shown.

examples/output

Whitespace-only changes.

examples/output.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{'DurableExecutionArn': 'arn:aws:lambda:us-east-1:858039354069:function:myDurablePythonFunction2:$LATEST/durable-execution/8e89d3bc-455f-426f-97c8-72c565aac75a/cac7f0d9-6dc2-3f9b-bcf9-8666cf6af052', 'CheckpointToken': 'AgGAAXjQr2ytV2m8Vv5j81VX/+CvAAAAAQAHYXdzLWttcwBLYXJuOmF3czprbXM6dXMtZWFzdC0xOjMxMDI3MzUwNTMwNDprZXkvZGM2OWUzOGQtYmFjNy00NjY2LThiY2QtZjliYzBjMjBlYTBiALgBAgEAeFOOxgoIuAmgS5wl8hGyK5/Q6zA+mRvvS6BWlEQhnQGVATQLMQpKFE1fNLy2FWNPuWEAAAB+MHwGCSqGSIb3DQEHBqBvMG0CAQAwaAYJKoZIhvcNAQcBMB4GCWCGSAFlAwQBLjARBAyJ1kwrOrnTlRVAWrkCARCAO0c0v/dWH34T7wD63MS35+5edzXgJxJAPSnD0BmhV/zGq1u8bIjOJOYjo6cAGsf/PG153w7omAOgtQbgAgAAAAAMAAAQAAAAAAAAAAAAAAAAALmdpx86+Llkf5D4pUCe6RX/////AAAAAQAAAAAAAAAAAAAAAQAAAPUbq6CcIk55x6/RjhqokZApQWTYt+kyB1y+fWJhlH9vN4jy36nkefO5gyQJSy8nJQEY8BlmP/ETJhsKXhUoMg4UoloDos9TtKKXfIY1KZzcPGQfoLf0SwvSkCJQVwnM46dkjK9tpki4s4T2BXebYnw37XrPzVTnfnmqU60QjSs5+riKXDu9TDcDc9U0LKXKphaku06RXzg6HevXTumk40w0m5FBCLprphhW94oQgp0RM3ehAzAYuKzNSVPmE0l0Aii8Btie5qCtSA2JnDUT0qVvij08kYDG4XD6T9SPNDXA5mLucMwbzrOun7rI28AEwjMC8I5Lj2hFbu8WkTF7E1hghTBbI/k', 'InitialExecutionState': {'Operations': [{'Id': 'cac7f0d9-6dc2-3f9b-bcf9-8666cf6af052', 'Name': '8e89d3bc-455f-426f-97c8-72c565aac75a', 'Type': 'EXECUTION', 'StartTimestamp': 1781307233431, 'Status': 'STARTED', 'ExecutionDetails': {'InputPayload': '{}'}}]}, 'UpdatedOperationIds': []}

examples/src/main/java/software/amazon/lambda/durable/examples/general/CustomConfigExample.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
* <li>Automatic region detection with fallback to us-east-1 for testing environments
3232
* <li>Environment variable credentials provider
3333
* <li>Custom SerDes with snake_case property naming
34+
* <li>Optional round-trip validation toggle for performance-sensitive workloads
3435
* </ul>
3536
*/
3637
public class CustomConfigExample extends DurableHandler<String, String> {
@@ -68,6 +69,8 @@ protected DurableConfig createConfiguration() {
6869
return DurableConfig.builder()
6970
.withDurableExecutionClient(durableClient)
7071
.withSerDes(customSerDes)
72+
// Disable the extra deserialize pass if your workload is sensitive to the added validation cost.
73+
.withSerializationRoundTripValidation(false)
7174
.build();
7275
}
7376

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public final class DurableConfig {
9898
private final LoggerConfig loggerConfig;
9999
private final PollingStrategy pollingStrategy;
100100
private final Duration checkpointDelay;
101+
private final boolean validateSerializationRoundTrip;
101102
private final PluginRunner pluginRunner;
102103

103104
private DurableConfig(Builder builder) {
@@ -109,6 +110,7 @@ private DurableConfig(Builder builder) {
109110
this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults);
110111
this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT);
111112
this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0));
113+
this.validateSerializationRoundTrip = builder.validateSerializationRoundTrip;
112114
this.pluginRunner = builder.plugins.isEmpty() ? PluginRunner.noOp() : new PluginRunner(builder.plugins);
113115

114116
validateConfiguration();
@@ -186,6 +188,19 @@ public Duration getCheckpointDelay() {
186188
return checkpointDelay;
187189
}
188190

191+
/**
192+
* Gets whether serialized operation data should be immediately deserialized to verify round-trip compatibility.
193+
*
194+
* <p>When enabled, the SDK validates serialized operation results and exceptions before checkpointing them. This
195+
* catches incompatible SerDes behavior early at the cost of an extra deserialize pass. Defaults to true, and custom
196+
* SerDes implementations are still expected to be round-trip safe even if this validation is disabled.
197+
*
198+
* @return true when round-trip serialization validation is enabled
199+
*/
200+
public boolean shouldValidateSerializationRoundTrip() {
201+
return validateSerializationRoundTrip;
202+
}
203+
189204
/**
190205
* Gets the plugin runner that dispatches lifecycle events to registered plugins.
191206
*
@@ -293,6 +308,7 @@ public static final class Builder {
293308
private LoggerConfig loggerConfig;
294309
private PollingStrategy pollingStrategy;
295310
private Duration checkpointDelay;
311+
private boolean validateSerializationRoundTrip = true;
296312
private List<DurableExecutionPlugin> plugins = new ArrayList<>();
297313

298314
public Builder() {}
@@ -403,6 +419,22 @@ public Builder withCheckpointDelay(Duration duration) {
403419
return this;
404420
}
405421

422+
/**
423+
* Controls whether the SDK immediately deserializes serialized results and exceptions to verify they can be
424+
* read back before checkpointing.
425+
*
426+
* <p>This validation is enabled by default. Disable it only to avoid the extra deserialize pass when the
427+
* additional safety check is too expensive for your workload. Custom SerDes implementations are still expected
428+
* to round-trip SDK-managed values correctly.
429+
*
430+
* @param validateSerializationRoundTrip true to validate serialized data with an immediate deserialize pass
431+
* @return This builder
432+
*/
433+
public Builder withSerializationRoundTripValidation(boolean validateSerializationRoundTrip) {
434+
this.validateSerializationRoundTrip = validateSerializationRoundTrip;
435+
return this;
436+
}
437+
406438
/**
407439
* Registers one or more plugins for lifecycle event instrumentation.
408440
*

sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public DurableConfig getConfiguration() {
117117
* .withDurableExecutionClient(durableClient)
118118
* .withSerDes(customSerDes) // Optional: custom SerDes for user data
119119
* .withExecutorService(customExecutor) // Optional: custom thread pool
120+
* .withSerializationRoundTripValidation(false) // Optional: skip extra validation deserialize pass
120121
* .build();
121122
* }
122123
* }</pre>

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ private void executeChildContext() {
149149
}
150150

151151
private void handleChildContextSuccess(T result) {
152+
var serialized = serializeResult(result);
153+
152154
if (replayChildren.get() || isVirtual || parentOperation != null && parentOperation.isOperationCompleted()) {
153155
// Skip checkpointing if
154156
// - parent ConcurrencyOperation has already completed, preventing race conditions where a child finishes
@@ -159,13 +161,11 @@ private void handleChildContextSuccess(T result) {
159161
cachedOperationResult.set(DeserializedOperationResult.succeeded(result));
160162
markAlreadyCompleted();
161163
} else {
162-
checkpointSuccess(result);
164+
checkpointSuccess(result, serialized);
163165
}
164166
}
165167

166-
private void checkpointSuccess(T result) {
167-
var serialized = serializeResult(result);
168-
168+
private void checkpointSuccess(T result, String serialized) {
169169
if (serialized == null || serialized.getBytes(StandardCharsets.UTF_8).length < LARGE_RESULT_THRESHOLD) {
170170
sendOperationUpdate(
171171
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serialized));

sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ protected T deserializeResult(String result) {
101101
* @return the serialized string
102102
*/
103103
protected String serializeResult(T result) {
104-
return resultSerDes.serialize(result);
104+
var serialized = resultSerDes.serialize(result);
105+
if (shouldValidateSerializationRoundTrip()) {
106+
deserializeResult(serialized);
107+
}
108+
return serialized;
105109
}
106110

107111
/**
@@ -110,8 +114,18 @@ protected String serializeResult(T result) {
110114
* @param throwable the exception to serialize
111115
* @return the serialized error object
112116
*/
117+
@SuppressWarnings("ThrowableNotThrown")
113118
protected ErrorObject serializeException(Throwable throwable) {
114-
return ExceptionHelper.buildErrorObject(throwable, resultSerDes);
119+
var error = ExceptionHelper.buildErrorObject(throwable, resultSerDes);
120+
if (shouldValidateSerializationRoundTrip()) {
121+
deserializeException(error);
122+
}
123+
return error;
124+
}
125+
126+
private boolean shouldValidateSerializationRoundTrip() {
127+
var config = getContext().getDurableConfig();
128+
return config == null || config.shouldValidateSerializationRoundTrip();
115129
}
116130

117131
/**

sdk/src/test/java/software/amazon/lambda/durable/DurableConfigTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,24 @@ void testBuilder_WithCustomExecutorService() {
8787
assertNotNull(config.getSerDes());
8888
}
8989

90+
@Test
91+
void testBuilder_SerializationRoundTripValidationDefaultsToTrue() {
92+
var config =
93+
DurableConfig.builder().withDurableExecutionClient(mockClient).build();
94+
95+
assertTrue(config.shouldValidateSerializationRoundTrip());
96+
}
97+
98+
@Test
99+
void testBuilder_WithSerializationRoundTripValidationDisabled() {
100+
var config = DurableConfig.builder()
101+
.withDurableExecutionClient(mockClient)
102+
.withSerializationRoundTripValidation(false)
103+
.build();
104+
105+
assertFalse(config.shouldValidateSerializationRoundTrip());
106+
}
107+
90108
@Test
91109
void testBuilder_WithAllCustomComponents() {
92110
var config = DurableConfig.builder()
@@ -131,6 +149,7 @@ void testBuilder_FluentAPI() {
131149
assertSame(builder, builder.withDurableExecutionClient(mockClient));
132150
assertSame(builder, builder.withSerDes(mockSerDes));
133151
assertSame(builder, builder.withExecutorService(mockExecutor));
152+
assertSame(builder, builder.withSerializationRoundTripValidation(false));
134153
}
135154

136155
@Test

0 commit comments

Comments
 (0)