Skip to content

Commit 59e9fd6

Browse files
authored
Adds an ndjson input codec. This reads JSON objects for ND-JSON and more lenient formats that do not have the newline. (#4533)
Signed-off-by: David Venable <dlv@amazon.com>
1 parent e35b4ea commit 59e9fd6

4 files changed

Lines changed: 413 additions & 0 deletions

File tree

data-prepper-plugins/parse-json-processor/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ dependencies {
1515
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
1616
implementation 'org.apache.parquet:parquet-common:1.14.0'
1717
testImplementation project(':data-prepper-test-common')
18+
testImplementation project(':data-prepper-test-event')
1819
}
1920

2021
test {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.codec.json;
7+
8+
import com.fasterxml.jackson.core.JsonFactory;
9+
import com.fasterxml.jackson.core.JsonParser;
10+
import com.fasterxml.jackson.core.type.TypeReference;
11+
import com.fasterxml.jackson.databind.MappingIterator;
12+
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
14+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
15+
import org.opensearch.dataprepper.model.codec.InputCodec;
16+
import org.opensearch.dataprepper.model.event.Event;
17+
import org.opensearch.dataprepper.model.event.EventFactory;
18+
import org.opensearch.dataprepper.model.event.LogEventBuilder;
19+
import org.opensearch.dataprepper.model.log.Log;
20+
import org.opensearch.dataprepper.model.record.Record;
21+
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.util.Map;
25+
import java.util.Objects;
26+
import java.util.function.Consumer;
27+
28+
/**
29+
* A Data Prepper {@link InputCodec} which reads ND-JSON and other similar
30+
* formats which have JSON objects together.
31+
*/
32+
@DataPrepperPlugin(name = "ndjson", pluginType = InputCodec.class, pluginConfigurationType = NdjsonInputConfig.class)
33+
public class NdjsonInputCodec implements InputCodec {
34+
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<>() {};
35+
private final ObjectMapper objectMapper = new ObjectMapper();
36+
private final NdjsonInputConfig ndjsonInputConfig;
37+
private final EventFactory eventFactory;
38+
private final JsonFactory jsonFactory;
39+
40+
@DataPrepperPluginConstructor
41+
public NdjsonInputCodec(final NdjsonInputConfig ndjsonInputConfig, final EventFactory eventFactory) {
42+
this.ndjsonInputConfig = ndjsonInputConfig;
43+
this.eventFactory = eventFactory;
44+
jsonFactory = new JsonFactory();
45+
}
46+
47+
@Override
48+
public void parse(final InputStream inputStream, final Consumer<Record<Event>> eventConsumer) throws IOException {
49+
Objects.requireNonNull(inputStream, "Parameter inputStream must not be null.");
50+
Objects.requireNonNull(eventConsumer, "Parameter eventConsumer must not be null.");
51+
52+
final JsonParser parser = jsonFactory.createParser(inputStream);
53+
54+
final MappingIterator<Map<String, Object>> mapMappingIterator = objectMapper.readValues(parser, MAP_TYPE_REFERENCE);
55+
while (mapMappingIterator.hasNext()) {
56+
final Map<String, Object> json = mapMappingIterator.next();
57+
58+
if(!ndjsonInputConfig.isIncludeEmptyObjects() && json.isEmpty())
59+
continue;
60+
61+
final Record<Event> record = createRecord(json);
62+
eventConsumer.accept(record);
63+
}
64+
}
65+
66+
private Record<Event> createRecord(final Map<String, Object> json) {
67+
final Log event = eventFactory.eventBuilder(LogEventBuilder.class)
68+
.withData(json)
69+
.build();
70+
71+
return new Record<>(event);
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.codec.json;
7+
8+
import com.fasterxml.jackson.annotation.JsonProperty;
9+
10+
/**
11+
* Configuration for the {@link NdjsonInputCodec} input codec.
12+
*/
13+
public class NdjsonInputConfig {
14+
/**
15+
* By default, we will not create events for empty objects. However, we will
16+
* permit users to include them if they desire.
17+
*/
18+
@JsonProperty("include_empty_objects")
19+
private boolean includeEmptyObjects = false;
20+
21+
public boolean isIncludeEmptyObjects() {
22+
return includeEmptyObjects;
23+
}
24+
}

0 commit comments

Comments
 (0)