Skip to content

Commit bc4039d

Browse files
authored
Initial commit to refactor the OutputCodec to support a Writer per OutputStream (#5606)
Initial commit to refactor the OutputCodec to support a Writer that is bound to a specific OutputStream. This supports backward compatibility with the existing APIs and an update to the JsonOutputCodec and NdjsonOutputCodec to start the migration. Adds missing unit tests for NdjsonOutputCodec. Signed-off-by: David Venable <dlv@amazon.com>
1 parent c60a3f9 commit bc4039d

6 files changed

Lines changed: 568 additions & 48 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,61 @@ public interface OutputCodec {
2222

2323
static final ObjectMapper objectMapper = new ObjectMapper();
2424

25+
/**
26+
* A writer specific to a single buffer.
27+
*
28+
* @since 2.12
29+
*/
30+
interface Writer {
31+
/**
32+
* Writes a single event to the {@link OutputStream}.
33+
*
34+
* @param event A Data Prepper {@link Event}
35+
* @throws IOException An IO exception writing to the stream
36+
*
37+
* @since 2.12
38+
*/
39+
void writeEvent(Event event) throws IOException;
40+
41+
/**
42+
* Completes a writer.
43+
*
44+
* @throws IOException An IO exception completing the stream
45+
*
46+
* @since 2.12
47+
*/
48+
void complete() throws IOException;
49+
}
50+
51+
/**
52+
* Creates a new {@link Writer} for a given {@link OutputStream}.
53+
* Typically, you create one per buffer.
54+
*
55+
* @param outputStream The {@link OutputStream} to write to
56+
* @param sampleEvent A sample Data Prepper {@link Event}.
57+
* It is not written to the stream, but may be used for metadata.
58+
* @param codecContext The {@link OutputCodecContext}
59+
* @return A {@link Writer} to use for this buffer.
60+
* @throws IOException An IO exception occurs initializing the writer or stream
61+
*
62+
* @since 2.12
63+
*/
64+
default Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException {
65+
final OutputCodec codec = this;
66+
codec.start(outputStream, sampleEvent, codecContext);
67+
return new Writer() {
68+
@Override
69+
public void writeEvent(final Event event) throws IOException {
70+
codec.writeEvent(event, outputStream);
71+
}
72+
73+
@Override
74+
public void complete() throws IOException {
75+
codec.complete(outputStream);
76+
}
77+
};
78+
}
79+
2580
/**
2681
* This method gets called from {@link Sink} to do initial wrapping in {@link OutputStream}.
2782
* Implementors should do initial wrapping according to the implementation
@@ -30,7 +85,9 @@ public interface OutputCodec {
3085
* @param event Event to auto-generate schema
3186
* @param context Extra Context used in Codec.
3287
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
88+
* @deprecated Use {@link OutputCodec#createWriter(OutputStream, Event, OutputCodecContext)} instead.
3389
*/
90+
@Deprecated
3491
void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException;
3592

3693
/**
@@ -40,7 +97,9 @@ public interface OutputCodec {
4097
* @param event event Record event
4198
* @param outputStream outputStream param to hold the event data
4299
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
100+
* @deprecated Use {@link OutputCodec.Writer#writeEvent(Event)} instead.
43101
*/
102+
@Deprecated
44103
void writeEvent(Event event, OutputStream outputStream) throws IOException;
45104

46105
/**
@@ -49,7 +108,9 @@ public interface OutputCodec {
49108
*
50109
* @param outputStream outputStream param for wrapping
51110
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
111+
* @deprecated Use {@link Writer#complete()} instead.
52112
*/
113+
@Deprecated
53114
void complete(OutputStream outputStream) throws IOException;
54115

55116
/**

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
16
package org.opensearch.dataprepper.model.codec;
27

38
import com.fasterxml.jackson.core.JsonProcessingException;
9+
import org.junit.jupiter.api.Nested;
410
import org.junit.jupiter.api.Test;
11+
import org.junit.jupiter.api.extension.ExtendWith;
12+
import org.mockito.Mock;
513
import org.mockito.invocation.InvocationOnMock;
14+
import org.mockito.junit.jupiter.MockitoExtension;
615
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
716
import org.opensearch.dataprepper.model.event.Event;
817
import org.opensearch.dataprepper.model.event.EventMetadata;
@@ -20,11 +29,16 @@
2029
import java.util.UUID;
2130

2231
import static org.hamcrest.CoreMatchers.equalTo;
32+
import static org.hamcrest.CoreMatchers.not;
33+
import static org.hamcrest.CoreMatchers.sameInstance;
2334
import static org.hamcrest.MatcherAssert.assertThat;
2435
import static org.junit.jupiter.api.Assertions.assertNotEquals;
36+
import static org.mockito.Mockito.doCallRealMethod;
2537
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.verify;
2639
import static org.mockito.Mockito.verifyNoInteractions;
2740

41+
@ExtendWith(MockitoExtension.class)
2842
public class OutputCodecTest {
2943
@Test
3044
void isCompressionInternal_returns_false() {
@@ -85,4 +99,61 @@ private static Map<String, Object> generateJson() {
8599
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
86100
return jsonObject;
87101
}
102+
103+
@Nested
104+
class DefaultWriter {
105+
@Mock
106+
private OutputStream outputStream;
107+
@Mock
108+
private Event event;
109+
@Mock
110+
private OutputCodecContext outputCodecContext;
111+
112+
@Test
113+
void createWriter_returns_new_instance() throws IOException {
114+
final OutputCodec objectUnderTest = mock(OutputCodec.class);
115+
116+
doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);
117+
118+
assertThat(objectUnderTest.createWriter(outputStream, event, outputCodecContext),
119+
not(sameInstance(objectUnderTest.createWriter(outputStream, event, outputCodecContext))));
120+
}
121+
122+
@Test
123+
void createWriter_calls_start() throws IOException {
124+
final OutputCodec objectUnderTest = mock(OutputCodec.class);
125+
126+
doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);
127+
128+
objectUnderTest.createWriter(outputStream, event, outputCodecContext);
129+
130+
verify(objectUnderTest).start(outputStream, event, outputCodecContext);
131+
}
132+
133+
@Test
134+
void writer_writeEvent_calls_writeEvent_on_OutputCodec() throws IOException {
135+
final OutputCodec objectUnderTest = mock(OutputCodec.class);
136+
137+
doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);
138+
139+
OutputCodec.Writer writer = objectUnderTest.createWriter(outputStream, event, outputCodecContext);
140+
141+
writer.writeEvent(event);
142+
143+
verify(objectUnderTest).writeEvent(event, outputStream);
144+
}
145+
146+
@Test
147+
void writer_complete_calls_complete_on_OutputCodec() throws IOException {
148+
final OutputCodec objectUnderTest = mock(OutputCodec.class);
149+
150+
doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);
151+
152+
OutputCodec.Writer writer = objectUnderTest.createWriter(outputStream, event, outputCodecContext);
153+
154+
writer.complete();
155+
156+
verify(objectUnderTest).complete(outputStream);
157+
}
158+
}
88159
}

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java

Lines changed: 67 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,69 +30,97 @@
3030
public class JsonOutputCodec implements OutputCodec {
3131
private final ObjectMapper objectMapper = new ObjectMapper();
3232
private static final String JSON = "json";
33-
private static final JsonFactory factory = new JsonFactory();
33+
private static final JsonFactory JSON_FACTORY = new JsonFactory();
3434
private final JsonOutputCodecConfig config;
35-
private JsonGenerator generator;
36-
private OutputCodecContext codecContext;
35+
private JsonWriter deprecatedSupportWriter;
3736

3837
@DataPrepperPluginConstructor
3938
public JsonOutputCodec(final JsonOutputCodecConfig config) {
4039
Objects.requireNonNull(config);
4140
this.config = config;
4241
}
4342

43+
private class JsonWriter implements Writer {
44+
private final JsonGenerator generator;
45+
private final OutputStream outputStream;
46+
private final OutputCodecContext codecContext;
47+
48+
private JsonWriter(final OutputStream outputStream, final OutputCodecContext codecContext) throws IOException {
49+
this.outputStream = outputStream;
50+
this.codecContext = codecContext;
51+
generator = JSON_FACTORY.createGenerator(outputStream, JsonEncoding.UTF8);
52+
generator.writeStartObject();
53+
generator.writeFieldName(config.getKeyName());
54+
generator.writeStartArray();
55+
}
56+
57+
@Override
58+
public void writeEvent(final Event event) throws IOException {
59+
Objects.requireNonNull(event);
60+
final Map<String, Object> dataMap = getDataMapToSerialize(event);
61+
objectMapper.writeValue(generator, dataMap);
62+
generator.flush();
63+
}
64+
65+
@Override
66+
public void complete() throws IOException {
67+
generator.writeEndArray();
68+
generator.writeEndObject();
69+
generator.close();
70+
outputStream.flush();
71+
outputStream.close();
72+
}
73+
74+
private Map<String, Object> getDataMapToSerialize(final Event event) throws JsonProcessingException {
75+
final Event modifiedEvent;
76+
if (codecContext.getTagsTargetKey() != null) {
77+
modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey());
78+
} else {
79+
modifiedEvent = event;
80+
}
81+
Map<String, Object> dataMap = modifiedEvent.toMap();
82+
83+
if ((codecContext.getIncludeKeys() != null && !codecContext.getIncludeKeys().isEmpty()) ||
84+
(codecContext.getExcludeKeys() != null && !codecContext.getExcludeKeys().isEmpty())) {
85+
86+
final Map<String, Object> finalDataMap = dataMap;
87+
dataMap = dataMap.keySet()
88+
.stream()
89+
.filter(codecContext::shouldIncludeKey)
90+
.collect(Collectors.toMap(Function.identity(), finalDataMap::get));
91+
}
92+
return dataMap;
93+
}
94+
}
95+
4496
@Override
4597
public String getExtension() {
4698
return JSON;
4799
}
48100

49101
@Override
50-
public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException {
102+
public Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException {
51103
Objects.requireNonNull(outputStream);
52104
Objects.requireNonNull(codecContext);
53-
this.codecContext = codecContext;
54-
generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
55-
generator.writeStartObject();
56-
generator.writeFieldName(config.getKeyName());
57-
generator.writeStartArray();
105+
106+
return new JsonWriter(outputStream, codecContext);
58107
}
59108

60109
@Override
61-
public void complete(final OutputStream outputStream) throws IOException {
62-
generator.writeEndArray();
63-
generator.writeEndObject();
64-
generator.close();
65-
outputStream.flush();
66-
outputStream.close();
110+
public void start(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext) throws IOException {
111+
Objects.requireNonNull(outputStream);
112+
Objects.requireNonNull(codecContext);
113+
deprecatedSupportWriter = new JsonWriter(outputStream, codecContext);
67114
}
68115

69116
@Override
70-
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
71-
Objects.requireNonNull(event);
72-
Map<String, Object> dataMap = getDataMapToSerialize(event);
73-
objectMapper.writeValue(generator, dataMap);
74-
generator.flush();
117+
public void complete(final OutputStream outputStream) throws IOException {
118+
deprecatedSupportWriter.complete();
75119
}
76120

77-
private Map<String, Object> getDataMapToSerialize(Event event) throws JsonProcessingException {
78-
final Event modifiedEvent;
79-
if (codecContext.getTagsTargetKey() != null) {
80-
modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey());
81-
} else {
82-
modifiedEvent = event;
83-
}
84-
Map<String, Object> dataMap = modifiedEvent.toMap();
85-
86-
if ((codecContext.getIncludeKeys() != null && !codecContext.getIncludeKeys().isEmpty()) ||
87-
(codecContext.getExcludeKeys() != null && !codecContext.getExcludeKeys().isEmpty())) {
88-
89-
Map<String, Object> finalDataMap = dataMap;
90-
dataMap = dataMap.keySet()
91-
.stream()
92-
.filter(codecContext::shouldIncludeKey)
93-
.collect(Collectors.toMap(Function.identity(), finalDataMap::get));
94-
}
95-
return dataMap;
121+
@Override
122+
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
123+
deprecatedSupportWriter.writeEvent(event);
96124
}
97125
}
98126

0 commit comments

Comments
 (0)