Skip to content

Commit c705c09

Browse files
kkondakaKrishna Kondaka
andauthored
Split HTTP source data to multiple chunks before writing to byte buffer (#4266)
* Split HTTP source data to multiple chunks before writing to byte buffer Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> * Modified JsonDecoder to parse objects in addition to array of objects Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> * Added a test case Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> * Removed json to array conversion Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> * Modified to add new JsonObjDecoder for decoding single json objects Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> * Removed changes to JsonDecoder Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> * Renamed JsonObjDecoder to JsonObjectDecoder Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
1 parent 6611631 commit c705c09

6 files changed

Lines changed: 207 additions & 6 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.codec;
7+
8+
import com.fasterxml.jackson.core.JsonFactory;
9+
import com.fasterxml.jackson.core.JsonParser;
10+
import com.fasterxml.jackson.core.JsonToken;
11+
import com.fasterxml.jackson.databind.ObjectMapper;
12+
13+
import org.opensearch.dataprepper.model.event.Event;
14+
import org.opensearch.dataprepper.model.event.JacksonEvent;
15+
import org.opensearch.dataprepper.model.log.JacksonLog;
16+
import org.opensearch.dataprepper.model.record.Record;
17+
import java.io.IOException;
18+
import java.io.InputStream;
19+
import java.time.Instant;
20+
import java.util.Map;
21+
import java.util.Objects;
22+
import java.util.function.Consumer;
23+
24+
public class JsonObjectDecoder implements ByteDecoder {
25+
private final ObjectMapper objectMapper = new ObjectMapper();
26+
private final JsonFactory jsonFactory = new JsonFactory();
27+
28+
public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record<Event>> eventConsumer) throws IOException {
29+
Objects.requireNonNull(inputStream);
30+
Objects.requireNonNull(eventConsumer);
31+
32+
final JsonParser jsonParser = jsonFactory.createParser(inputStream);
33+
34+
while (!jsonParser.isClosed() && jsonParser.nextToken() != JsonToken.END_OBJECT) {
35+
if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) {
36+
final Map<String, Object> innerJson = objectMapper.readValue(jsonParser, Map.class);
37+
38+
final Record<Event> record = createRecord(innerJson, timeReceived);
39+
eventConsumer.accept(record);
40+
}
41+
}
42+
}
43+
44+
private Record<Event> createRecord(final Map<String, Object> json, final Instant timeReceived) {
45+
final JacksonLog.Builder logBuilder = JacksonLog.builder()
46+
.withData(json)
47+
.getThis();
48+
if (timeReceived != null) {
49+
logBuilder.withTimeReceived(timeReceived);
50+
}
51+
final JacksonEvent event = (JacksonEvent)logBuilder.build();
52+
53+
return new Record<>(event);
54+
}
55+
56+
}
57+
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package org.opensearch.dataprepper.model.codec;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.opensearch.dataprepper.model.event.Event;
5+
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
6+
import org.opensearch.dataprepper.model.record.Record;
7+
8+
import java.io.ByteArrayInputStream;
9+
import java.time.Instant;
10+
import java.util.Map;
11+
import java.util.Random;
12+
import java.util.UUID;
13+
14+
import static org.hamcrest.CoreMatchers.equalTo;
15+
import static org.hamcrest.MatcherAssert.assertThat;
16+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
17+
18+
import org.junit.jupiter.api.BeforeEach;
19+
20+
public class JsonObjectDecoderTest {
21+
private JsonObjectDecoder jsonObjectDecoder;
22+
private Record<Event> receivedRecord;
23+
private Instant receivedTime;
24+
25+
private JsonObjectDecoder createObjectUnderTest() {
26+
return new JsonObjectDecoder();
27+
}
28+
29+
@BeforeEach
30+
void setup() {
31+
jsonObjectDecoder = createObjectUnderTest();
32+
receivedRecord = null;
33+
}
34+
35+
@Test
36+
void test_basicJsonObjectDecoder() {
37+
String stringValue = UUID.randomUUID().toString();
38+
Random r = new Random();
39+
int intValue = r.nextInt();
40+
String inputString = "{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}";
41+
try {
42+
jsonObjectDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> {
43+
receivedRecord = record;
44+
});
45+
} catch (Exception e){}
46+
47+
assertNotEquals(receivedRecord, null);
48+
Map<String, Object> map = receivedRecord.getData().toMap();
49+
assertThat(map.get("key1"), equalTo(stringValue));
50+
assertThat(map.get("key2"), equalTo(intValue));
51+
}
52+
53+
@Test
54+
void test_basicJsonObjectDecoder_withTimeReceived() {
55+
String stringValue = UUID.randomUUID().toString();
56+
Random r = new Random();
57+
int intValue = r.nextInt();
58+
59+
String inputString = "{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}";
60+
final Instant now = Instant.now();
61+
try {
62+
jsonObjectDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), now, (record) -> {
63+
receivedRecord = record;
64+
receivedTime = ((DefaultEventHandle)(((Event)record.getData()).getEventHandle())).getInternalOriginationTime();
65+
});
66+
} catch (Exception e){}
67+
68+
assertNotEquals(receivedRecord, null);
69+
Map<String, Object> map = receivedRecord.getData().toMap();
70+
assertThat(map.get("key1"), equalTo(stringValue));
71+
assertThat(map.get("key2"), equalTo(intValue));
72+
assertThat(receivedTime, equalTo(now));
73+
}
74+
75+
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ List<Future<Void>> publishToSinks(final Collection<Record> records) {
339339

340340
final RouterGetRecordStrategy getRecordStrategy =
341341
new RouterCopyRecordStrategy(eventFactory,
342-
(source.areAcknowledgementsEnabled()) ?
342+
(source.areAcknowledgementsEnabled() || buffer.isByteBuffer()) ?
343343
acknowledgementSetManager :
344344
InactiveAcknowledgementSetManager.getInstance(),
345345
sinks);

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.opensearch.dataprepper.model.record.Record;
2121
import org.opensearch.dataprepper.model.source.Source;
2222
import org.opensearch.dataprepper.model.codec.ByteDecoder;
23-
import org.opensearch.dataprepper.model.codec.JsonDecoder;
23+
import org.opensearch.dataprepper.model.codec.JsonObjectDecoder;
2424
import com.linecorp.armeria.server.HttpService;
2525
import com.linecorp.armeria.server.Server;
2626
import com.linecorp.armeria.server.ServerBuilder;
@@ -64,7 +64,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
6464
this.sourceConfig = sourceConfig;
6565
this.pluginMetrics = pluginMetrics;
6666
this.pipelineName = pipelineDescription.getPipelineName();
67-
this.byteDecoder = new JsonDecoder();
67+
this.byteDecoder = new JsonObjectDecoder();
6868
this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
6969
final PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
7070
final PluginSetting authenticationPluginSetting;

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@ private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRe
8787
}
8888
try {
8989
if (buffer.isByteBuffer()) {
90-
// jsonList is ignored in this path but parse() was done to make
91-
// sure that the data is in the expected json format
92-
buffer.writeBytes(content.array(), null, bufferWriteTimeoutInMillis);
90+
for (final String json: jsonList) {
91+
buffer.writeBytes(json.getBytes(), null, bufferWriteTimeoutInMillis);
92+
}
9393
} else {
9494
final List<Record<Log>> records = jsonList.stream()
9595
.map(this::buildRecordLog)

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
3131
import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
3232
import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
33+
import org.opensearch.dataprepper.model.codec.JsonDecoder;
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536

@@ -126,6 +127,10 @@ void setUp() {
126127
byteDecoder = null;
127128
}
128129

130+
private KafkaBuffer createObjectUnderTestWithJsonDecoder() {
131+
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new JsonDecoder(), null, null);
132+
}
133+
129134
private KafkaBuffer createObjectUnderTest() {
130135
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, null, null);
131136
}
@@ -193,6 +198,70 @@ void write_and_read_max_request_test() throws TimeoutException, NoSuchFieldExcep
193198
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
194199
}
195200

