Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* <li>Automatic region detection with fallback to us-east-1 for testing environments
* <li>Environment variable credentials provider
* <li>Custom SerDes with snake_case property naming
* <li>Optional round-trip validation toggle for performance-sensitive workloads
* </ul>
*/
public class CustomConfigExample extends DurableHandler<String, String> {
Expand Down Expand Up @@ -68,6 +69,8 @@ protected DurableConfig createConfiguration() {
return DurableConfig.builder()
.withDurableExecutionClient(durableClient)
.withSerDes(customSerDes)
// Disable the extra deserialize pass if your workload is sensitive to the added validation cost.
.withSerializationRoundTripValidation(false)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public final class DurableConfig {
private final LoggerConfig loggerConfig;
private final PollingStrategy pollingStrategy;
private final Duration checkpointDelay;
private final boolean validateSerializationRoundTrip;
private final PluginRunner pluginRunner;

private DurableConfig(Builder builder) {
Expand All @@ -109,6 +110,7 @@ private DurableConfig(Builder builder) {
this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults);
this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT);
this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0));
this.validateSerializationRoundTrip = builder.validateSerializationRoundTrip;
this.pluginRunner = builder.plugins.isEmpty() ? PluginRunner.noOp() : new PluginRunner(builder.plugins);

validateConfiguration();
Expand Down Expand Up @@ -186,6 +188,19 @@ public Duration getCheckpointDelay() {
return checkpointDelay;
}

/**
* Gets whether serialized operation data should be immediately deserialized to verify round-trip compatibility.
*
* <p>When enabled, the SDK validates serialized operation results and exceptions before checkpointing them. This
* catches incompatible SerDes behavior early at the cost of an extra deserialize pass. Defaults to true, and custom
* SerDes implementations are still expected to be round-trip safe even if this validation is disabled.
*
* @return true when round-trip serialization validation is enabled
*/
public boolean shouldValidateSerializationRoundTrip() {
return validateSerializationRoundTrip;
}

/**
* Gets the plugin runner that dispatches lifecycle events to registered plugins.
*
Expand Down Expand Up @@ -293,6 +308,7 @@ public static final class Builder {
private LoggerConfig loggerConfig;
private PollingStrategy pollingStrategy;
private Duration checkpointDelay;
private boolean validateSerializationRoundTrip = true;
private List<DurableExecutionPlugin> plugins = new ArrayList<>();

public Builder() {}
Expand Down Expand Up @@ -403,6 +419,22 @@ public Builder withCheckpointDelay(Duration duration) {
return this;
}

/**
* Controls whether the SDK immediately deserializes serialized results and exceptions to verify they can be
* read back before checkpointing.
*
* <p>This validation is enabled by default. Disable it only to avoid the extra deserialize pass when the
* additional safety check is too expensive for your workload. Custom SerDes implementations are still expected
* to round-trip SDK-managed values correctly.
*
* @param validateSerializationRoundTrip true to validate serialized data with an immediate deserialize pass
* @return This builder
*/
public Builder withSerializationRoundTripValidation(boolean validateSerializationRoundTrip) {
this.validateSerializationRoundTrip = validateSerializationRoundTrip;
return this;
}

/**
* Registers one or more plugins for lifecycle event instrumentation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public DurableConfig getConfiguration() {
* .withDurableExecutionClient(durableClient)
* .withSerDes(customSerDes) // Optional: custom SerDes for user data
* .withExecutorService(customExecutor) // Optional: custom thread pool
* .withSerializationRoundTripValidation(false) // Optional: skip extra validation deserialize pass
* .build();
* }
* }</pre>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,30 +149,31 @@ private void executeChildContext() {
}

private void handleChildContextSuccess(T result) {
var serializedResult = serializeResultWithDeserializedValue(result);

if (replayChildren.get() || isVirtual || parentOperation != null && parentOperation.isOperationCompleted()) {
// Skip checkpointing if
// - parent ConcurrencyOperation has already completed, preventing race conditions where a child finishes
// after the parent has already completed.
// - replaying a SUCCEEDED child with replayChildren=true — skip checkpointing.
// - nestingType is FLAT
// Mark the completableFuture completed so get() doesn't block waiting for a checkpoint response.
cachedOperationResult.set(DeserializedOperationResult.succeeded(result));
cachedOperationResult.set(DeserializedOperationResult.succeeded(serializedResult.deserialized()));
markAlreadyCompleted();
} else {
checkpointSuccess(result);
checkpointSuccess(serializedResult);
}
}

private void checkpointSuccess(T result) {
var serialized = serializeResult(result);

private void checkpointSuccess(SerializedResult<T> serializedResult) {
var serialized = serializedResult.serialized();
if (serialized == null || serialized.getBytes(StandardCharsets.UTF_8).length < LARGE_RESULT_THRESHOLD) {
sendOperationUpdate(
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serialized));
} else {
// Large result: checkpoint with empty payload + ReplayChildren flag.
// Store the result so get() can return it directly without deserializing the empty payload.
cachedOperationResult.set(DeserializedOperationResult.succeeded(result));
cachedOperationResult.set(DeserializedOperationResult.succeeded(serializedResult.deserialized()));
sendOperationUpdate(OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.payload("")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,22 +177,22 @@ protected void replay(Operation existing) {

@Override
protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletionStatus) {
this.cachedResult = constructMapResult(concurrencyCompletionStatus);
var serialized = serializeResult(cachedResult);
var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8);
var serializedResult = serializeResultWithDeserializedValue(constructMapResult(concurrencyCompletionStatus));
this.cachedResult = serializedResult.deserialized();
var serializedBytes = serializedResult.serialized().getBytes(StandardCharsets.UTF_8);

if (serializedBytes.length < LARGE_RESULT_THRESHOLD) {
sendOperationUpdate(OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.subType(getSubType().getValue())
.payload(serialized));
.payload(serializedResult.serialized()));
} else {
// Large result: checkpoint with stripped payload + replayChildren flag
var strippedResult = serializeResult(stripMapResult(cachedResult));
var strippedResult = serializeResultWithDeserializedValue(stripMapResult(cachedResult));
sendOperationUpdate(OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.subType(getSubType().getValue())
.payload(strippedResult)
.payload(strippedResult.serialized())
.contextOptions(
ContextOptions.builder().replayChildren(true).build()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,17 @@ protected void handleCompletion(ConcurrencyCompletionStatus concurrencyCompletio
int failedCount = Math.toIntExact(
statuses.stream().filter(s -> s == ParallelResult.Status.FAILED).count());
int skippedCount = items.size() - succeededCount - failedCount;
cachedResult = new ParallelResult(
var result = new ParallelResult(
items.size(), succeededCount, failedCount, skippedCount, concurrencyCompletionStatus, statuses);
var serializedResult = serializeResultWithDeserializedValue(result);
cachedResult = serializedResult.deserialized();

// Branches added after checkpoint will not exist in the checkpointed result, but they'll be in the returned
// value from get() method.
sendOperationUpdate(OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.subType(getSubType().getValue())
.payload(serializeResult(cachedResult))
.payload(serializedResult.serialized())
.contextOptions(ContextOptions.builder().replayChildren(true).build()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
public abstract class SerializableDurableOperation<T> extends BaseDurableOperation implements DurableFuture<T> {
private static final Logger logger = LoggerFactory.getLogger(SerializableDurableOperation.class);

protected record SerializedResult<T>(String serialized, T deserialized) {}

private final TypeToken<T> resultTypeToken;
private final SerDes resultSerDes;

Expand Down Expand Up @@ -94,14 +96,13 @@ protected T deserializeResult(String result) {
}
}

/**
* Serializes the result to a string.
*
* @param result the result to serialize
* @return the serialized string
*/
protected String serializeResult(T result) {
return resultSerDes.serialize(result);
protected SerializedResult<T> serializeResultWithDeserializedValue(T result) {
var serialized = resultSerDes.serialize(result);
T deserialized = result;
if (shouldValidateSerializationRoundTrip()) {
deserialized = deserializeResult(serialized);
}
return new SerializedResult<>(serialized, deserialized);
}

/**
Expand All @@ -110,8 +111,18 @@ protected String serializeResult(T result) {
* @param throwable the exception to serialize
* @return the serialized error object
*/
@SuppressWarnings("ThrowableNotThrown")
protected ErrorObject serializeException(Throwable throwable) {
return ExceptionHelper.buildErrorObject(throwable, resultSerDes);
var error = ExceptionHelper.buildErrorObject(throwable, resultSerDes);
if (shouldValidateSerializationRoundTrip()) {
deserializeException(error);
}
return error;
}

private boolean shouldValidateSerializationRoundTrip() {
var config = getContext().getDurableConfig();
return config == null || config.shouldValidateSerializationRoundTrip();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ private void checkpointStarted() {

private void handleStepSucceeded(T result) {
// Send SUCCEED
var serializedResult = serializeResultWithDeserializedValue(result);
var successUpdate =
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializeResult(result));
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serializedResult.serialized());

// sendOperationUpdate must be synchronous here. When waiting for the return of this call,
// the context threads waiting for the result of this step operation will be wakened up and registered.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ private void executeCheckLogic(T currentState, int attempt) {
// Execute check function in user executor
WaitForConditionResult<T> result = checkFunc.apply(currentState, stepContext);

// Serialize/deserialize round-trip on the value to ensure state is checkpoint-safe
var serializedState = serializeResult(result.value());
T deserializedValue = deserializeResult(serializedState);
// Continue with the checkpoint-shaped state so first execution matches replay.
var serializedState = serializeResultWithDeserializedValue(result.value());
T deserializedValue = serializedState.deserialized();

if (result.isDone()) {
// Condition met — checkpoint SUCCEED
var successUpdate = OperationUpdate.builder()
.action(OperationAction.SUCCEED)
.payload(serializedState);
.payload(serializedState.serialized());
sendOperationUpdate(successUpdate);
} else {
// Compute delay from strategy
Expand All @@ -145,7 +145,7 @@ private void executeCheckLogic(T currentState, int attempt) {
// Checkpoint RETRY with delay
var retryUpdate = OperationUpdate.builder()
.action(OperationAction.RETRY)
.payload(serializedState)
.payload(serializedState.serialized())
.stepOptions(StepOptions.builder()
.nextAttemptDelaySeconds(Math.toIntExact(delay.toSeconds()))
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ void testBuilder_WithCustomExecutorService() {
assertNotNull(config.getSerDes());
}

@Test
void testBuilder_SerializationRoundTripValidationDefaultsToTrue() {
var config =
DurableConfig.builder().withDurableExecutionClient(mockClient).build();

assertTrue(config.shouldValidateSerializationRoundTrip());
}

@Test
void testBuilder_WithSerializationRoundTripValidationDisabled() {
var config = DurableConfig.builder()
.withDurableExecutionClient(mockClient)
.withSerializationRoundTripValidation(false)
.build();

assertFalse(config.shouldValidateSerializationRoundTrip());
}

@Test
void testBuilder_WithAllCustomComponents() {
var config = DurableConfig.builder()
Expand Down Expand Up @@ -131,6 +149,7 @@ void testBuilder_FluentAPI() {
assertSame(builder, builder.withDurableExecutionClient(mockClient));
assertSame(builder, builder.withSerDes(mockSerDes));
assertSame(builder, builder.withExecutorService(mockExecutor));
assertSame(builder, builder.withSerializationRoundTripValidation(false));
}

@Test
Expand Down
Loading
Loading