Skip to content

Commit d5b5496

Browse files
saketh-pallempatiEC2 Default User
authored andcommitted
feat: Add configurable stream read constraints for JSON input codec (opensearch-project#5541)
feat: Add configurable stream read constraints for JSON input codec - Introduced a configurable option for setting the maximum event length in the JSON input codec. - Updated JsonDecoder to accept max_event_length parameter and apply it to StreamReadConstraints. - Added validation to ensure the maximum event length is within acceptable limits. Fixes opensearch-project#5466 * Set maxEventLength default to null for safer usage. * Set min value of maxEventLength to Jackson's default Signed-off-by: Pallempati Saketh <pallempati.saketh@fmr.com>
1 parent d47e3ff commit d5b5496

7 files changed

Lines changed: 83 additions & 21 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.fasterxml.jackson.core.JsonFactory;
99
import com.fasterxml.jackson.core.JsonParser;
1010
import com.fasterxml.jackson.core.JsonToken;
11+
import com.fasterxml.jackson.core.StreamReadConstraints;
1112
import com.fasterxml.jackson.databind.ObjectMapper;
1213

1314
import org.opensearch.dataprepper.model.event.Event;
@@ -30,10 +31,15 @@ public class JsonDecoder implements ByteDecoder {
3031
private Collection<String> includeKeys;
3132
private Collection<String> includeKeysMetadata;
3233

33-
public JsonDecoder(String keyName, Collection<String> includeKeys, Collection<String> includeKeysMetadata) {
34+
public JsonDecoder(String keyName, Collection<String> includeKeys, Collection<String> includeKeysMetadata, Integer maxEventLength) {
3435
this.keyName = keyName;
3536
this.includeKeys = includeKeys;
3637
this.includeKeysMetadata = includeKeysMetadata;
38+
if (maxEventLength != null) {
39+
jsonFactory.setStreamReadConstraints(StreamReadConstraints.builder()
40+
.maxStringLength(maxEventLength)
41+
.build());
42+
}
3743
}
3844

3945
public JsonDecoder() {

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@
2222
import static org.hamcrest.CoreMatchers.equalTo;
2323
import static org.hamcrest.MatcherAssert.assertThat;
2424
import static org.junit.jupiter.api.Assertions.assertEquals;
25-
import static org.junit.jupiter.api.Assertions.assertFalse;
2625
import static org.junit.jupiter.api.Assertions.assertNotEquals;
26+
import static org.junit.jupiter.api.Assertions.assertNotNull;
2727
import static org.junit.jupiter.api.Assertions.assertTrue;
28+
import static org.junit.jupiter.api.Assertions.assertFalse;
29+
import static org.junit.jupiter.api.Assertions.assertThrows;
30+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
31+
2832

2933
import org.junit.jupiter.api.BeforeEach;
3034

@@ -48,32 +52,68 @@ void test_basicJsonDecoder() {
4852
String stringValue = UUID.randomUUID().toString();
4953
Random r = new Random();
5054
int intValue = r.nextInt();
51-
String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]";
55+
String inputString = "[{\"key1\":\"" + stringValue + "\", \"key2\":" + intValue + "}]";
5256
try {
5357
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> {
5458
receivedRecord = record;
5559
});
56-
} catch (Exception e){}
57-
60+
} catch (Exception e) {
61+
}
62+
5863
assertNotEquals(receivedRecord, null);
5964
Map<String, Object> map = receivedRecord.getData().toMap();
6065
assertThat(map.get("key1"), equalTo(stringValue));
6166
assertThat(map.get("key2"), equalTo(intValue));
6267
}
6368

69+
@Test
70+
void test_basicJsonDecoder_exceedingMaxEventLength_throwsException() {
71+
String largeString = "x".repeat(200);
72+
String inputString = "[{\"key1\":\"" + largeString + "\"}]";
73+
74+
jsonDecoder = new JsonDecoder(null, null, null, 100);
75+
76+
Exception exception = assertThrows(Exception.class, () -> {
77+
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> {
78+
receivedRecord = record;
79+
});
80+
});
81+
82+
assertEquals("String value length (200) exceeds the maximum allowed (100, from `StreamReadConstraints.getMaxStringLength()`)", exception.getMessage());
83+
}
84+
85+
@Test
86+
void test_basicJsonDecoder_withMaxEventLength() {
87+
String validString = "Short string";
88+
String inputString = "[{\"key1\":\"" + validString + "\"}]";
89+
90+
jsonDecoder = new JsonDecoder(null, null, null, 100);
91+
92+
assertDoesNotThrow(() -> {
93+
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), null, (record) -> {
94+
receivedRecord = record;
95+
});
96+
});
97+
98+
assertNotNull(receivedRecord);
99+
Map<String, Object> map = receivedRecord.getData().toMap();
100+
assertThat(map.get("key1"), equalTo(validString));
101+
}
102+
64103
@Test
65104
void test_basicJsonDecoder_withTimeReceived() {
66105
String stringValue = UUID.randomUUID().toString();
67106
Random r = new Random();
68107
int intValue = r.nextInt();
69-
String inputString = "[{\"key1\":\""+stringValue+"\", \"key2\":"+intValue+"}]";
108+
String inputString = "[{\"key1\":\"" + stringValue + "\", \"key2\":" + intValue + "}]";
70109
final Instant now = Instant.now();
71110
try {
72111
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), now, (record) -> {
73112
receivedRecord = record;
74113
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
75114
});
76-
} catch (Exception e){}
115+
} catch (Exception e) {
116+
}
77117

