Skip to content

Commit 6089f56

Browse files
committed
Updates to the OutputCodec Javadocs and writer-based implementation for NdjsonOutputCodec.
Signed-off-by: David Venable <dlv@amazon.com>
1 parent e5b3dfe commit 6089f56

3 files changed

Lines changed: 79 additions & 10 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,45 @@ public interface OutputCodec {
2222

2323
static final ObjectMapper objectMapper = new ObjectMapper();
2424

25+
/**
26+
* A writer specific to a single buffer.
27+
*
28+
* @since 2.12
29+
*/
2530
interface Writer {
31+
/**
32+
* Writes a single event to the {@link OutputStream}.
33+
*
34+
* @param event A Data Prepper {@link Event}
35+
* @throws IOException An IO exception writing to the stream
36+
*
37+
* @since 2.12
38+
*/
2639
void writeEvent(Event event) throws IOException;
40+
41+
/**
42+
* Completes a writer.
43+
*
44+
* @throws IOException An IO exception completing the stream
45+
*
46+
* @since 2.12
47+
*/
2748
void complete() throws IOException;
2849
}
2950

51+
/**
52+
* Creates a new {@link Writer} for a given {@link OutputStream}.
53+
* Typically, you create one per buffer.
54+
*
55+
* @param outputStream The {@link OutputStream} to write to
56+
* @param sampleEvent A sample Data Prepper {@link Event}.
57+
* It is not written to the stream, but may be used for metadata.
58+
* @param codecContext The {@link OutputCodecContext}
59+
* @return A {@link Writer} to use for this buffer.
60+
* @throws IOException An IO exception occurs initializing the writer or stream
61+
*
62+
* @since 2.12
63+
*/
3064
default Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException {
3165
final OutputCodec codec = this;
3266
codec.start(outputStream, sampleEvent, codecContext);

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ public void complete() throws IOException {
7171
outputStream.close();
7272
}
7373

74-
7574
private Map<String, Object> getDataMapToSerialize(final Event event) throws JsonProcessingException {
7675
final Event modifiedEvent;
7776
if (codecContext.getTagsTargetKey() != null) {

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodec.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
*/
55
package org.opensearch.dataprepper.plugins.codec.json;
66

7-
import com.fasterxml.jackson.databind.ObjectMapper;
87
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
98
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
109
import org.opensearch.dataprepper.model.codec.OutputCodec;
@@ -22,31 +21,56 @@
2221
@DataPrepperPlugin(name = "ndjson", pluginType = OutputCodec.class, pluginConfigurationType = NdjsonOutputConfig.class)
2322
public class NdjsonOutputCodec implements OutputCodec {
2423
private static final String NDJSON = "ndjson";
25-
private static final ObjectMapper objectMapper = new ObjectMapper();
26-
private final NdjsonOutputConfig config;
27-
private OutputCodecContext codecContext;
24+
private OutputCodecContext deprecatedSupportCodecContext;
2825

2926
@DataPrepperPluginConstructor
3027
public NdjsonOutputCodec(final NdjsonOutputConfig config) {
3128
Objects.requireNonNull(config);
32-
this.config = config;
29+
}
30+
31+
private static class NdjsonWriter implements Writer {
32+
private final OutputStream outputStream;
33+
private final OutputCodecContext codecContext;
34+
35+
private NdjsonWriter(final OutputStream outputStream, final OutputCodecContext codecContext) {
36+
this.outputStream = outputStream;
37+
this.codecContext = codecContext;
38+
}
39+
40+
@Override
41+
public void writeEvent(final Event event) throws IOException {
42+
doWriteEvent(outputStream, event, codecContext);
43+
}
44+
45+
@Override
46+
public void complete() throws IOException {
47+
outputStream.close();
48+
}
49+
}
50+
51+
@Override
52+
public Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) {
53+
Objects.requireNonNull(outputStream);
54+
Objects.requireNonNull(codecContext);
55+
56+
return new NdjsonWriter(outputStream, codecContext);
3357
}
3458

3559
@Override
3660
public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException {
3761
Objects.requireNonNull(outputStream);
3862
Objects.requireNonNull(codecContext);
39-
this.codecContext = codecContext;
63+
this.deprecatedSupportCodecContext = codecContext;
4064
}
4165

4266
@Override
4367
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
4468
Objects.requireNonNull(event);
4569

4670
String json = event.jsonBuilder()
47-
.includeKeys(codecContext.getIncludeKeys())
48-
.excludeKeys(codecContext.getExcludeKeys())
49-
.includeTags(codecContext.getTagsTargetKey())
71+
.includeKeys(deprecatedSupportCodecContext.getIncludeKeys())
72+
.excludeKeys(deprecatedSupportCodecContext.getExcludeKeys())
73+
.includeTags(deprecatedSupportCodecContext.getTagsTargetKey())
5074
.toJsonString();
5175
outputStream.write(json.getBytes());
5276
outputStream.write(System.lineSeparator().getBytes());
@@ -61,4 +85,16 @@ public void complete(final OutputStream outputStream) throws IOException {
6185
public String getExtension() {
6286
return NDJSON;
6387
}
88+
89+
private static void doWriteEvent(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext) throws IOException {
90+
Objects.requireNonNull(event);
91+
92+
String json = event.jsonBuilder()
93+
.includeKeys(codecContext.getIncludeKeys())
94+
.excludeKeys(codecContext.getExcludeKeys())
95+
.includeTags(codecContext.getTagsTargetKey())
96+
.toJsonString();
97+
outputStream.write(json.getBytes());
98+
outputStream.write(System.lineSeparator().getBytes());
99+
}
64100
}

0 commit comments

Comments
 (0)