Skip to content

Commit efd7991

Browse files
authored
Merge branch 'opensearch-project:main' into data-loss-bug
2 parents a6983c5 + 2fe41c3 commit efd7991

6 files changed

Lines changed: 155 additions & 10 deletions

File tree

data-prepper-expression/src/main/antlr/DataPrepperExpression.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ VariableNameCharacter
271271
272272
String
273273
: DOUBLEQUOTE StringCharacters? DOUBLEQUOTE
274+
| DOUBLEQUOTE DOUBLEQUOTE DOUBLEQUOTE StringCharacters? DOUBLEQUOTE DOUBLEQUOTE DOUBLEQUOTE
274275
;
275276
276277
fragment

data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/ParseTreeCoercionService.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,7 @@ public Object coercePrimaryTerminalNode(final TerminalNode node, final Event eve
7878
case DataPrepperExpressionParser.JsonPointer:
7979
return resolveJsonPointerValue(nodeStringValue, event);
8080
case DataPrepperExpressionParser.String:
81-
final String nodeStringValueWithQuotesStripped = nodeStringValue.substring(1,
82-
nodeStringValue.length() - 1);
83-
return nodeStringValueWithQuotesStripped;
81+
return nodeStringValue.replaceAll("^\"{1,3}|\"{1,3}$", "");
8482
case DataPrepperExpressionParser.Integer:
8583
Long longValue = Long.valueOf(nodeStringValue);
8684
if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) {

data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator_ConditionalIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ private static Stream<Arguments> validExpressionArguments() {
192192
arguments("/should_drop", event("{\"should_drop\": true}"), true),
193193
arguments("/should_drop", event("{\"should_drop\": false}"), false),
194194
arguments("/logs/2/should_drop", event("{\"logs\": [{}, {}, {\"should_drop\": true}]}"), true),
195+
arguments("/path == \"\"\"/path/to/route\"\"\"", event("{\"path\": \"/path/to/route\"}"), true),
196+
arguments("/path == \"\"\"/path/to/route\"\"\"", event("{\"path\": \"/incorrect/path\"}"), false),
195197
arguments(
196198
escapedJsonPointer(ALL_JACKSON_EVENT_GET_SUPPORTED_CHARACTERS) + " == true",
197199
complexEvent(ALL_JACKSON_EVENT_GET_SUPPORTED_CHARACTERS, true),

data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/ParseTreeCoercionServiceTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,18 @@ void testCoerceTerminalNodeEscapeJsonPointerTypeUnSupportedValues() {
226226
assertThrows(ExpressionCoercionException.class, () -> objectUnderTest.coercePrimaryTerminalNode(terminalNode, testEvent));
227227
}
228228

229+
@ParameterizedTest
230+
@MethodSource("provideSupportedJsonPointerValues")
231+
void testCoerceTerminalNodeTripleQuoteStringTypeSupportedValues(final Object testValue) {
232+
final String testKey = "/testKey";
233+
final String testTruipleQuoteStringKey = "\"\"\"/testKey\"\"\"";
234+
when(token.getType()).thenReturn(DataPrepperExpressionParser.String);
235+
when(terminalNode.getSymbol()).thenReturn(token);
236+
when(terminalNode.getText()).thenReturn(testTruipleQuoteStringKey);
237+
final Object result = objectUnderTest.coercePrimaryTerminalNode(terminalNode, null);
238+
assertThat(result, equalTo(testKey));
239+
}
240+
229241
@Test
230242
void testCoerceTerminalNodeJsonPointerTypeUnSupportedValues() {
231243
final String testKey1 = "key1";

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputCodec.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import com.fasterxml.jackson.core.JsonFactory;
99
import com.fasterxml.jackson.core.JsonParser;
10+
import com.fasterxml.jackson.core.JsonToken;
1011
import com.fasterxml.jackson.core.type.TypeReference;
1112
import com.fasterxml.jackson.databind.MappingIterator;
1213
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -51,15 +52,40 @@ public void parse(final InputStream inputStream, final Consumer<Record<Event>> e
5152

5253
final JsonParser parser = jsonFactory.createParser(inputStream);
5354

54-
final MappingIterator<Map<String, Object>> mapMappingIterator = objectMapper.readValues(parser, MAP_TYPE_REFERENCE);
55-
while (mapMappingIterator.hasNext()) {
56-
final Map<String, Object> json = mapMappingIterator.next();
55+
JsonToken token = parser.nextToken();
5756

58-
if(!ndjsonInputConfig.isIncludeEmptyObjects() && json.isEmpty())
59-
continue;
57+
// Continue parsing as long as we have tokens
58+
while (token != null) {
59+
if (token == JsonToken.START_ARRAY) {
60+
// Handle JSON array
61+
while (parser.nextToken() != JsonToken.END_ARRAY) {
62+
final Map<String, Object> json = objectMapper.readValue(parser, MAP_TYPE_REFERENCE);
6063

61-
final Record<Event> record = createRecord(json);
62-
eventConsumer.accept(record);
64+
if (!ndjsonInputConfig.isIncludeEmptyObjects() && json.isEmpty()) {
65+
continue;
66+
}
67+
68+
final Record<Event> record = createRecord(json);
69+
eventConsumer.accept(record);
70+
}
71+
} else {
72+
// Handle a sequence of top-level JSON objects (e.g. newline-delimited); the iterator consumes the rest of the stream
73+
final MappingIterator<Map<String, Object>> mapMappingIterator = objectMapper.readValues(parser, MAP_TYPE_REFERENCE);
74+
while (mapMappingIterator.hasNext()) {
75+
final Map<String, Object> json = mapMappingIterator.next();
76+
77+
if (!ndjsonInputConfig.isIncludeEmptyObjects() && json.isEmpty()) {
78+
continue;
79+
}
80+
81+
final Record<Event> record = createRecord(json);
82+
eventConsumer.accept(record);
83+
}
84+
break;
85+
}
86+
87+
// Advance to the next top-level token after the end of the array (the object branch exits the loop via break)
88+
token = parser.nextToken();
6389
}
6490
}
6591

data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputCodecTest.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import static org.hamcrest.CoreMatchers.equalTo;
4343
import static org.hamcrest.CoreMatchers.notNullValue;
4444
import static org.hamcrest.MatcherAssert.assertThat;
45+
import static org.junit.jupiter.api.Assertions.assertEquals;
4546
import static org.junit.jupiter.api.Assertions.assertThrows;
4647
import static org.junit.jupiter.params.provider.Arguments.arguments;
4748
import static org.mockito.Mockito.mock;
@@ -204,6 +205,111 @@ void parse_includes_empty_objects_when_configured(final InputStreamFormat inputS
204205
}
205206
}
206207

208+
@Test
209+
void parse_with_array_of_objects_asserts_number_of_objects() throws IOException {
210+
final NdjsonInputCodec objectUnderTest = createObjectUnderTest();
211+
212+
final String jsonArray = "[{\"key1\":\"value1\"},{\"key2\":\"value2\"}]";
213+
final InputStream inputStream = new ByteArrayInputStream(jsonArray.getBytes());
214+
215+
final List<Record<Event>> processedRecords = new ArrayList<>();
216+
Consumer<Record<Event>> eventConsumer = processedRecords::add;
217+
objectUnderTest.parse(inputStream, eventConsumer);
218+
assertEquals(2, processedRecords.size());
219+
}
220+
221+
@Test
222+
void parse_with_multiple_array_of_objects_asserts_number_of_objects() throws IOException {
223+
final NdjsonInputCodec objectUnderTest = createObjectUnderTest();
224+
225+
final String jsonArray = "[{\"key1\":\"value1\"},{\"key2\":\"value2\"}]\n" +
226+
"[{\"key3\":\"value3\"},{\"key4\":\"value4\"}]";
227+
final InputStream inputStream = new ByteArrayInputStream(jsonArray.getBytes());
228+
229+
final List<Record<Event>> processedRecords = new ArrayList<>();
230+
Consumer<Record<Event>> eventConsumer = processedRecords::add;
231+
objectUnderTest.parse(inputStream, eventConsumer);
232+
assertEquals(4, processedRecords.size());
233+
}
234+
235+
@Test
236+
void parse_with_multiple_array_with_empty_array() throws IOException {
237+
final NdjsonInputCodec objectUnderTest = createObjectUnderTest();
238+
239+
final String jsonArray = "[{\"key1\":\"value1\"},{\"key2\":\"value2\"}]\n" +
240+
"[]\n" +
241+
"[{\"key3\":\"value3\"},{\"key4\":\"value4\"}]";
242+
final InputStream inputStream = new ByteArrayInputStream(jsonArray.getBytes());
243+
244+
final List<Record<Event>> processedRecords = new ArrayList<>();
245+
Consumer<Record<Event>> eventConsumer = processedRecords::add;
246+
objectUnderTest.parse(inputStream, eventConsumer);
247+
assertEquals(4, processedRecords.size());
248+
}
249+
250+
@Test
251+
void parse_with_multiple_array_with_exclude_empty_object() throws IOException {
252+
final NdjsonInputCodec objectUnderTest = createObjectUnderTest();
253+
254+
final String jsonArray = "[{\"key1\":\"value1\"},{\"key2\":\"value2\"}]\n" +
255+
"[{}, {\"key1\":\"value3\"}]\n" +
256+
"[{\"key3\":\"value3\"},{\"key4\":\"value4\"}]";
257+
final InputStream inputStream = new ByteArrayInputStream(jsonArray.getBytes());
258+
259+
final List<Record<Event>> processedRecords = new ArrayList<>();
260+
Consumer<Record<Event>> eventConsumer = processedRecords::add;
261+
objectUnderTest.parse(inputStream, eventConsumer);
262+
assertEquals(5, processedRecords.size());
263+
}
264+
265+
@Test
266+
void parse_with_multiple_array_with_include_empty_object() throws IOException {
267+
when(config.isIncludeEmptyObjects()).thenReturn(true);
268+
final NdjsonInputCodec objectUnderTest = createObjectUnderTest();
269+
270+
final String jsonArray = "[{\"key1\":\"value1\"},{\"key2\":\"value2\"}]\n" +
271+
"[{}, {\"key1\":\"value3\"}]\n" +
272+
"[{\"key3\":\"value3\"},{\"key4\":\"value4\"}]";
273+
final InputStream inputStream = new ByteArrayInputStream(jsonArray.getBytes());
274+
275+
final List<Record<Event>> processedRecords = new ArrayList<>();
276+
Consumer<Record<Event>> eventConsumer = processedRecords::add;
277+
objectUnderTest.parse(inputStream, eventConsumer);
278+
assertEquals(6, processedRecords.size());
279+
}
280+
281+
@Test
282+
void parse_with_array_of_empty_objects_excludes_objects_by_default() throws IOException {
283+
final NdjsonInputCodec objectUnderTest = createObjectUnderTest();
284+
285+
final String jsonArray = "[{},{}]";
286+
final InputStream inputStream = new ByteArrayInputStream(jsonArray.getBytes());
287+
288+
final List<Record<Event>> processedRecords = new ArrayList<>();
289+
Consumer<Record<Event>> eventConsumer = processedRecords::add;
290+
objectUnderTest.parse(inputStream, eventConsumer);
291+
assertEquals(0, processedRecords.size());
292+
}
293+
294+
@Test
295+
void parse_with_array_of_empty_objects_includes_objects_when_configured() throws IOException {
296+
when(config.isIncludeEmptyObjects()).thenReturn(true);
297+
final NdjsonInputCodec objectUnderTest = createObjectUnderTest();
298+
299+
final String jsonArray = "[{},{}]";
300+
final InputStream inputStream = new ByteArrayInputStream(jsonArray.getBytes());
301+
302+
final List<Record<Event>> processedRecords = new ArrayList<>();
303+
Consumer<Record<Event>> eventConsumer = processedRecords::add;
304+
objectUnderTest.parse(inputStream, eventConsumer);
305+
assertEquals(2, processedRecords.size());
306+
307+
for (Record<Event> record : processedRecords) {
308+
assertThat(record.getData(), notNullValue());
309+
assertThat(record.getData().toMap().size(), equalTo(0));
310+
}
311+
}
312+
207313
static class ValidInputStreamFormatsArgumentsProvider implements ArgumentsProvider {
208314
@Override
209315
public Stream<? extends Arguments> provideArguments(final ExtensionContext extensionContext) throws Exception {

0 commit comments

Comments
 (0)