78118
assertNotEquals(receivedRecord, null);
79119
Map<String, Object> map = receivedRecord.getData().toMap();
@@ -91,21 +131,23 @@ class JsonDecoderWithInputConfig {
91131
private static final int numKeyPerRecord = 3;
92132
private Map<String, Object> jsonObject;
93133
private final String key_name = "logEvents";
134+
private final Integer maxEventLength = 20000000;
94135

95136
@BeforeEach
96137
void setup() {
97138
objectMapper = new ObjectMapper();
98-
for (int i=0; i<10; i++) {
139+
for (int i = 0; i < 10; i++) {
99140
includeKeys.add(UUID.randomUUID().toString());
100141
includeMetadataKeys.add(UUID.randomUUID().toString());
101142
}
102143
jsonObject = generateJsonWithSpecificKeys(includeKeys, includeMetadataKeys, key_name, numKeyRecords, numKeyPerRecord);
103144
}
145+
104146
@Test
105147
void test_basicJsonDecoder_withInputConfig() throws IOException {
106148
final Instant now = Instant.now();
107149
List<Record<Event>> records = new ArrayList<>();
108-
jsonDecoder = new JsonDecoder(key_name, includeKeys, includeMetadataKeys);
150+
jsonDecoder = new JsonDecoder(key_name, includeKeys, includeMetadataKeys, maxEventLength);
109151
jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> {
110152
records.add(record);
111153
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
@@ -118,10 +160,10 @@ void test_basicJsonDecoder_withInputConfig() throws IOException {
118160
Map<String, Object> dataMap = record.getData().toMap();
119161
Map<String, Object> metadataMap = record.getData().getMetadata().getAttributes();
120162

121-
for (String includeKey: includeKeys) {
163+
for (String includeKey : includeKeys) {
122164
assertThat(dataMap.get(includeKey), equalTo(jsonObject.get(includeKey)));
123165
}
124-
for (String includeMetadataKey: includeMetadataKeys) {
166+
for (String includeMetadataKey : includeMetadataKeys) {
125167
assertThat(metadataMap.get(includeMetadataKey), equalTo(jsonObject.get(includeMetadataKey)));
126168
}
127169
});
@@ -133,7 +175,7 @@ void test_basicJsonDecoder_withInputConfig() throws IOException {
133175
void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_metadata_keys() throws IOException {
134176
final Instant now = Instant.now();
135177
List<Record<Event>> records = new ArrayList<>();
136-
jsonDecoder = new JsonDecoder("", includeKeys, Collections.emptyList());
178+
jsonDecoder = new JsonDecoder("", includeKeys, Collections.emptyList(), maxEventLength);
137179
jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> {
138180
records.add(record);
139181
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
@@ -145,7 +187,7 @@ void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_metadata_keys() t
145187
void test_basicJsonDecoder_withInputConfig_withoutEvents_null_include_metadata_keys() throws IOException {
146188
final Instant now = Instant.now();
147189
List<Record<Event>> records = new ArrayList<>();
148-
jsonDecoder = new JsonDecoder("", includeKeys, null);
190+
jsonDecoder = new JsonDecoder("", includeKeys, null, maxEventLength);
149191
jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> {
150192
records.add(record);
151193
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
@@ -158,7 +200,7 @@ void test_basicJsonDecoder_withInputConfig_withoutEvents_null_include_metadata_k
158200
void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_include_keys() throws IOException {
159201
final Instant now = Instant.now();
160202
List<Record<Event>> records = new ArrayList<>();
161-
jsonDecoder = new JsonDecoder("", Collections.emptyList(), includeMetadataKeys);
203+
jsonDecoder = new JsonDecoder("", Collections.emptyList(), includeMetadataKeys, maxEventLength);
162204
jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> {
163205
records.add(record);
164206
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
@@ -170,7 +212,7 @@ void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_include_keys() th
170212
void test_basicJsonDecoder_withInputConfig_withoutEvents_null_include_keys() throws IOException {
171213
final Instant now = Instant.now();
172214
List<Record<Event>> records = new ArrayList<>();
173-
jsonDecoder = new JsonDecoder("", null, includeMetadataKeys);
215+
jsonDecoder = new JsonDecoder("", null, includeMetadataKeys, maxEventLength);
174216
jsonDecoder.parse(createInputStream(jsonObject), now, (record) -> {
175217
records.add(record);
176218
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
@@ -187,17 +229,17 @@ private Map<String, Object> generateJsonWithSpecificKeys(final List<String> incl
187229
final Map<String, Object> jsonObject = new LinkedHashMap<>();
188230
final List<Map<String, Object>> innerObjects = new ArrayList<>();
189231

190-
for (String includeKey: includeKeys) {
232+
for (String includeKey : includeKeys) {
191233
jsonObject.put(includeKey, UUID.randomUUID().toString());
192234
}
193235

194-
for (String includeMetadataKey: includeMetadataKeys) {
236+
for (String includeMetadataKey : includeMetadataKeys) {
195237
jsonObject.put(includeMetadataKey, UUID.randomUUID().toString());
196238
}
197239

198-
for (int i=0; i<numKeyRecords; i++) {
240+
for (int i = 0; i < numKeyRecords; i++) {
199241
final Map<String, Object> innerJsonMap = new LinkedHashMap<>();
200-
for (int j=0; j<numKeyPerRecord; j++) {
242+
for (int j = 0; j < numKeyPerRecord; j++) {
201243
innerJsonMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
202244
}
203245
innerObjects.add(innerJsonMap);

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodec.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@ public class JsonInputCodec extends JsonDecoder implements InputCodec {
2525

2626
@DataPrepperPluginConstructor
2727
public JsonInputCodec(final JsonInputCodecConfig config) {
28-
super(Objects.requireNonNull(config).getKeyName(), config.getIncludeKeys(), config.getIncludeKeysMetadata());
28+
super(Objects.requireNonNull(config).getKeyName(), config.getIncludeKeys(), config.getIncludeKeysMetadata(), config.getMaxEventLength());
2929
}
3030

3131
public void parse(InputStream inputStream, Consumer<Record<Event>> eventConsumer) throws IOException {
3232
parse(inputStream, null, eventConsumer);
3333
}
34-
3534
}

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
package org.opensearch.dataprepper.plugins.codec.json;
1212

1313
import com.fasterxml.jackson.annotation.JsonProperty;
14+
import com.fasterxml.jackson.core.StreamReadConstraints;
15+
import jakarta.validation.constraints.Min;
1416
import jakarta.validation.constraints.Size;
1517

1618
import java.util.List;
@@ -38,4 +40,12 @@ public List<String> getIncludeKeys() {
3840
public List<String> getIncludeKeysMetadata() {
3941
return includeKeysMetadata;
4042
}
43+
44+
@JsonProperty("max_event_length")
45+
@Min(StreamReadConstraints.DEFAULT_MAX_STRING_LEN)
46+
private Integer maxEventLength;
47+
48+
public Integer getMaxEventLength(){
49+
return maxEventLength;
50+
}
4151
}

data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonCodecsIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ void setUp() {
5151
when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(Collections.emptyList());
5252
when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(Collections.emptyList());
5353
when(jsonInputCodecConfig.getKeyName()).thenReturn(null);
54+
when(jsonInputCodecConfig.getMaxEventLength()).thenReturn(null);
5455
eventConsumer = mock(Consumer.class);
5556
}
5657

data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecConfigTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
import org.junit.jupiter.api.Test;
99

10+
import static org.hamcrest.CoreMatchers.equalTo;
11+
import static org.hamcrest.MatcherAssert.assertThat;
1012
import static org.junit.jupiter.api.Assertions.assertNull;
1113

1214
public class JsonInputCodecConfigTest {
@@ -21,5 +23,6 @@ public void testJsonInputCodecConfig() {
2123
assertNull(jsonInputCodecConfig.getKeyName());
2224
assertNull(jsonInputCodecConfig.getIncludeKeys());
2325
assertNull(jsonInputCodecConfig.getIncludeKeysMetadata());
26+
assertThat(jsonInputCodecConfig.getMaxEventLength(), equalTo(null));
2427
}
2528
}

data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonInputCodecTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ void setUp() {
6868
when(jsonInputCodecConfig.getIncludeKeysMetadata()).thenReturn(null);
6969
when(jsonInputCodecConfig.getIncludeKeys()).thenReturn(null);
7070
when(jsonInputCodecConfig.getKeyName()).thenReturn(null);
71+
when(jsonInputCodecConfig.getMaxEventLength()).thenReturn(null);
7172
eventConsumer = mock(Consumer.class);
7273
}
7374

0 commit comments

Comments
 (0)