-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathDurableHandler.java
More file actions
228 lines (215 loc) · 9.9 KB
/
Copy pathDurableHandler.java
File metadata and controls
228 lines (215 loc) · 9.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazonaws.lambda.durable;
import com.amazonaws.lambda.durable.model.DurableExecutionInput;
import com.amazonaws.lambda.durable.serde.AwsSdkV2Module;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.ParameterizedType;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class DurableHandler<I, O> implements RequestStreamHandler {
private final Class<I> inputType;
private final DurableConfig config;
private final ObjectMapper objectMapper = createObjectMapper(); // Internal ObjectMapper
private static final Logger logger = LoggerFactory.getLogger(DurableHandler.class);
@SuppressWarnings("unchecked")
protected DurableHandler() {
// Extract input type from generic superclass
var superClass = getClass().getGenericSuperclass();
if (superClass instanceof ParameterizedType paramType) {
this.inputType = (Class<I>) paramType.getActualTypeArguments()[0];
} else {
throw new IllegalArgumentException("Cannot determine input type parameter");
}
this.config = createConfiguration();
validateConfiguration();
}
/**
* Gets the configuration used by this handler. This allows test frameworks and other tools to access the handler's
* configuration for testing purposes.
*
* <p>DurableConfig is immutable.
*
* @return The DurableConfig instance used by this handler
*/
public DurableConfig getConfiguration() {
return config;
}
/**
* Template method for creating configuration. Override this method to provide custom DurableExecutionClient,
* SerDes, or other configuration.
*
* <p>The {@link com.amazonaws.lambda.durable.client.LambdaDurableFunctionsClient} is a wrapper that customers
* should use to inject their own configured {@link software.amazon.awssdk.services.lambda.LambdaClient}. This
* allows full control over AWS SDK configuration including credentials, region, HTTP client, and retry policies.
*
* <p>Basic example with custom region and credentials:
*
* <pre>{@code
* @Override
* protected DurableConfig createConfiguration() {
* // Create custom Lambda client with specific configuration
* var lambdaClient = LambdaClient.builder()
* .region(Region.US_WEST_2)
* .credentialsProvider(ProfileCredentialsProvider.create("my-profile"))
* .build();
*
* // Wrap the Lambda client with LambdaDurableFunctionsClient
* var durableClient = new LambdaDurableFunctionsClient(lambdaClient);
*
* return DurableConfig.builder()
* .withDurableExecutionClient(durableClient)
* .build();
* }
* }</pre>
*
* <p>Advanced example with AWS CRT HTTP Client for high-performance scenarios:
*
* <pre>{@code
* @Override
* protected DurableConfig createConfiguration() {
* // Configure AWS CRT HTTP Client for optimal performance
* var crtHttpClient = AwsCrtAsyncHttpClient.builder()
* .maxConcurrency(50)
* .connectionTimeout(Duration.ofSeconds(30))
* .connectionMaxIdleTime(Duration.ofSeconds(60))
* .build();
*
* // Create Lambda client with CRT HTTP client
* var lambdaClient = LambdaClient.builder()
* .region(Region.US_EAST_1)
* .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
* .httpClient(crtHttpClient)
* .overrideConfiguration(ClientOverrideConfiguration.builder()
* .retryPolicy(RetryPolicy.builder()
* .numRetries(5)
* .build())
* .build())
* .build();
*
* // Wrap with LambdaDurableFunctionsClient
* var durableClient = new LambdaDurableFunctionsClient(lambdaClient);
*
* return DurableConfig.builder()
* .withDurableExecutionClient(durableClient)
* .withSerDes(customSerDes) // Optional: custom SerDes for user data
* .withExecutorService(customExecutor) // Optional: custom thread pool
* .build();
* }
* }</pre>
*
* @return DurableConfig with desired configuration
*/
protected DurableConfig createConfiguration() {
return DurableConfig.defaultConfig();
}
private void validateConfiguration() {
if (config.getDurableExecutionClient() == null) {
throw new IllegalStateException("DurableExecutionClient configuration failed");
}
if (config.getSerDes() == null) {
throw new IllegalStateException("SerDes configuration failed");
}
if (config.getExecutorService() == null) {
throw new IllegalStateException("ExecutorService configuration failed");
}
}
@Override
public final void handleRequest(InputStream inputStream, OutputStream outputStream, Context context)
throws IOException {
var inputString = new String(inputStream.readAllBytes());
logger.debug("Raw input from durable handler: {}", inputString);
var input = this.objectMapper.readValue(inputString, DurableExecutionInput.class);
var output = DurableExecutor.execute(input, context, inputType, this::handleRequest, config);
outputStream.write(objectMapper.writeValueAsBytes(output));
}
/**
* Handle the durable execution.
*
* @param input User input
* @param context Durable context for operations
* @return Result
*/
public abstract O handleRequest(I input, DurableContext context);
/**
* Creates ObjectMapper for DAR backend communication (internal use only). This is for INTERNAL use only - handles
* Lambda Durable Functions backend protocol.
*
* <p>Customer-facing serialization uses SerDes from DurableConfig.
*
* @return Configured ObjectMapper for durable backend communication
*/
public static ObjectMapper createObjectMapper() {
var dateModule = new SimpleModule();
dateModule.addDeserializer(Date.class, new JsonDeserializer<>() {
@Override
public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException {
// Timestamp is a double value represent seconds since epoch.
var timestamp = jsonParser.getDoubleValue();
// Date expects milliseconds since epoch, so multiply by 1000.
return new Date((long) (timestamp * 1000));
}
});
dateModule.addSerializer(Date.class, new JsonSerializer<>() {
@Override
public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
throws IOException {
// Timestamp should be a double value representing seconds since epoch, so
// convert from milliseconds.
double timestamp = date.getTime() / 1000.0;
jsonGenerator.writeNumber(timestamp);
}
});
// Needed for deserialization of timestamps for some SDK v2 objects
dateModule.addDeserializer(Instant.class, new JsonDeserializer<>() {
private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXXX")
.toFormatter();
@Override
public Instant deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException {
if (jsonParser.hasToken(JsonToken.VALUE_NUMBER_INT)) {
return Instant.ofEpochMilli(jsonParser.getLongValue());
}
var timestampStr = jsonParser.getValueAsString();
return Instant.from(TIMESTAMP_FORMATTER.parse(timestampStr));
}
});
return JsonMapper.builder()
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
// Looks pretty, and probably needed for tests to be deterministic.
.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
// Data passed over the wire from the backend is UpperCamelCase
.propertyNamingStrategy(PropertyNamingStrategies.UPPER_CAMEL_CASE)
.addModule(new JavaTimeModule())
.addModule(dateModule)
.addModule(new AwsSdkV2Module())
.build();
}
}