Skip to content

Commit e5b3dfe

Browse files
committed
Initial commit to refactor the OutputCodec to support a Writer that is bound to a specific OutputStream. This supports backward compatibility with the existing APIs and an update to the JsonOutputCodec to start the migration.
Signed-off-by: David Venable <dlv@amazon.com>
1 parent d24c3fa commit e5b3dfe

3 files changed

Lines changed: 210 additions & 39 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,27 @@ public interface OutputCodec {
2222

2323
static final ObjectMapper objectMapper = new ObjectMapper();
2424

25+
interface Writer {
26+
void writeEvent(Event event) throws IOException;
27+
void complete() throws IOException;
28+
}
29+
30+
default Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException {
31+
final OutputCodec codec = this;
32+
codec.start(outputStream, sampleEvent, codecContext);
33+
return new Writer() {
34+
@Override
35+
public void writeEvent(final Event event) throws IOException {
36+
codec.writeEvent(event, outputStream);
37+
}
38+
39+
@Override
40+
public void complete() throws IOException {
41+
codec.complete(outputStream);
42+
}
43+
};
44+
}
45+
2546
/**
2647
* this method get called from {@link Sink} to do initial wrapping in {@link OutputStream}
2748
* Implementors should do initial wrapping according to the implementation
@@ -30,7 +51,9 @@ public interface OutputCodec {
3051
* @param event Event to auto-generate schema
3152
* @param context Extra Context used in Codec.
3253
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
54+
* @deprecated Use {@link OutputCodec#createWriter(OutputStream, Event, OutputCodecContext)} instead.
3355
*/
56+
@Deprecated
3457
void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException;
3558

3659
/**
@@ -40,7 +63,9 @@ public interface OutputCodec {
4063
* @param event event Record event
4164
* @param outputStream outputStream param to hold the event data
4265
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
66+
* @deprecated @deprecated Use {@link OutputCodec.Writer#writeEvent(Event)} instead.
4367
*/
68+
@Deprecated
4469
void writeEvent(Event event, OutputStream outputStream) throws IOException;
4570

4671
/**
@@ -49,7 +74,9 @@ public interface OutputCodec {
4974
*
5075
* @param outputStream outputStream param for wrapping
5176
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
77+
* @deprecated @deprecated Use {@link Writer#complete()} instead.
5278
*/
79+
@Deprecated
5380
void complete(OutputStream outputStream) throws IOException;
5481

5582
/**

data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java

Lines changed: 68 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,69 +30,98 @@
3030
public class JsonOutputCodec implements OutputCodec {
3131
private final ObjectMapper objectMapper = new ObjectMapper();
3232
private static final String JSON = "json";
33-
private static final JsonFactory factory = new JsonFactory();
33+
private static final JsonFactory JSON_FACTORY = new JsonFactory();
3434
private final JsonOutputCodecConfig config;
35-
private JsonGenerator generator;
36-
private OutputCodecContext codecContext;
35+
private JsonWriter deprecatedSupportWriter;
3736

3837
@DataPrepperPluginConstructor
3938
public JsonOutputCodec(final JsonOutputCodecConfig config) {
4039
Objects.requireNonNull(config);
4140
this.config = config;
4241
}
4342

43+
private class JsonWriter implements Writer {
44+
private final JsonGenerator generator;
45+
private final OutputStream outputStream;
46+
private final OutputCodecContext codecContext;
47+
48+
private JsonWriter(final OutputStream outputStream, final OutputCodecContext codecContext) throws IOException {
49+
this.outputStream = outputStream;
50+
this.codecContext = codecContext;
51+
generator = JSON_FACTORY.createGenerator(outputStream, JsonEncoding.UTF8);
52+
generator.writeStartObject();
53+
generator.writeFieldName(config.getKeyName());
54+
generator.writeStartArray();
55+
}
56+
57+
@Override
58+
public void writeEvent(final Event event) throws IOException {
59+
Objects.requireNonNull(event);
60+
final Map<String, Object> dataMap = getDataMapToSerialize(event);
61+
objectMapper.writeValue(generator, dataMap);
62+
generator.flush();
63+
}
64+
65+
@Override
66+
public void complete() throws IOException {
67+
generator.writeEndArray();
68+
generator.writeEndObject();
69+
generator.close();
70+
outputStream.flush();
71+
outputStream.close();
72+
}
73+
74+
75+
private Map<String, Object> getDataMapToSerialize(final Event event) throws JsonProcessingException {
76+
final Event modifiedEvent;
77+
if (codecContext.getTagsTargetKey() != null) {
78+
modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey());
79+
} else {
80+
modifiedEvent = event;
81+
}
82+
Map<String, Object> dataMap = modifiedEvent.toMap();
83+
84+
if ((codecContext.getIncludeKeys() != null && !codecContext.getIncludeKeys().isEmpty()) ||
85+
(codecContext.getExcludeKeys() != null && !codecContext.getExcludeKeys().isEmpty())) {
86+
87+
final Map<String, Object> finalDataMap = dataMap;
88+
dataMap = dataMap.keySet()
89+
.stream()
90+
.filter(codecContext::shouldIncludeKey)
91+
.collect(Collectors.toMap(Function.identity(), finalDataMap::get));
92+
}
93+
return dataMap;
94+
}
95+
}
96+
4497
@Override
4598
public String getExtension() {
4699
return JSON;
47100
}
48101

49102
@Override
50-
public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException {
103+
public Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException {
51104
Objects.requireNonNull(outputStream);
52105
Objects.requireNonNull(codecContext);
53-
this.codecContext = codecContext;
54-
generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
55-
generator.writeStartObject();
56-
generator.writeFieldName(config.getKeyName());
57-
generator.writeStartArray();
106+
107+
return new JsonWriter(outputStream, codecContext);
58108
}
59109

60110
@Override
61-
public void complete(final OutputStream outputStream) throws IOException {
62-
generator.writeEndArray();
63-
generator.writeEndObject();
64-
generator.close();
65-
outputStream.flush();
66-
outputStream.close();
111+
public void start(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext) throws IOException {
112+
Objects.requireNonNull(outputStream);
113+
Objects.requireNonNull(codecContext);
114+
deprecatedSupportWriter = new JsonWriter(outputStream, codecContext);
67115
}
68116

69117
@Override
70-
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
71-
Objects.requireNonNull(event);
72-
Map<String, Object> dataMap = getDataMapToSerialize(event);
73-
objectMapper.writeValue(generator, dataMap);
74-
generator.flush();
118+
public void complete(final OutputStream outputStream) throws IOException {
119+
deprecatedSupportWriter.complete();
75120
}
76121

77-
private Map<String, Object> getDataMapToSerialize(Event event) throws JsonProcessingException {
78-
final Event modifiedEvent;
79-
if (codecContext.getTagsTargetKey() != null) {
80-
modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey());
81-
} else {
82-
modifiedEvent = event;
83-
}
84-
Map<String, Object> dataMap = modifiedEvent.toMap();
85-
86-
if ((codecContext.getIncludeKeys() != null && !codecContext.getIncludeKeys().isEmpty()) ||
87-
(codecContext.getExcludeKeys() != null && !codecContext.getExcludeKeys().isEmpty())) {
88-
89-
Map<String, Object> finalDataMap = dataMap;
90-
dataMap = dataMap.keySet()
91-
.stream()
92-
.filter(codecContext::shouldIncludeKey)
93-
.collect(Collectors.toMap(Function.identity(), finalDataMap::get));
94-
}
95-
return dataMap;
122+
@Override
123+
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
124+
deprecatedSupportWriter.writeEvent(event);
96125
}
97126
}
98127

data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.junit.jupiter.api.Test;
1212
import org.junit.jupiter.params.ParameterizedTest;
1313
import org.junit.jupiter.params.provider.ValueSource;
14+
import org.opensearch.dataprepper.model.codec.OutputCodec;
1415
import org.opensearch.dataprepper.model.event.Event;
1516
import org.opensearch.dataprepper.model.log.JacksonLog;
1617
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
@@ -148,6 +149,120 @@ void writeEvent_with_exclude_keys(final int numberOfRecords) throws IOException
148149
assertThat(index, equalTo(numberOfRecords));
149150
}
150151

152+
153+
@ParameterizedTest
154+
@ValueSource(ints = {1, 2, 10, 100})
155+
void writer_happy_case(final int numberOfRecords) throws IOException {
156+
JsonOutputCodec jsonOutputCodec = createObjectUnderTest();
157+
outputStream = new ByteArrayOutputStream();
158+
OutputCodecContext codecContext = new OutputCodecContext();
159+
OutputCodec.Writer objectUnderTest = jsonOutputCodec.createWriter(outputStream, null, codecContext);
160+
161+
final List<Map<String, Object>> expectedData = generateRecords(numberOfRecords);
162+
for (int index = 0; index < numberOfRecords; index++) {
163+
final Event event = convertToEvent(expectedData.get(index));
164+
objectUnderTest.writeEvent(event);
165+
}
166+
objectUnderTest.complete();
167+
168+
int index = 0;
169+
ObjectMapper mapper = new ObjectMapper();
170+
JsonNode jsonNode = mapper.readTree(outputStream.toByteArray());
171+
assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.OBJECT));
172+
Map.Entry<String, JsonNode> nextField = jsonNode.fields().next();
173+
assertThat(nextField, notNullValue());
174+
assertThat(nextField.getKey(), equalTo(JsonOutputCodecConfig.DEFAULT_KEY_NAME));
175+
jsonNode = nextField.getValue();
176+
assertThat(jsonNode, notNullValue());
177+
assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.ARRAY));
178+
for (JsonNode actualElement : jsonNode) {
179+
Map<String, Object> expectedMap = expectedData.get(index);
180+
Set<String> keys = expectedMap.keySet();
181+
Map<String, Object> actualMap = new HashMap<>();
182+
for (String key : keys) {
183+
actualMap.put(key, getValue(actualElement.get(key)));
184+
}
185+
assertThat(actualMap, equalTo(expectedMap));
186+
index++;
187+
}
188+
189+
assertThat(index, equalTo(numberOfRecords));
190+
}
191+
192+
@ParameterizedTest
193+
@ValueSource(ints = {1, 2, 10, 100})
194+
void writer_writeEvent_with_include_keys(final int numberOfRecords) throws IOException {
195+
JsonOutputCodec jsonOutputCodec = createObjectUnderTest();
196+
outputStream = new ByteArrayOutputStream();
197+
OutputCodecContext codecContext = new OutputCodecContext(null, List.of("name"), null);
198+
OutputCodec.Writer objectUnderTest = jsonOutputCodec.createWriter(outputStream, null, codecContext);
199+
200+
final List<Map<String, Object>> expectedData = generateRecords(numberOfRecords);
201+
for (int index = 0; index < numberOfRecords; index++) {
202+
final Event event = convertToEvent(expectedData.get(index));
203+
objectUnderTest.writeEvent(event);
204+
}
205+
objectUnderTest.complete();
206+
207+
int index = 0;
208+
ObjectMapper mapper = new ObjectMapper();
209+
JsonNode jsonNode = mapper.readTree(outputStream.toByteArray());
210+
assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.OBJECT));
211+
Map.Entry<String, JsonNode> nextField = jsonNode.fields().next();
212+
assertThat(nextField, notNullValue());
213+
assertThat(nextField.getKey(), equalTo(JsonOutputCodecConfig.DEFAULT_KEY_NAME));
214+
jsonNode = nextField.getValue();
215+
assertThat(jsonNode, notNullValue());
216+
assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.ARRAY));
217+
for (JsonNode actualElement : jsonNode) {
218+
Map<String, Object> expectedMap = expectedData.get(index);
219+
assertThat(actualElement.has("age"), equalTo(false));
220+
assertThat(actualElement.has("name"), equalTo(true));
221+
assertThat(actualElement.get("name").getNodeType(), equalTo(JsonNodeType.STRING));
222+
assertThat(actualElement.get("name").asText(), equalTo(expectedMap.get("name")));
223+
index++;
224+
}
225+
226+
assertThat(index, equalTo(numberOfRecords));
227+
}
228+
229+
@ParameterizedTest
230+
@ValueSource(ints = {1, 2, 10, 100})
231+
void writer_writeEvent_with_exclude_keys(final int numberOfRecords) throws IOException {
232+
JsonOutputCodec jsonOutputCodec = createObjectUnderTest();
233+
outputStream = new ByteArrayOutputStream();
234+
OutputCodecContext codecContext = new OutputCodecContext(null, null, List.of("age"));
235+
OutputCodec.Writer objectUnderTest = jsonOutputCodec.createWriter(outputStream, null, codecContext);
236+
237+
final List<Map<String, Object>> expectedData = generateRecords(numberOfRecords);
238+
for (int index = 0; index < numberOfRecords; index++) {
239+
final Event event = convertToEvent(expectedData.get(index));
240+
objectUnderTest.writeEvent(event);
241+
}
242+
objectUnderTest.complete();
243+
244+
int index = 0;
245+
ObjectMapper mapper = new ObjectMapper();
246+
JsonNode jsonNode = mapper.readTree(outputStream.toByteArray());
247+
assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.OBJECT));
248+
Map.Entry<String, JsonNode> nextField = jsonNode.fields().next();
249+
assertThat(nextField, notNullValue());
250+
assertThat(nextField.getKey(), equalTo(JsonOutputCodecConfig.DEFAULT_KEY_NAME));
251+
jsonNode = nextField.getValue();
252+
assertThat(jsonNode, notNullValue());
253+
assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.ARRAY));
254+
for (JsonNode actualElement : jsonNode) {
255+
Map<String, Object> expectedMap = expectedData.get(index);
256+
assertThat(actualElement.has("age"), equalTo(false));
257+
assertThat(actualElement.has("name"), equalTo(true));
258+
assertThat(actualElement.get("name").getNodeType(), equalTo(JsonNodeType.STRING));
259+
assertThat(actualElement.get("name").asText(), equalTo(expectedMap.get("name")));
260+
index++;
261+
}
262+
263+
assertThat(index, equalTo(numberOfRecords));
264+
}
265+
151266
@Test
152267
void testGetEstimatedSize() throws Exception {
153268
int numberOfRecords = 1;

0 commit comments

Comments
 (0)