Skip to content

Commit 5776bd5

Browse files
authored
move serdes out of DurableHandler (#225)
1 parent 03269fc commit 5776bd5

7 files changed

Lines changed: 314 additions & 224 deletions

File tree

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ private DurableConfig(Builder builder) {
9292
this.pollingStrategy =
9393
builder.pollingStrategy != null ? builder.pollingStrategy : PollingStrategies.Presets.DEFAULT;
9494
this.checkpointDelay = builder.checkpointDelay != null ? builder.checkpointDelay : Duration.ofSeconds(0);
95+
96+
validateConfiguration();
9597
}
9698

9799
/**
@@ -166,6 +168,18 @@ public Duration getCheckpointDelay() {
166168
return checkpointDelay;
167169
}
168170

171+
public void validateConfiguration() {
172+
if (getDurableExecutionClient() == null) {
173+
throw new IllegalStateException("DurableExecutionClient configuration failed");
174+
}
175+
if (getSerDes() == null) {
176+
throw new IllegalStateException("SerDes configuration failed");
177+
}
178+
if (getExecutorService() == null) {
179+
throw new IllegalStateException("ExecutorService configuration failed");
180+
}
181+
}
182+
169183
/**
170184
* Creates a default DurableExecutionClient with production LambdaClient. Uses
171185
* EnvironmentVariableCredentialsProvider and region from AWS_REGION. If AWS_REGION is not set, defaults to
@@ -317,7 +331,7 @@ public Builder withSerDes(SerDes serDes) {
317331
* will be created.
318332
*
319333
* <p>This executor is used exclusively for running user-defined operations. Internal SDK coordination (polling,
320-
* checkpointing) uses the common ForkJoinPool and is not affected by this setting.
334+
* checkpointing) uses the SDK InternalExecutor thread pool and is not affected by this setting.
321335
*
322336
* @param executorService Custom ExecutorService instance
323337
* @return This builder
@@ -351,8 +365,8 @@ public Builder withPollingStrategy(PollingStrategy pollingStrategy) {
351365
}
352366

353367
/**
354-
* Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, which disables checkpoint
355-
* batching.
368+
* Sets how often the SDK checkpoints updates to backend. If not set, defaults to 0, SDK will checkpoint the
369+
* updates as soon as possible.
356370
*
357371
* @param duration the checkpoint delay in Duration
358372
* @return This builder

sdk/src/main/java/software/amazon/lambda/durable/DurableHandler.java

Lines changed: 12 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,14 @@
44

55
import com.amazonaws.services.lambda.runtime.Context;
66
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
7-
import com.fasterxml.jackson.core.JsonGenerator;
8-
import com.fasterxml.jackson.core.JsonParser;
9-
import com.fasterxml.jackson.core.JsonToken;
10-
import com.fasterxml.jackson.databind.DeserializationContext;
11-
import com.fasterxml.jackson.databind.DeserializationFeature;
12-
import com.fasterxml.jackson.databind.JsonDeserializer;
13-
import com.fasterxml.jackson.databind.JsonSerializer;
14-
import com.fasterxml.jackson.databind.MapperFeature;
15-
import com.fasterxml.jackson.databind.ObjectMapper;
16-
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
17-
import com.fasterxml.jackson.databind.SerializationFeature;
18-
import com.fasterxml.jackson.databind.SerializerProvider;
19-
import com.fasterxml.jackson.databind.json.JsonMapper;
20-
import com.fasterxml.jackson.databind.module.SimpleModule;
21-
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
227
import java.io.IOException;
238
import java.io.InputStream;
249
import java.io.OutputStream;
2510
import java.lang.reflect.ParameterizedType;
26-
import java.time.Instant;
27-
import java.time.format.DateTimeFormatter;
28-
import java.time.format.DateTimeFormatterBuilder;
29-
import java.util.Date;
3011
import org.slf4j.Logger;
3112
import org.slf4j.LoggerFactory;
3213
import software.amazon.lambda.durable.model.DurableExecutionInput;
33-
import software.amazon.lambda.durable.serde.AwsSdkV2Module;
14+
import software.amazon.lambda.durable.serde.DurableInputOutputSerDes;
3415

3516
/**
3617
* Abstract base class for Lambda handlers that use durable execution.
@@ -46,7 +27,7 @@ public abstract class DurableHandler<I, O> implements RequestStreamHandler {
4627

4728
private final TypeToken<I> inputType;
4829
private final DurableConfig config;
49-
private final ObjectMapper objectMapper = createObjectMapper(); // Internal ObjectMapper
30+
private final DurableInputOutputSerDes serDes = new DurableInputOutputSerDes(); // Internal ObjectMapper
5031
private static final Logger logger = LoggerFactory.getLogger(DurableHandler.class);
5132

5233
protected DurableHandler() {
@@ -58,7 +39,6 @@ protected DurableHandler() {
5839
throw new IllegalArgumentException("Cannot determine input type parameter");
5940
}
6041
this.config = createConfiguration();
61-
validateConfiguration();
6242
}
6343

6444
/**
@@ -142,26 +122,22 @@ protected DurableConfig createConfiguration() {
142122
return DurableConfig.defaultConfig();
143123
}
144124

145-
private void validateConfiguration() {
146-
if (config.getDurableExecutionClient() == null) {
147-
throw new IllegalStateException("DurableExecutionClient configuration failed");
148-
}
149-
if (config.getSerDes() == null) {
150-
throw new IllegalStateException("SerDes configuration failed");
151-
}
152-
if (config.getExecutorService() == null) {
153-
throw new IllegalStateException("ExecutorService configuration failed");
154-
}
155-
}
156-
125+
/**
126+
* Reads the request, executes the durable function handler and writes the response
127+
*
128+
* @param inputStream the input stream
129+
* @param outputStream the output stream
130+
* @param context the Lambda context
131+
* @throws IOException thrown when serialize/deserialize fails
132+
*/
157133
@Override
158134
public final void handleRequest(InputStream inputStream, OutputStream outputStream, Context context)
159135
throws IOException {
160136
var inputString = new String(inputStream.readAllBytes());
161137
logger.debug("Raw input from durable handler: {}", inputString);
162-
var input = this.objectMapper.readValue(inputString, DurableExecutionInput.class);
138+
var input = serDes.deserialize(inputString, TypeToken.get(DurableExecutionInput.class));
163139
var output = DurableExecutor.execute(input, context, inputType, this::handleRequest, config);
164-
outputStream.write(objectMapper.writeValueAsBytes(output));
140+
outputStream.write(serDes.serialize(output).getBytes());
165141
}
166142

167143
/**
@@ -172,66 +148,4 @@ public final void handleRequest(InputStream inputStream, OutputStream outputStre
172148
* @return Result
173149
*/
174150
public abstract O handleRequest(I input, DurableContext context);
175-
176-
/**
177-
* Creates ObjectMapper for DAR backend communication (internal use only). This is for INTERNAL use only - handles
178-
* Lambda Durable Functions backend protocol.
179-
*
180-
* <p>Customer-facing serialization uses SerDes from DurableConfig.
181-
*
182-
* @return Configured ObjectMapper for durable backend communication
183-
*/
184-
public static ObjectMapper createObjectMapper() {
185-
var dateModule = new SimpleModule();
186-
dateModule.addDeserializer(Date.class, new JsonDeserializer<>() {
187-
@Override
188-
public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
189-
throws IOException {
190-
// Timestamp is a double value represent seconds since epoch.
191-
var timestamp = jsonParser.getDoubleValue();
192-
// Date expects milliseconds since epoch, so multiply by 1000.
193-
return new Date((long) (timestamp * 1000));
194-
}
195-
});
196-
dateModule.addSerializer(Date.class, new JsonSerializer<>() {
197-
@Override
198-
public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
199-
throws IOException {
200-
// Timestamp should be a double value representing seconds since epoch, so
201-
// convert from milliseconds.
202-
double timestamp = date.getTime() / 1000.0;
203-
jsonGenerator.writeNumber(timestamp);
204-
}
205-
});
206-
207-
// Needed for deserialization of timestamps for some SDK v2 objects
208-
dateModule.addDeserializer(Instant.class, new JsonDeserializer<>() {
209-
private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
210-
.appendPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXXX")
211-
.toFormatter();
212-
213-
@Override
214-
public Instant deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
215-
throws IOException {
216-
if (jsonParser.hasToken(JsonToken.VALUE_NUMBER_INT)) {
217-
return Instant.ofEpochMilli(jsonParser.getLongValue());
218-
}
219-
var timestampStr = jsonParser.getValueAsString();
220-
return Instant.from(TIMESTAMP_FORMATTER.parse(timestampStr));
221-
}
222-
});
223-
224-
return JsonMapper.builder()
225-
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
226-
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
227-
// Looks pretty, and probably needed for tests to be deterministic.
228-
.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
229-
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
230-
// Data passed over the wire from the backend is UpperCamelCase
231-
.propertyNamingStrategy(PropertyNamingStrategies.UPPER_CAMEL_CASE)
232-
.addModule(new JavaTimeModule())
233-
.addModule(dateModule)
234-
.addModule(new AwsSdkV2Module())
235-
.build();
236-
}
237151
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.serde;
4+
5+
import com.fasterxml.jackson.core.JsonGenerator;
6+
import com.fasterxml.jackson.core.JsonParser;
7+
import com.fasterxml.jackson.core.JsonToken;
8+
import com.fasterxml.jackson.databind.DeserializationContext;
9+
import com.fasterxml.jackson.databind.DeserializationFeature;
10+
import com.fasterxml.jackson.databind.JavaType;
11+
import com.fasterxml.jackson.databind.JsonDeserializer;
12+
import com.fasterxml.jackson.databind.JsonSerializer;
13+
import com.fasterxml.jackson.databind.MapperFeature;
14+
import com.fasterxml.jackson.databind.ObjectMapper;
15+
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
16+
import com.fasterxml.jackson.databind.SerializationFeature;
17+
import com.fasterxml.jackson.databind.SerializerProvider;
18+
import com.fasterxml.jackson.databind.json.JsonMapper;
19+
import com.fasterxml.jackson.databind.module.SimpleModule;
20+
import com.fasterxml.jackson.databind.type.TypeFactory;
21+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
22+
import java.io.IOException;
23+
import java.lang.reflect.Type;
24+
import java.time.Instant;
25+
import java.time.format.DateTimeFormatter;
26+
import java.time.format.DateTimeFormatterBuilder;
27+
import java.util.Date;
28+
import java.util.Map;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import software.amazon.lambda.durable.TypeToken;
31+
import software.amazon.lambda.durable.util.ExceptionHelper;
32+
33+
/**
34+
* Serializer/Deserializer for Durable Execution Input and Output objects. This is for INTERNAL use only - handles
35+
* Lambda Durable Functions backend protocol.
36+
*
37+
* <p>Customer-facing serialization uses SerDes from DurableConfig.
38+
*/
39+
public class DurableInputOutputSerDes implements SerDes {
40+
private final ObjectMapper objectMapper = createObjectMapper(); // Internal ObjectMapper
41+
private final TypeFactory typeFactory = objectMapper.getTypeFactory();
42+
private final Map<Type, JavaType> typeCache = new ConcurrentHashMap<>();
43+
44+
/**
45+
* Creates ObjectMapper for DAR backend communication (internal use only). This is for INTERNAL use only - handles
46+
* Lambda Durable Functions backend protocol.
47+
*
48+
* <p>Customer-facing serialization uses SerDes from DurableConfig.
49+
*
50+
* @return Configured ObjectMapper for durable backend communication
51+
*/
52+
static ObjectMapper createObjectMapper() {
53+
var dateModule = new SimpleModule();
54+
dateModule.addDeserializer(Date.class, new JsonDeserializer<>() {
55+
@Override
56+
public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
57+
throws IOException {
58+
// Timestamp is a double value represent seconds since epoch.
59+
var timestamp = jsonParser.getDoubleValue();
60+
// Date expects milliseconds since epoch, so multiply by 1000.
61+
return new Date((long) (timestamp * 1000));
62+
}
63+
});
64+
dateModule.addSerializer(Date.class, new JsonSerializer<>() {
65+
@Override
66+
public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
67+
throws IOException {
68+
// Timestamp should be a double value representing seconds since epoch, so
69+
// convert from milliseconds.
70+
double timestamp = date.getTime() / 1000.0;
71+
jsonGenerator.writeNumber(timestamp);
72+
}
73+
});
74+
75+
// Needed for deserialization of timestamps for some SDK v2 objects
76+
dateModule.addDeserializer(Instant.class, new JsonDeserializer<>() {
77+
private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
78+
.appendPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXXX")
79+
.toFormatter();
80+
81+
@Override
82+
public Instant deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
83+
throws IOException {
84+
if (jsonParser.hasToken(JsonToken.VALUE_NUMBER_INT)) {
85+
return Instant.ofEpochMilli(jsonParser.getLongValue());
86+
}
87+
var timestampStr = jsonParser.getValueAsString();
88+
return Instant.from(TIMESTAMP_FORMATTER.parse(timestampStr));
89+
}
90+
});
91+
92+
return JsonMapper.builder()
93+
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
94+
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
95+
// Looks pretty, and probably needed for tests to be deterministic.
96+
.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
97+
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
98+
// Data passed over the wire from the backend is UpperCamelCase
99+
.propertyNamingStrategy(PropertyNamingStrategies.UPPER_CAMEL_CASE)
100+
.addModule(new JavaTimeModule())
101+
.addModule(dateModule)
102+
.addModule(new AwsSdkV2Module())
103+
.build();
104+
}
105+
106+
/**
107+
* Serializes an object to a JSON string.
108+
*
109+
* @param value the object to serialize
110+
* @return the JSON string representation, or null if value is null
111+
*/
112+
@Override
113+
public String serialize(Object value) {
114+
if (value == null) {
115+
return null;
116+
}
117+
try {
118+
return objectMapper.writeValueAsString(value);
119+
} catch (IOException e) {
120+
ExceptionHelper.sneakyThrow(e);
121+
return null;
122+
}
123+
}
124+
125+
/**
126+
* Deserializes a JSON string to DurableExecutionInput object
127+
*
128+
* @param data the JSON string to deserialize
129+
* @param typeToken the type token of DurableExecutionInput
130+
* @return the deserialized object, or null if data is null
131+
*/
132+
@Override
133+
public <T> T deserialize(String data, TypeToken<T> typeToken) {
134+
if (data == null) {
135+
return null;
136+
}
137+
try {
138+
JavaType javaType = typeCache.computeIfAbsent(typeToken.getType(), typeFactory::constructType);
139+
return objectMapper.readValue(data, javaType);
140+
} catch (IOException e) {
141+
ExceptionHelper.sneakyThrow(e);
142+
return null;
143+
}
144+
}
145+
}

sdk/src/main/java/software/amazon/lambda/durable/serde/JacksonSerDes.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,15 @@ public class JacksonSerDes implements SerDes {
3636

3737
/** Creates a new JacksonSerDes with default ObjectMapper configuration. */
3838
public JacksonSerDes() {
39-
this.mapper = new ObjectMapper()
39+
this(new ObjectMapper()
4040
.registerModule(new JavaTimeModule())
4141
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
42-
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
42+
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES));
43+
}
44+
45+
/** Creates a new JacksonSerDes with a custom ObjectMapper configuration. */
46+
public JacksonSerDes(ObjectMapper objectMapper) {
47+
this.mapper = objectMapper;
4348
this.typeFactory = mapper.getTypeFactory();
4449
this.typeCache = new ConcurrentHashMap<>();
4550
}

0 commit comments

Comments
 (0)