-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathDurableHandler.java
More file actions
162 lines (153 loc) · 6.77 KB
/
Copy pathDurableHandler.java
File metadata and controls
162 lines (153 loc) · 6.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.lambda.durable.execution.DurableExecutor;
import software.amazon.lambda.durable.model.DurableExecutionInput;
import software.amazon.lambda.durable.serde.DurableInputOutputSerDes;
/**
* Abstract base class for Lambda handlers that use durable execution.
*
* <p>Extend this class and implement {@link #handleRequest(Object, DurableContext)} to build resilient, multi-step
* workflows. The handler automatically manages checkpoint-and-replay, input deserialization, and communication with the
* Lambda Durable Functions backend.
*
* @param <I> the input type
* @param <O> the output type
*/
public abstract class DurableHandler<I, O> implements RequestStreamHandler {
private final TypeToken<I> inputType;
private final DurableConfig config;
private final DurableInputOutputSerDes serDes = new DurableInputOutputSerDes(); // Internal ObjectMapper
private static final Logger logger = LoggerFactory.getLogger(DurableHandler.class);
protected DurableHandler() {
this.inputType = TypeToken.fromGenericSuperClass(getClass(), 0);
this.config = createConfiguration();
}
/**
* Constructs a handler with an explicitly provided input type. Use this when the input type cannot be inferred from
* the generic superclass, such as when extending {@link DurableHandler} indirectly through an intermediate class.
*
* @param inputType the token capturing the handler's input type
*/
protected DurableHandler(TypeToken<I> inputType) {
this.inputType = inputType;
this.config = createConfiguration();
}
/**
* Gets the configuration used by this handler. This allows test frameworks and other tools to access the handler's
* configuration for testing purposes.
*
* <p>DurableConfig is immutable.
*
* @return The DurableConfig instance used by this handler
*/
public DurableConfig getConfiguration() {
return config;
}
/**
* Template method for creating configuration. Override this method to provide custom DurableExecutionClient,
* SerDes, or other configuration.
*
* <p>The {@link software.amazon.lambda.durable.client.LambdaDurableFunctionsClient} is a wrapper that customers
* should use to inject their own configured {@link software.amazon.awssdk.services.lambda.LambdaClient}. This
* allows full control over AWS SDK configuration including credentials, region, HTTP client, and retry policies.
*
* <p>Basic example with custom region and credentials:
*
* <pre>{@code
* @Override
* protected DurableConfig createConfiguration() {
* // Create custom Lambda client with specific configuration
* var lambdaClient = LambdaClient.builder()
* .region(Region.US_WEST_2)
* .credentialsProvider(ProfileCredentialsProvider.create("my-profile"))
* .build();
*
* // Wrap the Lambda client with LambdaDurableFunctionsClient
* var durableClient = new LambdaDurableFunctionsClient(lambdaClient);
*
* return DurableConfig.builder()
* .withDurableExecutionClient(durableClient)
* .build();
* }
* }</pre>
*
* <p>Advanced example with AWS CRT HTTP Client for high-performance scenarios:
*
* <pre>{@code
* @Override
* protected DurableConfig createConfiguration() {
* // Configure AWS CRT HTTP Client for optimal performance
* var crtHttpClient = AwsCrtAsyncHttpClient.builder()
* .maxConcurrency(50)
* .connectionTimeout(Duration.ofSeconds(30))
* .connectionMaxIdleTime(Duration.ofSeconds(60))
* .build();
*
* // Create Lambda client with CRT HTTP client
* var lambdaClient = LambdaClient.builder()
* .region(Region.US_EAST_1)
* .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
* .httpClient(crtHttpClient)
* .overrideConfiguration(ClientOverrideConfiguration.builder()
* .retryPolicy(RetryPolicy.builder()
* .numRetries(5)
* .build())
* .build())
* .build();
*
* // Wrap with LambdaDurableFunctionsClient
* var durableClient = new LambdaDurableFunctionsClient(lambdaClient);
*
* return DurableConfig.builder()
* .withDurableExecutionClient(durableClient)
* .withSerDes(customSerDes) // Optional: custom SerDes for user data
* .withExecutorService(customExecutor) // Optional: custom thread pool
* .withSerializationRoundTripValidation(false) // Optional: skip extra validation deserialize pass
* .build();
* }
* }</pre>
*
* @return DurableConfig with desired configuration
*/
protected DurableConfig createConfiguration() {
return DurableConfig.defaultConfig();
}
/**
* Reads the request, executes the durable function handler and writes the response
*
* @param inputStream the input stream
* @param outputStream the output stream
* @param context the Lambda context
* @throws IOException thrown when serialize/deserialize fails
*/
@Override
public final void handleRequest(InputStream inputStream, OutputStream outputStream, Context context)
throws IOException {
var inputString = new String(inputStream.readAllBytes());
logger.debug("Raw input from durable handler: {}", inputString);
var input = serDes.deserialize(inputString, TypeToken.get(DurableExecutionInput.class));
// Durable function inputs must contain DurableExecutionArn and CheckpointToken
if (input.durableExecutionArn() == null || input.checkpointToken() == null) {
throw new IllegalStateException(
"Unexpected payload provided to start the durable execution. DurableConfig must be set in Lambda function configuration.");
}
var output = DurableExecutor.execute(input, context, inputType, this::handleRequest, config);
outputStream.write(serDes.serialize(output).getBytes());
}
/**
* Handle the durable execution.
*
* @param input User input
* @param context Durable context for operations
* @return Result
*/
public abstract O handleRequest(I input, DurableContext context);
}