Commit 1d48a17

fix: return deserialized operation results
1 parent 4d9d3c7 commit 1d48a17

13 files changed

Lines changed: 142 additions & 72 deletions

docs/migration-1.x-to-2.x.md

Lines changed: 6 additions & 4 deletions
@@ -185,27 +185,29 @@ assertThrows(IllegalStateException.class, future::get);
 What changes in practice:

 - Serialization problems now fail on first execution instead of surfacing later on replay.
+- Operation results can be returned after a SerDes round-trip so first execution matches replay.
 - Custom `SerDes` implementations must be able to deserialize SDK-managed values they serialize.
 - Child-context results are validated consistently, including virtual child-context paths.

 This is usually a correctness improvement, but it can surface previously hidden `SerDes` bugs during upgrade.

 ### New opt-out configuration

-If your workload is very performance-sensitive and you need to skip the extra validation deserialize pass, you can opt out:
+If your workload is very performance-sensitive and you need to skip the extra deserialize pass, you can opt out:

 ```java
 @Override
 protected DurableConfig createConfiguration() {
     return DurableConfig.builder()
-            .withSerializationRoundTripValidation(false)
+            .withDeserializeAfterSerialization(false)
             .build();
 }
 ```

 Use that carefully:

-- Disabling validation can hide serialization bugs until replay.
+- Disabling this can hide serialization bugs until replay.
+- First execution may return the raw result shape instead of the replay result shape.
 - Custom `SerDes` implementations are still expected to be round-trip safe.

 ## Recommended Validation After Upgrading
@@ -226,6 +228,6 @@ Most upgrades are straightforward:
 - Logger metadata moves to `executionArn`, `operationId`, and `operationName`
 - Replay-sensitive logging becomes per-context, `isReplaying()` moves to `DurableContext`, and step logs are no longer replay-suppressed
 - Validation failures now throw `IllegalStateException`
-- Serialization round-trip problems surface earlier by default, with an opt-out via `withSerializationRoundTripValidation(false)`
+- Serialization round-trip problems surface earlier by default, with an opt-out via `withDeserializeAfterSerialization(false)`

 If you update those areas first, the `1.x` to `2.x` migration should be low risk.
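
Note on "raw result shape" versus "replay result shape": the difference the migration notes describe can be illustrated with a minimal, self-contained sketch. It assumes a hypothetical SerDes that stores timestamps as epoch milliseconds; the `serialize` and `deserialize` helpers below are stand-ins invented for illustration and are not part of the SDK.

```java
import java.time.Instant;

// Illustrative only: a toy stand-in for a SerDes that loses precision on write.
class RoundTripShapeDemo {
    // Hypothetical serializer: keeps millisecond precision, drops the remaining nanoseconds.
    static String serialize(Instant value) {
        return Long.toString(value.toEpochMilli());
    }

    // Hypothetical deserializer: rebuilds the Instant from epoch milliseconds.
    static Instant deserialize(String data) {
        return Instant.ofEpochMilli(Long.parseLong(data));
    }

    public static void main(String[] args) {
        // The value a step produces on first execution (nanosecond precision).
        Instant raw = Instant.ofEpochSecond(1, 123_456_789);

        // The value a replay would read back from the checkpointed payload.
        Instant replayed = deserialize(serialize(raw));

        // The raw value differs from what replay sees.
        System.out.println(raw.equals(replayed)); // false

        // Once normalized, further round-trips are stable.
        System.out.println(replayed.equals(deserialize(serialize(replayed)))); // true
    }
}
```

With a SerDes like this, returning the round-tripped value on first execution means callers see the same millisecond-precision value on both first execution and replay; opting out returns the raw nanosecond-precision value on first execution only.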

examples/src/main/java/software/amazon/lambda/durable/examples/general/CustomConfigExample.java

Lines changed: 3 additions & 3 deletions
@@ -31,7 +31,7 @@
  * <li>Automatic region detection with fallback to us-east-1 for testing environments
  * <li>Environment variable credentials provider
  * <li>Custom SerDes with snake_case property naming
- * <li>Optional round-trip validation toggle for performance-sensitive workloads
+ * <li>Optional post-serialization deserialization toggle for performance-sensitive workloads
  * </ul>
  */
 public class CustomConfigExample extends DurableHandler<String, String> {
@@ -69,8 +69,8 @@ protected DurableConfig createConfiguration() {
         return DurableConfig.builder()
                 .withDurableExecutionClient(durableClient)
                 .withSerDes(customSerDes)
-                // Disable the extra deserialize pass if your workload is sensitive to the added validation cost.
-                .withSerializationRoundTripValidation(false)
+                // Disable the extra deserialize pass if your workload is sensitive to the added cost.
+                .withDeserializeAfterSerialization(false)
                 .build();
     }

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 16 additions & 18 deletions
@@ -98,7 +98,7 @@ public final class DurableConfig {
     private final LoggerConfig loggerConfig;
     private final PollingStrategy pollingStrategy;
     private final Duration checkpointDelay;
-    private final boolean validateSerializationRoundTrip;
+    private final boolean deserializeAfterSerialization;
     private final PluginRunner pluginRunner;

     private DurableConfig(Builder builder) {
@@ -110,7 +110,7 @@ private DurableConfig(Builder builder) {
         this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults);
         this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT);
         this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0));
-        this.validateSerializationRoundTrip = builder.validateSerializationRoundTrip;
+        this.deserializeAfterSerialization = builder.deserializeAfterSerialization;
         this.pluginRunner = builder.plugins.isEmpty() ? PluginRunner.noOp() : new PluginRunner(builder.plugins);

         validateConfiguration();
@@ -189,16 +189,16 @@ public Duration getCheckpointDelay() {
     }

     /**
-     * Gets whether serialized operation data should be immediately deserialized to verify round-trip compatibility.
+     * Gets whether serialized operation data should be deserialized immediately after serialization.
      *
-     * <p>When enabled, the SDK validates serialized operation results and exceptions before checkpointing them. This
-     * catches incompatible SerDes behavior early at the cost of an extra deserialize pass. Defaults to true, and custom
-     * SerDes implementations are still expected to be round-trip safe even if this validation is disabled.
+     * <p>When enabled, the SDK returns deserialized operation results so first execution matches replay, and validates
+     * serialized exceptions before checkpointing them. Defaults to true, and custom SerDes implementations are still
+     * expected to be round-trip safe even if this behavior is disabled.
      *
-     * @return true when round-trip serialization validation is enabled
+     * @return true when serialized data should be deserialized immediately
      */
-    public boolean shouldValidateSerializationRoundTrip() {
-        return validateSerializationRoundTrip;
+    public boolean shouldDeserializeAfterSerialization() {
+        return deserializeAfterSerialization;
     }

     /**
@@ -308,7 +308,7 @@ public static final class Builder {
         private LoggerConfig loggerConfig;
         private PollingStrategy pollingStrategy;
         private Duration checkpointDelay;
-        private boolean validateSerializationRoundTrip = true;
+        private boolean deserializeAfterSerialization = true;
         private List<DurableExecutionPlugin> plugins = new ArrayList<>();

         public Builder() {}
@@ -420,18 +420,16 @@ public Builder withCheckpointDelay(Duration duration) {
         }

         /**
-         * Controls whether the SDK immediately deserializes serialized results and exceptions to verify they can be
-         * read back before checkpointing.
+         * Controls whether the SDK immediately deserializes serialized operation data after serialization.
          *
-         * <p>This validation is enabled by default. Disable it only to avoid the extra deserialize pass when the
-         * additional safety check is too expensive for your workload. Custom SerDes implementations are still expected
-         * to round-trip SDK-managed values correctly.
+         * <p>This is enabled by default. When enabled, operation results are returned after a SerDes round-trip so
+         * first execution returns the same value shape as replay, and exceptions are checked before checkpointing.
          *
-         * @param validateSerializationRoundTrip true to validate serialized data with an immediate deserialize pass
+         * @param deserializeAfterSerialization true to deserialize serialized data immediately
          * @return This builder
          */
-        public Builder withSerializationRoundTripValidation(boolean validateSerializationRoundTrip) {
-            this.validateSerializationRoundTrip = validateSerializationRoundTrip;
+        public Builder withDeserializeAfterSerialization(boolean deserializeAfterSerialization) {
+            this.deserializeAfterSerialization = deserializeAfterSerialization;
             return this;
         }

sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java

Lines changed: 1 addition & 1 deletion
@@ -117,7 +117,7 @@ public DurableConfig getConfiguration() {
      * .withDurableExecutionClient(durableClient)
      * .withSerDes(customSerDes) // Optional: custom SerDes for user data
      * .withExecutorService(customExecutor) // Optional: custom thread pool
-     * .withSerializationRoundTripValidation(false) // Optional: skip extra validation deserialize pass
+     * .withDeserializeAfterSerialization(false) // Optional: skip immediate deserialize pass
      * .build();
      * }
      * }</pre>

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 3 additions & 3 deletions
@@ -149,7 +149,7 @@ private void executeChildContext() {
     }

     private void handleChildContextSuccess(T result) {
-        var serialized = serializeResult(result);
+        var serializedResult = serializeAndDeserializeResult(result);

         if (replayChildren.get() || isVirtual || parentOperation != null && parentOperation.isOperationCompleted()) {
             // Skip checkpointing if
@@ -158,10 +158,10 @@ private void handleChildContextSuccess(T result) {
             // - replaying a SUCCEEDED child with replayChildren=true — skip checkpointing.
             // - nestingType is FLAT
             // Mark the completableFuture completed so get() doesn't block waiting for a checkpoint response.
-            cachedOperationResult.set(DeserializedOperationResult.succeeded(result));
+            cachedOperationResult.set(DeserializedOperationResult.succeeded(serializedResult.deserialized()));
             markAlreadyCompleted();
         } else {
-            checkpointSuccess(result, serialized);
+            checkpointSuccess(serializedResult.deserialized(), serializedResult.serialized());
         }
     }


sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java

Lines changed: 6 additions & 5 deletions
@@ -178,21 +178,22 @@ protected void replay(Operation existing) {
     @Override
     protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletionStatus) {
         this.cachedResult = constructMapResult(concurrencyCompletionStatus);
-        var serialized = serializeResult(cachedResult);
-        var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8);
+        var serializedResult = serializeAndDeserializeResult(cachedResult);
+        this.cachedResult = serializedResult.deserialized();
+        var serializedBytes = serializedResult.serialized().getBytes(StandardCharsets.UTF_8);

         if (serializedBytes.length < LARGE_RESULT_THRESHOLD) {
             sendOperationUpdate(OperationUpdate.builder()
                     .action(OperationAction.SUCCEED)
                     .subType(getSubType().getValue())
-                    .payload(serialized));
+                    .payload(serializedResult.serialized()));
         } else {
             // Large result: checkpoint with stripped payload + replayChildren flag
-            var strippedResult = serializeResult(stripMapResult(cachedResult));
+            var strippedResult = serializeAndDeserializeResult(stripMapResult(cachedResult));
             sendOperationUpdate(OperationUpdate.builder()
                     .action(OperationAction.SUCCEED)
                     .subType(getSubType().getValue())
-                    .payload(strippedResult)
+                    .payload(strippedResult.serialized())
                     .contextOptions(
                             ContextOptions.builder().replayChildren(true).build()));
         }

sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java

Lines changed: 3 additions & 1 deletion
@@ -78,13 +78,15 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio
         int skippedCount = items.size() - succeededCount - failedCount;
         cachedResult = new ParallelResult(
                 items.size(), succeededCount, failedCount, skippedCount, concurrencyCompletionStatus, statuses);
+        var serializedResult = serializeAndDeserializeResult(cachedResult);
+        cachedResult = serializedResult.deserialized();

         // Branches added after checkpoint will not exist in the checkpointed result, but they'll be in the returned
         // value from get() method.
         sendOperationUpdate(OperationUpdate.builder()
                 .action(OperationAction.SUCCEED)
                 .subType(getSubType().getValue())
-                .payload(serializeResult(cachedResult))
+                .payload(serializedResult.serialized())
                 .contextOptions(ContextOptions.builder().replayChildren(true).build()));
     }


sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java

Lines changed: 13 additions & 10 deletions
@@ -34,6 +34,8 @@
 public abstract class SerializableDurableOperation<T> extends BaseDurableOperation implements DurableFuture<T> {
     private static final Logger logger = LoggerFactory.getLogger(SerializableDurableOperation.class);

+    protected record SerializedResult<T>(String serialized, T deserialized) {}
+
     private final TypeToken<T> resultTypeToken;
     private final SerDes resultSerDes;

@@ -95,17 +97,18 @@ protected T deserializeResult(String result) {
     }

     /**
-     * Serializes the result to a string.
+     * Serializes the result and returns the value that should be exposed to callers.
+     *
+     * <p>Use this for operations that cache a first-execution result instead of reading it back from checkpoint data.
+     * This keeps first execution consistent with replay when a SerDes normalizes or otherwise changes the value.
      *
      * @param result the result to serialize
-     * @return the serialized string
+     * @return the serialized string and the deserialized result
      */
-    protected String serializeResult(T result) {
+    protected SerializedResult<T> serializeAndDeserializeResult(T result) {
         var serialized = resultSerDes.serialize(result);
-        if (shouldValidateSerializationRoundTrip()) {
-            deserializeResult(serialized);
-        }
-        return serialized;
+        var deserialized = shouldDeserializeAfterSerialization() ? deserializeResult(serialized) : result;
+        return new SerializedResult<>(serialized, deserialized);
     }

     /**
@@ -117,15 +120,15 @@ protected String serializeResult(T result) {
     @SuppressWarnings("ThrowableNotThrown")
     protected ErrorObject serializeException(Throwable throwable) {
         var error = ExceptionHelper.buildErrorObject(throwable, resultSerDes);
-        if (shouldValidateSerializationRoundTrip()) {
+        if (shouldDeserializeAfterSerialization()) {
             deserializeException(error);
         }
         return error;
     }

-    private boolean shouldValidateSerializationRoundTrip() {
+    private boolean shouldDeserializeAfterSerialization() {
         var config = getContext().getDurableConfig();
-        return config == null || config.shouldValidateSerializationRoundTrip();
+        return config == null || config.shouldDeserializeAfterSerialization();
     }

     /**
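
Note on the helper's contract: the serialize-then-optionally-deserialize pattern in `serializeAndDeserializeResult` can be sketched outside the SDK as follows. This is a rough standalone approximation, not the SDK's code: the class name, the `Function`-based serializer and deserializer parameters, and the whitespace-trimming toy SerDes are all assumptions made so the example runs on its own (Java 16+ for records).

```java
import java.util.function.Function;

// Standalone sketch; names other than SerializedResult are hypothetical.
class SerializeThenDeserializeSketch {
    // Pairs the checkpoint payload with the value handed back to callers.
    record SerializedResult<T>(String serialized, T deserialized) {}

    static <T> SerializedResult<T> serializeAndDeserialize(
            T result,
            Function<T, String> serialize,
            Function<String, T> deserialize,
            boolean deserializeAfterSerialization) {
        String serialized = serialize.apply(result);
        // When the flag is on, callers get what replay would read back;
        // when it is off, they get the raw first-execution value.
        T exposed = deserializeAfterSerialization ? deserialize.apply(serialized) : result;
        return new SerializedResult<>(serialized, exposed);
    }

    public static void main(String[] args) {
        // Toy SerDes (assumption): trims whitespace on write, reads back unchanged.
        var on = serializeAndDeserialize("  ok  ", String::trim, s -> s, true);
        var off = serializeAndDeserialize("  ok  ", String::trim, s -> s, false);

        System.out.println("[" + on.serialized() + "]");    // [ok]
        System.out.println("[" + on.deserialized() + "]");  // [ok]
        System.out.println("[" + off.deserialized() + "]"); // [  ok  ]
    }
}
```

In both cases the checkpoint payload is identical; only the value exposed to the caller on first execution changes with the flag.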

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 3 additions & 1 deletion
@@ -144,9 +144,11 @@ private void checkpointStarted() {
     }

     private void handleStepSucceeded(T result) {
+        var serializedResult = serializeAndDeserializeResult(result);
+
         // Send SUCCEED
         var successUpdate =
-                OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializeResult(result));
+                OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializedResult.serialized());

         // sendOperationUpdate must be synchronous here. When waiting for the return of this call,
         // the context threads waiting for the result of this step operation will be wakened up and registered.

sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java

Lines changed: 5 additions & 5 deletions
@@ -128,15 +128,15 @@ private void executeCheckLogic(T currentState, int attempt) {
         // Execute check function in user executor
         WaitForConditionResult<T> result = checkFunc.apply(currentState, stepContext);

-        // Serialize/deserialize round-trip on the value to ensure state is checkpoint-safe
-        var serializedState = serializeResult(result.value());
-        T deserializedValue = deserializeResult(serializedState);
+        // Normalize the value through SerDes so first execution matches replay.
+        var serializedState = serializeAndDeserializeResult(result.value());
+        T deserializedValue = serializedState.deserialized();

         if (result.isDone()) {
             // Condition met — checkpoint SUCCEED
             var successUpdate = OperationUpdate.builder()
                     .action(OperationAction.SUCCEED)
-                    .payload(serializedState);
+                    .payload(serializedState.serialized());
             sendOperationUpdate(successUpdate);
         } else {
             // Compute delay from strategy
@@ -145,7 +145,7 @@ private void executeCheckLogic(T currentState, int attempt) {
             // Checkpoint RETRY with delay
             var retryUpdate = OperationUpdate.builder()
                     .action(OperationAction.RETRY)
-                    .payload(serializedState)
+                    .payload(serializedState.serialized())
                     .stepOptions(StepOptions.builder()
                             .nextAttemptDelaySeconds(Math.toIntExact(delay.toSeconds()))
                             .build());