201+
@Test
202+
void writeBigJson_and_read() throws Exception {
203+
final KafkaBuffer objectUnderTest = createObjectUnderTestWithJsonDecoder();
204+
205+
String inputJson = "[";
206+
final int numRecords = 10;
207+
for (int i = 0; i < numRecords; i++) {
208+
String boolString = (i % 2 == 0) ? "true" : "false";
209+
if (i != 0)
210+
inputJson += ",";
211+
inputJson += "{\"key"+i+"\": \"value"+i+"\", \"key"+(10+i)+"\": "+(50+i)+", \"key"+(20+i)+"\": "+boolString+"}";
212+
}
213+
inputJson += "]";
214+
objectUnderTest.writeBytes(inputJson.getBytes(), null, 1_000);
215+
216+
Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
217+
218+
assertThat(readResult, notNullValue());
219+
assertThat(readResult.getKey(), notNullValue());
220+
assertThat(readResult.getKey().size(), equalTo(numRecords));
221+
222+
Object[] outputRecords = readResult.getKey().toArray();
223+
for (int i = 0; i < numRecords; i++) {
224+
Record<Event> receivedRecord = (Record<Event>)outputRecords[i];
225+
assertThat(receivedRecord, notNullValue());
226+
assertThat(receivedRecord.getData(), notNullValue());
227+
Map<String, Object> receivedMap = receivedRecord.getData().toMap();
228+
assertThat(receivedMap.get("key"+i), equalTo("value"+i));
229+
assertThat(receivedMap.get("key"+(10+i)), equalTo(50+i));
230+
boolean expectedBoolString = (i % 2 == 0) ? true : false;
231+
assertThat(receivedMap.get("key"+(20+i)), equalTo(expectedBoolString));
232+
}
233+
}
234+
235+
@Test
236+
void writeMultipleSmallJson_and_read() throws Exception {
237+
final KafkaBuffer objectUnderTest = createObjectUnderTestWithJsonDecoder();
238+
239+
final int numRecords = 10;
240+
for (int i = 0; i < numRecords; i++) {
241+
String boolString = (i % 2 == 0) ? "true" : "false";
242+
String inputJson = "[{\"key"+i+"\": \"value"+i+"\", \"key"+(10+i)+"\": "+(50+i)+", \"key"+(20+i)+"\": "+boolString+"}]";
243+
objectUnderTest.writeBytes(inputJson.getBytes(), null, 1_000);
244+
}
245+
246+
Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);
247+
248+
assertThat(readResult, notNullValue());
249+
assertThat(readResult.getKey(), notNullValue());
250+
assertThat(readResult.getKey().size(), equalTo(numRecords));
251+
252+
Object[] outputRecords = readResult.getKey().toArray();
253+
for (int i = 0; i < numRecords; i++) {
254+
Record<Event> receivedRecord = (Record<Event>)outputRecords[i];
255+
assertThat(receivedRecord, notNullValue());
256+
assertThat(receivedRecord.getData(), notNullValue());
257+
Map<String, Object> receivedMap = receivedRecord.getData().toMap();
258+
assertThat(receivedMap.get("key"+i), equalTo("value"+i));
259+
assertThat(receivedMap.get("key"+(10+i)), equalTo(50+i));
260+
boolean expectedBoolString = (i % 2 == 0) ? true : false;
261+
assertThat(receivedMap.get("key"+(20+i)), equalTo(expectedBoolString));
262+
}
263+
}
264+
196265
@Test
197266
void writeBytes_and_read() throws Exception {
198267
byteDecoder = new JsonDecoder();

0 commit comments

Comments
 (0)