Commit e3fa433

fix: operations always return deserialized result (#480)
* fix: make sure serialized result is deserialize-able
* fix: return deserialized operation results
1 parent a1d31f3 commit e3fa433

13 files changed

Lines changed: 379 additions & 29 deletions

docs/migration-1.x-to-2.x.md

Lines changed: 6 additions & 4 deletions
@@ -185,27 +185,29 @@ assertThrows(IllegalStateException.class, future::get);
 What changes in practice:

 - Serialization problems now fail on first execution instead of surfacing later on replay.
+- Operation results can be returned after a SerDes round-trip so first execution matches replay.
 - Custom `SerDes` implementations must be able to deserialize SDK-managed values they serialize.
 - Child-context results are validated consistently, including virtual child-context paths.

 This is usually a correctness improvement, but it can surface previously hidden `SerDes` bugs during upgrade.

 ### New opt-out configuration

-If your workload is very performance-sensitive and you need to skip the extra validation deserialize pass, you can opt out:
+If your workload is very performance-sensitive and you need to skip the extra deserialize pass, you can opt out:

 ```java
 @Override
 protected DurableConfig createConfiguration() {
     return DurableConfig.builder()
-            .withSerializationRoundTripValidation(false)
+            .withDeserializeAfterSerialization(false)
             .build();
 }
 ```

 Use that carefully:

-- Disabling validation can hide serialization bugs until replay.
+- Disabling this can hide serialization bugs until replay.
+- First execution may return the raw result shape instead of the replay result shape.
 - Custom `SerDes` implementations are still expected to be round-trip safe.

 ## Recommended Validation After Upgrading
@@ -226,6 +228,6 @@ Most upgrades are straightforward:
 - Logger metadata moves to `executionArn`, `operationId`, and `operationName`
 - Replay-sensitive logging becomes per-context, `isReplaying()` moves to `DurableContext`, and step logs are no longer replay-suppressed
 - Validation failures now throw `IllegalStateException`
-- Serialization round-trip problems surface earlier by default, with an opt-out via `withSerializationRoundTripValidation(false)`
+- Serialization round-trip problems surface earlier by default, with an opt-out via `withDeserializeAfterSerialization(false)`

 If you update those areas first, the `1.x` to `2.x` migration should be low risk.
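
To illustrate the doc change above: when a SerDes is lossy or normalizing, the value a caller sees on first execution can differ from what replay rebuilds from the checkpoint. The sketch below is not SDK code. `RoundTripSketch` and its two-decimal `serialize`/`deserialize` pair are hypothetical stand-ins for a custom SerDes, and only the boolean parameter name echoes `withDeserializeAfterSerialization`.

```java
import java.util.Locale;

public class RoundTripSketch {
    /** Hypothetical lossy SerDes: keeps only two decimal places. */
    public static String serialize(double value) {
        return String.format(Locale.ROOT, "%.2f", value);
    }

    public static double deserialize(String payload) {
        return Double.parseDouble(payload);
    }

    /** Value handed to the caller on first execution, depending on the toggle. */
    public static double firstExecutionResult(double raw, boolean deserializeAfterSerialization) {
        String checkpointed = serialize(raw);
        // With the toggle on, the caller gets the round-tripped value rather than the raw one.
        return deserializeAfterSerialization ? deserialize(checkpointed) : raw;
    }

    /** On replay only the checkpointed payload is available, so the value is always deserialized. */
    public static double replayResult(double raw) {
        return deserialize(serialize(raw));
    }

    public static void main(String[] args) {
        double raw = 3.14159;
        // Toggle on: first execution and replay agree.
        System.out.println(firstExecutionResult(raw, true) == replayResult(raw));
        // Toggle off: first execution returns the raw shape, which replay cannot reproduce.
        System.out.println(firstExecutionResult(raw, false) == replayResult(raw));
    }
}
```

With the toggle on, both paths yield the normalized value. Turning it off skips one parse per result, at the cost of the mismatch shown in the second comparison.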

examples/src/main/java/software/amazon/lambda/durable/examples/general/CustomConfigExample.java

Lines changed: 3 additions & 0 deletions
@@ -31,6 +31,7 @@
  *   <li>Automatic region detection with fallback to us-east-1 for testing environments
  *   <li>Environment variable credentials provider
  *   <li>Custom SerDes with snake_case property naming
+ *   <li>Optional post-serialization deserialization toggle for performance-sensitive workloads
  * </ul>
  */
 public class CustomConfigExample extends DurableHandler<String, String> {
@@ -68,6 +69,8 @@ protected DurableConfig createConfiguration() {
         return DurableConfig.builder()
                 .withDurableExecutionClient(durableClient)
                 .withSerDes(customSerDes)
+                // Disable the extra deserialize pass if your workload is sensitive to the added cost.
+                .withDeserializeAfterSerialization(false)
                 .build();
     }

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 30 additions & 0 deletions
@@ -98,6 +98,7 @@ public final class DurableConfig {
     private final LoggerConfig loggerConfig;
     private final PollingStrategy pollingStrategy;
     private final Duration checkpointDelay;
+    private final boolean deserializeAfterSerialization;
     private final PluginRunner pluginRunner;

     private DurableConfig(Builder builder) {
@@ -109,6 +110,7 @@ private DurableConfig(Builder builder) {
         this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults);
         this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT);
         this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0));
+        this.deserializeAfterSerialization = builder.deserializeAfterSerialization;
         this.pluginRunner = builder.plugins.isEmpty() ? PluginRunner.noOp() : new PluginRunner(builder.plugins);

         validateConfiguration();
@@ -186,6 +188,19 @@ public Duration getCheckpointDelay() {
         return checkpointDelay;
     }

+    /**
+     * Gets whether serialized operation data should be deserialized immediately after serialization.
+     *
+     * <p>When enabled, the SDK returns deserialized operation results so first execution matches replay, and validates
+     * serialized exceptions before checkpointing them. Defaults to true, and custom SerDes implementations are still
+     * expected to be round-trip safe even if this behavior is disabled.
+     *
+     * @return true when serialized data should be deserialized immediately
+     */
+    public boolean shouldDeserializeAfterSerialization() {
+        return deserializeAfterSerialization;
+    }
+
     /**
      * Gets the plugin runner that dispatches lifecycle events to registered plugins.
      *
@@ -293,6 +308,7 @@ public static final class Builder {
         private LoggerConfig loggerConfig;
         private PollingStrategy pollingStrategy;
         private Duration checkpointDelay;
+        private boolean deserializeAfterSerialization = true;
         private List<DurableExecutionPlugin> plugins = new ArrayList<>();

         public Builder() {}
@@ -403,6 +419,20 @@ public Builder withCheckpointDelay(Duration duration) {
             return this;
         }

+        /**
+         * Controls whether the SDK immediately deserializes serialized operation data after serialization.
+         *
+         * <p>This is enabled by default. When enabled, operation results are returned after a SerDes round-trip so
+         * first execution returns the same value shape as replay, and exceptions are checked before checkpointing.
+         *
+         * @param deserializeAfterSerialization true to deserialize serialized data immediately
+         * @return This builder
+         */
+        public Builder withDeserializeAfterSerialization(boolean deserializeAfterSerialization) {
+            this.deserializeAfterSerialization = deserializeAfterSerialization;
+            return this;
+        }
+
         /**
          * Registers one or more plugins for lifecycle event instrumentation.
          *

sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java

Lines changed: 1 addition & 0 deletions
@@ -117,6 +117,7 @@ public DurableConfig getConfiguration() {
      *         .withDurableExecutionClient(durableClient)
      *         .withSerDes(customSerDes) // Optional: custom SerDes for user data
      *         .withExecutorService(customExecutor) // Optional: custom thread pool
+     *         .withDeserializeAfterSerialization(false) // Optional: skip immediate deserialize pass
      *         .build();
      * }
      * }</pre>

sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java

Lines changed: 5 additions & 5 deletions
@@ -149,26 +149,26 @@ private void executeChildContext() {
     }

     private void handleChildContextSuccess(T result) {
+        var serializedResult = serializeAndDeserializeResult(result);
+
         if (replayChildren.get() || isVirtual || parentOperation != null && parentOperation.isOperationCompleted()) {
             // Skip checkpointing if
             // - parent ConcurrencyOperation has already completed, preventing race conditions where a child finishes
             // after the parent has already completed.
             // - replaying a SUCCEEDED child with replayChildren=true — skip checkpointing.
             // - nestingType is FLAT
             // Mark the completableFuture completed so get() doesn't block waiting for a checkpoint response.
-            cachedOperationResult.set(DeserializedOperationResult.succeeded(result));
+            cachedOperationResult.set(DeserializedOperationResult.succeeded(serializedResult.deserialized()));
             if (isVirtual) {
                 fireOnOperationEnd(null, null);
             }
             markAlreadyCompleted();
         } else {
-            checkpointSuccess(result);
+            checkpointSuccess(serializedResult.deserialized(), serializedResult.serialized());
         }
     }

-    private void checkpointSuccess(T result) {
-        var serialized = serializeResult(result);
-
+    private void checkpointSuccess(T result, String serialized) {
         if (serialized == null || serialized.getBytes(StandardCharsets.UTF_8).length < LARGE_RESULT_THRESHOLD) {
             sendOperationUpdate(
                     OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serialized));

sdk/src/main/java/software/amazon/lambda/durable/operation/MapOperation.java

Lines changed: 6 additions & 5 deletions
@@ -178,21 +178,22 @@ protected void replay(Operation existing) {
     @Override
     protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletionStatus) {
         this.cachedResult = constructMapResult(concurrencyCompletionStatus);
-        var serialized = serializeResult(cachedResult);
-        var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8);
+        var serializedResult = serializeAndDeserializeResult(cachedResult);
+        this.cachedResult = serializedResult.deserialized();
+        var serializedBytes = serializedResult.serialized().getBytes(StandardCharsets.UTF_8);

         if (serializedBytes.length < LARGE_RESULT_THRESHOLD) {
             sendOperationUpdate(OperationUpdate.builder()
                     .action(OperationAction.SUCCEED)
                     .subType(getSubType().getValue())
-                    .payload(serialized));
+                    .payload(serializedResult.serialized()));
         } else {
             // Large result: checkpoint with stripped payload + replayChildren flag
-            var strippedResult = serializeResult(stripMapResult(cachedResult));
+            var strippedResult = serializeAndDeserializeResult(stripMapResult(cachedResult));
             sendOperationUpdate(OperationUpdate.builder()
                     .action(OperationAction.SUCCEED)
                     .subType(getSubType().getValue())
-                    .payload(strippedResult)
+                    .payload(strippedResult.serialized())
                     .contextOptions(
                             ContextOptions.builder().replayChildren(true).build()));
         }

sdk/src/main/java/software/amazon/lambda/durable/operation/ParallelOperation.java

Lines changed: 3 additions & 1 deletion
@@ -78,13 +78,15 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio
         int skippedCount = items.size() - succeededCount - failedCount;
         cachedResult = new ParallelResult(
                 items.size(), succeededCount, failedCount, skippedCount, concurrencyCompletionStatus, statuses);
+        var serializedResult = serializeAndDeserializeResult(cachedResult);
+        cachedResult = serializedResult.deserialized();

         // Branches added after checkpoint will not exist in the checkpointed result, but they'll be in the returned
         // value from get() method.
         sendOperationUpdate(OperationUpdate.builder()
                 .action(OperationAction.SUCCEED)
                 .subType(getSubType().getValue())
-                .payload(serializeResult(cachedResult))
+                .payload(serializedResult.serialized())
                 .contextOptions(ContextOptions.builder().replayChildren(true).build()));
     }

sdk/src/main/java/software/amazon/lambda/durable/operation/SerializableDurableOperation.java

Lines changed: 22 additions & 5 deletions
@@ -34,6 +34,8 @@
 public abstract class SerializableDurableOperation<T> extends BaseDurableOperation implements DurableFuture<T> {
     private static final Logger logger = LoggerFactory.getLogger(SerializableDurableOperation.class);

+    protected record SerializedResult<T>(String serialized, T deserialized) {}
+
     private final TypeToken<T> resultTypeToken;
     private final SerDes resultSerDes;

@@ -95,13 +97,18 @@ protected T deserializeResult(String result) {
     }

     /**
-     * Serializes the result to a string.
+     * Serializes the result and returns the value that should be exposed to callers.
+     *
+     * <p>Use this for operations that cache a first-execution result instead of reading it back from checkpoint data.
+     * This keeps first execution consistent with replay when a SerDes normalizes or otherwise changes the value.
      *
      * @param result the result to serialize
-     * @return the serialized string
+     * @return the serialized string and the deserialized result
      */
-    protected String serializeResult(T result) {
-        return resultSerDes.serialize(result);
+    protected SerializedResult<T> serializeAndDeserializeResult(T result) {
+        var serialized = resultSerDes.serialize(result);
+        var deserialized = shouldDeserializeAfterSerialization() ? deserializeResult(serialized) : result;
+        return new SerializedResult<>(serialized, deserialized);
     }

     /**
@@ -110,8 +117,18 @@ protected String serializeResult(T result) {
      * @param throwable the exception to serialize
      * @return the serialized error object
      */
+    @SuppressWarnings("ThrowableNotThrown")
     protected ErrorObject serializeException(Throwable throwable) {
-        return ExceptionHelper.buildErrorObject(throwable, resultSerDes);
+        var error = ExceptionHelper.buildErrorObject(throwable, resultSerDes);
+        if (shouldDeserializeAfterSerialization()) {
+            deserializeException(error);
+        }
+        return error;
+    }
+
+    private boolean shouldDeserializeAfterSerialization() {
+        var config = getContext().getDurableConfig();
+        return config == null || config.shouldDeserializeAfterSerialization();
     }

     /**
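
The serialize-once pattern in this file can be sketched outside the SDK. The example below assumes a hypothetical `SimpleSerDes` interface and a deliberately broken `BROKEN` implementation. Neither exists in the SDK, and the real `SerDes` and `TypeToken` types are not modeled here. It shows the two effects of the change: the payload is produced once and reused for the checkpoint, and a SerDes that cannot read its own output fails on first execution instead of on replay.

```java
public class SerializeOnceSketch {
    /** Pairs the checkpoint payload with the value returned to callers. */
    public record SerializedResult<T>(String serialized, T deserialized) {}

    /** Hypothetical minimal SerDes; an assumption for this sketch, not the SDK interface. */
    public interface SimpleSerDes<T> {
        String serialize(T value);

        T deserialize(String payload);
    }

    /** Serializes once, then optionally reads the payload back before returning. */
    public static <T> SerializedResult<T> serializeAndDeserialize(
            T result, SimpleSerDes<T> serDes, boolean roundTrip) {
        String serialized = serDes.serialize(result);
        T deserialized = roundTrip ? serDes.deserialize(serialized) : result;
        return new SerializedResult<>(serialized, deserialized);
    }

    /** Writes payloads it cannot read back: the kind of bug the round-trip exposes early. */
    public static final SimpleSerDes<Integer> BROKEN = new SimpleSerDes<>() {
        @Override
        public String serialize(Integer value) {
            return "#" + value;
        }

        @Override
        public Integer deserialize(String payload) {
            // Integer.parseInt rejects the leading '#', so this throws NumberFormatException.
            return Integer.parseInt(payload);
        }
    };

    public static void main(String[] args) {
        // Round-trip disabled: the unreadable payload would only be noticed on replay.
        System.out.println(serializeAndDeserialize(42, BROKEN, false).serialized());
        try {
            serializeAndDeserialize(42, BROKEN, true);
        } catch (NumberFormatException e) {
            System.out.println("failed on first execution");
        }
    }
}
```

Returning both halves in one record lets callers checkpoint `serialized()` and cache `deserialized()` without serializing twice.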

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 3 additions & 1 deletion
@@ -144,9 +144,11 @@ private void checkpointStarted() {
     }

     private void handleStepSucceeded(T result) {
+        var serializedResult = serializeAndDeserializeResult(result);
+
         // Send SUCCEED
         var successUpdate =
-                OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializeResult(result));
+                OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializedResult.serialized());

         // sendOperationUpdate must be synchronous here. When waiting for the return of this call,
         // the context threads waiting for the result of this step operation will be wakened up and registered.

sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java

Lines changed: 5 additions & 5 deletions
@@ -128,15 +128,15 @@ private void executeCheckLogic(T currentState, int attempt) {
             // Execute check function in user executor
             WaitForConditionResult<T> result = checkFunc.apply(currentState, stepContext);

-            // Serialize/deserialize round-trip on the value to ensure state is checkpoint-safe
-            var serializedState = serializeResult(result.value());
-            T deserializedValue = deserializeResult(serializedState);
+            // Normalize the value through SerDes so first execution matches replay.
+            var serializedState = serializeAndDeserializeResult(result.value());
+            T deserializedValue = serializedState.deserialized();

             if (result.isDone()) {
                 // Condition met — checkpoint SUCCEED
                 var successUpdate = OperationUpdate.builder()
                         .action(OperationAction.SUCCEED)
-                        .payload(serializedState);
+                        .payload(serializedState.serialized());
                 sendOperationUpdate(successUpdate);
             } else {
                 // Compute delay from strategy
@@ -145,7 +145,7 @@ private void executeCheckLogic(T currentState, int attempt) {
                 // Checkpoint RETRY with delay
                 var retryUpdate = OperationUpdate.builder()
                         .action(OperationAction.RETRY)
-                        .payload(serializedState)
+                        .payload(serializedState.serialized())
                         .stepOptions(StepOptions.builder()
                                 .nextAttemptDelaySeconds(Math.toIntExact(delay.toSeconds()))
                                 .build());
