Skip to content

Commit 5a5847e

Browse files
mananrajotiasimonelbaz
authored andcommitted
Adding streaming support for lambda plugin (opensearch-project#6273)
Streaming response support for lambda plugin Signed-off-by: Manan Rajotia <rajotia@amazon.com>
1 parent 24f34c3 commit 5a5847e

15 files changed

Lines changed: 1175 additions & 70 deletions

File tree

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.lambda.common;
12+
13+
import org.opensearch.dataprepper.model.codec.InputCodec;
14+
import org.opensearch.dataprepper.model.event.Event;
15+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
16+
import org.opensearch.dataprepper.model.record.Record;
17+
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
18+
import org.opensearch.dataprepper.plugins.lambda.common.config.ResponseHandling;
19+
import org.opensearch.dataprepper.plugins.lambda.common.config.StreamingOptions;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
24+
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
25+
import software.amazon.awssdk.services.lambda.model.InvokeWithResponseStreamRequest;
26+
import software.amazon.awssdk.services.lambda.model.InvokeWithResponseStreamResponseHandler;
27+
import software.amazon.awssdk.services.lambda.model.ResponseStreamingInvocationType;
28+
import software.amazon.awssdk.services.lambda.model.invokewithresponsestreamresponseevent.DefaultPayloadChunk;
29+
30+
import java.io.ByteArrayInputStream;
31+
import java.io.ByteArrayOutputStream;
32+
import java.io.IOException;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
import java.util.concurrent.CompletableFuture;
36+
37+
/**
38+
* Handles actual streaming Lambda invocations using AWS SDK streaming API
39+
*/
40+
public class StreamingLambdaHandler {
41+
private static final Logger LOG = LoggerFactory.getLogger(StreamingLambdaHandler.class);
42+
43+
private final LambdaAsyncClient lambdaAsyncClient;
44+
private final PluginFactory pluginFactory;
45+
private final InputCodec responseCodec;
46+
private final String functionName;
47+
private final StreamingOptions streamingOptions;
48+
49+
public StreamingLambdaHandler(
50+
LambdaAsyncClient lambdaAsyncClient,
51+
PluginFactory pluginFactory,
52+
InputCodec responseCodec,
53+
String functionName,
54+
StreamingOptions streamingOptions) {
55+
this.lambdaAsyncClient = lambdaAsyncClient;
56+
this.pluginFactory = pluginFactory;
57+
this.responseCodec = responseCodec;
58+
this.functionName = functionName;
59+
this.streamingOptions = streamingOptions;
60+
}
61+
62+
public CompletableFuture<List<Record<Event>>> invokeWithStreaming(Buffer inputBuffer) {
63+
64+
CompletableFuture<List<Record<Event>>> resultFuture = new CompletableFuture<>();
65+
ByteArrayOutputStream responseStream = new ByteArrayOutputStream();
66+
67+
// Get the InvokeRequest from buffer and extract payload
68+
InvokeRequest invokeRequest = inputBuffer.getRequestPayload(functionName, "RequestResponse");
69+
if (invokeRequest == null) {
70+
resultFuture.completeExceptionally(new IllegalArgumentException("No payload in buffer"));
71+
return resultFuture;
72+
}
73+
74+
InvokeWithResponseStreamRequest request = InvokeWithResponseStreamRequest.builder()
75+
.functionName(functionName)
76+
.invocationType(ResponseStreamingInvocationType.REQUEST_RESPONSE)
77+
.payload(invokeRequest.payload())
78+
.build();
79+
80+
InvokeWithResponseStreamResponseHandler responseHandler = InvokeWithResponseStreamResponseHandler.builder()
81+
.onResponse(response -> {
82+
LOG.debug("Streaming response started for function: {}", functionName);
83+
})
84+
.onEventStream(publisher -> {
85+
publisher.subscribe(event -> {
86+
if (event instanceof DefaultPayloadChunk) {
87+
DefaultPayloadChunk chunk = (DefaultPayloadChunk) event;
88+
try {
89+
// DefaultPayloadChunk should have payload() method
90+
byte[] chunkBytes = chunk.payload().asByteArray();
91+
// Synchronize access to ByteArrayOutputStream as it's not thread-safe
92+
// AWS SDK may deliver chunks on different threads
93+
synchronized (responseStream) {
94+
responseStream.write(chunkBytes);
95+
}
96+
LOG.debug("Received chunk of size: {} bytes", chunkBytes.length);
97+
} catch (IOException e) {
98+
LOG.error("Error writing chunk to response stream", e);
99+
resultFuture.completeExceptionally(e);
100+
}
101+
} else {
102+
// Other events (e.g., InvokeComplete) are handled by onComplete()
103+
LOG.debug("Ignoring non-payload Lambda stream event: {}", event.getClass().getSimpleName());
104+
}
105+
});
106+
})
107+
.onComplete(() -> {
108+
try {
109+
byte[] completeResponse = responseStream.toByteArray();
110+
LOG.debug("Streaming response complete. Total size: {} bytes", completeResponse.length);
111+
112+
List<Record<Event>> processedRecords = processStreamingResponse(
113+
completeResponse, inputBuffer, streamingOptions);
114+
resultFuture.complete(processedRecords);
115+
116+
} catch (Exception e) {
117+
LOG.error("Error processing complete streaming response", e);
118+
resultFuture.completeExceptionally(e);
119+
}
120+
})
121+
.onError(throwable -> {
122+
LOG.error("Error in streaming Lambda invocation", throwable);
123+
resultFuture.completeExceptionally(throwable);
124+
})
125+
.build();
126+
127+
lambdaAsyncClient.invokeWithResponseStream(request, responseHandler);
128+
return resultFuture;
129+
}
130+
131+
private List<Record<Event>> processStreamingResponse(
132+
byte[] responseBytes,
133+
Buffer inputBuffer,
134+
StreamingOptions streamingOptions) throws IOException {
135+
136+
List<Record<Event>> resultRecords = new ArrayList<>();
137+
138+
try (ByteArrayInputStream responseStream = new ByteArrayInputStream(responseBytes)) {
139+
responseCodec.parse(responseStream, record -> {
140+
Event parsedEvent = record.getData();
141+
resultRecords.add(new Record<>(parsedEvent));
142+
});
143+
}
144+
145+
LOG.info("Processed streaming response: {} records from {} bytes",
146+
resultRecords.size(), responseBytes.length);
147+
148+
// Apply response handling strategy
149+
return applyResponseHandling(resultRecords, inputBuffer, streamingOptions);
150+
}
151+
152+
/**
153+
* Applies the configured response handling strategy to the parsed records.
154+
*
155+
* @param parsedRecords Records parsed from the streaming response
156+
* @param inputBuffer Original input buffer containing source events
157+
* @param streamingOptions Configuration for response handling
158+
* @return Processed records based on the handling strategy
159+
*/
160+
private List<Record<Event>> applyResponseHandling(
161+
List<Record<Event>> parsedRecords,
162+
Buffer inputBuffer,
163+
StreamingOptions streamingOptions) {
164+
165+
if (streamingOptions == null ||
166+
streamingOptions.getResponseHandling() != ResponseHandling.RECONSTRUCT_DOCUMENT) {
167+
// No reconstruction - return records as-is
168+
return parsedRecords;
169+
}
170+
171+
// Reconstruct: merge all chunks into a single document
172+
return reconstructDocument(parsedRecords, inputBuffer);
173+
}
174+
175+
/**
176+
* Reconstructs a single document from multiple streaming chunks.
177+
* All chunks from the streaming response are merged into one Event,
178+
* matching the original input event count.
179+
*
180+
* <p>The reconstructed event retains the original event's EventHandle, enabling proper end-to-end
181+
* acknowledgement tracking. Chunks are treated as transport-level fragments (due to Lambda's
182+
* response size limits) and are not tracked separately in the acknowledgement system - only the
183+
* final reconstructed event is acknowledged downstream.</p>
184+
*
185+
* @param parsedRecords All chunks parsed from the streaming response
186+
* @param inputBuffer Original input buffer
187+
* @return List containing the reconstructed document(s)
188+
*/
189+
private List<Record<Event>> reconstructDocument(
190+
List<Record<Event>> parsedRecords,
191+
Buffer inputBuffer) {
192+
193+
if (parsedRecords.isEmpty()) {
194+
return parsedRecords;
195+
}
196+
197+
// Get the original records to maintain event handle relationships
198+
List<Record<Event>> originalRecords = inputBuffer.getRecords();
199+
200+
if (originalRecords.isEmpty()) {
201+
LOG.warn("No original records found in buffer for reconstruction");
202+
return parsedRecords;
203+
}
204+
205+
// Defensive check: reconstruct-document requires exactly 1 event per buffer
206+
// This should be enforced by validation in LambdaProcessor, but we check here to prevent silent failures
207+
if (originalRecords.size() != 1) {
208+
String errorMsg = String.format(
209+
"reconstruct-document mode requires exactly 1 event per buffer, found %d events. " +
210+
"This should have been prevented by configuration validation. " +
211+
"Please ensure batch.threshold.event_count is set to 1.",
212+
originalRecords.size()
213+
);
214+
LOG.error(errorMsg);
215+
throw new IllegalStateException(errorMsg);
216+
}
217+
218+
// Merge all chunks into the single original record
219+
// This handles: 1 input event → multiple chunks → 1 reconstructed event
220+
Event reconstructedEvent = originalRecords.get(0).getData();
221+
222+
// Merge all parsed chunks into the reconstructed event
223+
for (Record<Event> parsedRecord : parsedRecords) {
224+
Event chunkEvent = parsedRecord.getData();
225+
reconstructedEvent.merge(chunkEvent);
226+
}
227+
228+
LOG.info("Reconstructed {} chunks into {} document(s)",
229+
parsedRecords.size(), originalRecords.size());
230+
231+
// Return the original records (now containing merged data)
232+
return originalRecords;
233+
}
234+
}

data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/accumlator/InMemoryBuffer.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ public class InMemoryBuffer implements Buffer {
4040
private int eventCount;
4141
private long payloadRequestSize;
4242
private List<String> keys;
43+
private boolean codecStarted;
44+
private boolean codecCompleted;
4345

4446

4547
public InMemoryBuffer(String batchOptionKeyName) {
@@ -56,6 +58,8 @@ public InMemoryBuffer(String batchOptionKeyName, OutputCodecContext outputCodecC
5658
eventCount = 0;
5759
payloadRequestSize = 0;
5860
payloadResponseSize = 0;
61+
codecStarted = false;
62+
codecCompleted = false;
5963
// Setup request codec
6064
JsonOutputCodecConfig jsonOutputCodecConfig = new JsonOutputCodecConfig();
6165
jsonOutputCodecConfig.setKeyName(batchOptionKeyName);
@@ -79,6 +83,7 @@ public void addRecord(Record<Event> record) {
7983
try {
8084
if (eventCount == 0) {
8185
requestCodec.start(this.byteArrayOutputStream, event, this.outputCodecContext);
86+
codecStarted = true;
8287
}
8388
requestCodec.writeEvent(event, this.byteArrayOutputStream);
8489
} catch (IOException e) {
@@ -115,7 +120,11 @@ public InvokeRequest getRequestPayload(String functionName, String invocationTyp
115120
}
116121

117122
try {
118-
requestCodec.complete(this.byteArrayOutputStream);
123+
// Only call complete if we actually started the codec and haven't completed it yet
124+
if (codecStarted && !codecCompleted) {
125+
requestCodec.complete(this.byteArrayOutputStream);
126+
codecCompleted = true;
127+
}
119128
} catch (IOException e) {
120129
throw new RuntimeException(e);
121130
}
@@ -159,4 +168,3 @@ public Long getPayloadRequestSize() {
159168
return payloadRequestSize;
160169
}
161170
}
162-

data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/InvocationType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88

99
public enum InvocationType {
1010
REQUEST_RESPONSE("request-response", "RequestResponse"),
11-
EVENT("event", "Event");
11+
EVENT("event", "Event"),
12+
STREAMING_RESPONSE("streaming-response", "RequestResponse");
1213

1314
private final String userInputValue;
1415
private final String awsLambdaValue;

data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/config/LambdaCommonConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ public abstract class LambdaCommonConfig {
6060
public boolean hasKeys() {
6161
return keys != null && keys.size() > 0;
6262
}
63+
@JsonPropertyDescription("Streaming options for Lambda response streaming")
64+
@JsonProperty("streaming")
65+
@Valid
66+
private StreamingOptions streamingOptions;
67+
68+
public StreamingOptions getStreamingOptions() {
69+
return streamingOptions;
70+
}
6371

6472
public abstract InvocationType getInvocationType();
6573

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.lambda.common.config;
12+
13+
import com.fasterxml.jackson.annotation.JsonCreator;
14+
import com.fasterxml.jackson.annotation.JsonValue;
15+
16+
import java.util.Arrays;
17+
import java.util.Map;
18+
import java.util.stream.Collectors;
19+
20+
/**
21+
* Enum representing response handling strategies for Lambda streaming responses.
22+
*/
23+
public enum ResponseHandling {
24+
RECONSTRUCT_DOCUMENT("reconstruct-document");
25+
26+
private static final Map<String, ResponseHandling> NAMES_MAP = Arrays.stream(ResponseHandling.values())
27+
.collect(Collectors.toMap(
28+
value -> value.optionName,
29+
value -> value
30+
));
31+
32+
private final String optionName;
33+
34+
ResponseHandling(final String optionName) {
35+
this.optionName = optionName;
36+
}
37+
38+
@JsonValue
39+
public String getOptionName() {
40+
return optionName;
41+
}
42+
43+
@JsonCreator
44+
public static ResponseHandling fromOptionName(final String optionName) {
45+
return NAMES_MAP.get(optionName);
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.lambda.common.config;
12+
13+
import com.fasterxml.jackson.annotation.JsonProperty;
14+
15+
public class StreamingOptions {
16+
@JsonProperty("enabled")
17+
private boolean enabled = false;
18+
19+
@JsonProperty("response_handling")
20+
private ResponseHandling responseHandling = ResponseHandling.RECONSTRUCT_DOCUMENT;
21+
22+
public boolean isEnabled() {
23+
return enabled;
24+
}
25+
26+
public ResponseHandling getResponseHandling() {
27+
return responseHandling;
28+
}
29+
}

0 commit comments

Comments
 (0)