-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathSerializableDurableOperation.java
More file actions
166 lines (151 loc) · 6.44 KB
/
Copy pathSerializableDurableOperation.java
File metadata and controls
166 lines (151 loc) · 6.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.operation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.lambda.model.ErrorObject;
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.context.DurableContextImpl;
import software.amazon.lambda.durable.exception.SerDesException;
import software.amazon.lambda.durable.model.OperationIdentifier;
import software.amazon.lambda.durable.serde.SerDes;
import software.amazon.lambda.durable.util.ExceptionHelper;
/**
* Base class for all durable operations (STEP, WAIT, etc.).
*
* <p>Key methods:
*
* <ul>
* <li>{@code execute()} starts the operation (returns immediately)
* <li>{@code get()} blocks until complete and returns the result
* </ul>
*
* <p>The separation allows:
*
* <ul>
* <li>Starting multiple async operations quickly
* <li>Blocking on results later when needed
* <li>Proper thread coordination via future
* </ul>
*/
public abstract class SerializableDurableOperation<T> extends BaseDurableOperation implements DurableFuture<T> {
private static final Logger logger = LoggerFactory.getLogger(SerializableDurableOperation.class);
protected record SerializedResult<T>(String serialized, T deserialized) {}
private final TypeToken<T> resultTypeToken;
private final SerDes resultSerDes;
/**
* Constructs a new durable operation.
*
* @param operationIdentifier the unique identifier for this operation
* @param resultTypeToken the type token for deserializing the result
* @param resultSerDes the serializer/deserializer for the result
* @param durableContext the parent context this operation belongs to
*/
protected SerializableDurableOperation(
OperationIdentifier operationIdentifier,
TypeToken<T> resultTypeToken,
SerDes resultSerDes,
DurableContextImpl durableContext) {
this(operationIdentifier, resultTypeToken, resultSerDes, durableContext, null, false);
}
/**
* Constructs a new durable operation.
*
* @param operationIdentifier the unique identifier for this operation
* @param resultTypeToken the type token for deserializing the result
* @param resultSerDes the serializer/deserializer for the result
* @param durableContext the parent context this operation belongs to
* @param isVirtual whether this is a virtual operation that should not be persisted
* @param parentOperation the parent operation if this is a branch/iteration of a ConcurrencyOperation
*/
protected SerializableDurableOperation(
OperationIdentifier operationIdentifier,
TypeToken<T> resultTypeToken,
SerDes resultSerDes,
DurableContextImpl durableContext,
BaseDurableOperation parentOperation,
boolean isVirtual) {
super(operationIdentifier, durableContext, parentOperation, isVirtual);
this.resultTypeToken = resultTypeToken;
this.resultSerDes = resultSerDes;
}
/**
* Deserializes a result string into the operation's result type.
*
* @param result the serialized result string
* @return the deserialized result
* @throws SerDesException if deserialization fails
*/
protected T deserializeResult(String result) {
try {
return resultSerDes.deserialize(result, resultTypeToken);
} catch (SerDesException e) {
logger.warn(
"Failed to deserialize {} result for operation name '{}'. Ensure the result is properly encoded.",
getType(),
getName());
throw e;
}
}
protected SerializedResult<T> serializeResultWithDeserializedValue(T result) {
var serialized = resultSerDes.serialize(result);
T deserialized = result;
if (shouldValidateSerializationRoundTrip()) {
deserialized = deserializeResult(serialized);
}
return new SerializedResult<>(serialized, deserialized);
}
/**
* Serializes a throwable into an {@link ErrorObject} for checkpointing.
*
* @param throwable the exception to serialize
* @return the serialized error object
*/
@SuppressWarnings("ThrowableNotThrown")
protected ErrorObject serializeException(Throwable throwable) {
var error = ExceptionHelper.buildErrorObject(throwable, resultSerDes);
if (shouldValidateSerializationRoundTrip()) {
deserializeException(error);
}
return error;
}
private boolean shouldValidateSerializationRoundTrip() {
var config = getContext().getDurableConfig();
return config == null || config.shouldValidateSerializationRoundTrip();
}
/**
* Deserializes an {@link ErrorObject} back into a throwable, reconstructing the original exception type and stack
* trace when possible. Falls back to null if the exception class is not found or deserialization fails.
*
* @param errorObject the serialized error object
* @return the reconstructed throwable, or null if reconstruction is not possible
*/
protected Throwable deserializeException(ErrorObject errorObject) {
Throwable original = null;
if (errorObject == null) {
return original;
}
var errorType = errorObject.errorType();
var errorData = errorObject.errorData();
if (errorType == null) {
return original;
}
try {
Class<?> exceptionClass = Class.forName(errorType);
if (Throwable.class.isAssignableFrom(exceptionClass)) {
original =
resultSerDes.deserialize(errorData, TypeToken.get(exceptionClass.asSubclass(Throwable.class)));
if (original != null) {
original.setStackTrace(ExceptionHelper.deserializeStackTrace(errorObject.stackTrace()));
}
}
} catch (ClassNotFoundException e) {
logger.warn("Cannot re-construct original exception type. Falling back to generic StepFailedException.");
} catch (SerDesException e) {
logger.warn("Cannot deserialize original exception data. Falling back to generic StepFailedException.", e);
}
return original;
}
public abstract T get();
}