diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java index bf21444dbf..07e72747c8 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java @@ -22,6 +22,61 @@ public interface OutputCodec { static final ObjectMapper objectMapper = new ObjectMapper(); + /** + * A writer specific to a single buffer. + * + * @since 2.12 + */ + interface Writer { + /** + * Writes a single event to the {@link OutputStream}. + * + * @param event A Data Prepper {@link Event} + * @throws IOException An IO exception writing to the stream + * + * @since 2.12 + */ + void writeEvent(Event event) throws IOException; + + /** + * Completes a writer. + * + * @throws IOException An IO exception completing the stream + * + * @since 2.12 + */ + void complete() throws IOException; + } + + /** + * Creates a new {@link Writer} for a given {@link OutputStream}. + * Typically, you create one per buffer. + * + * @param outputStream The {@link OutputStream} to write to + * @param sampleEvent A sample Data Prepper {@link Event}. + * It is not written to the stream, but may be used for metadata. + * @param codecContext The {@link OutputCodecContext} + * @return A {@link Writer} to use for this buffer. + * @throws IOException An IO exception occurs initializing the writer or stream + * + * @since 2.12 + */ + default Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException { + final OutputCodec codec = this; + codec.start(outputStream, sampleEvent, codecContext); + return new Writer() { + @Override + public void writeEvent(final Event event) throws IOException { + codec.writeEvent(event, outputStream); + } + + @Override + public void complete() throws IOException { + codec.complete(outputStream); + } + }; + } + /** * this method get called from {@link Sink} to do initial wrapping in {@link OutputStream} * Implementors should do initial wrapping according to the implementation @@ -30,7 +85,9 @@ public interface OutputCodec { * @param event Event to auto-generate schema * @param context Extra Context used in Codec. * @throws IOException throws IOException when invalid input is received or not able to create wrapping + * @deprecated Use {@link OutputCodec#createWriter(OutputStream, Event, OutputCodecContext)} instead. */ + @Deprecated void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException; /** @@ -40,7 +97,9 @@ public interface OutputCodec { * @param event event Record event * @param outputStream outputStream param to hold the event data * @throws IOException throws IOException when not able to write data to {@link OutputStream} + * @deprecated @deprecated Use {@link OutputCodec.Writer#writeEvent(Event)} instead. */ + @Deprecated void writeEvent(Event event, OutputStream outputStream) throws IOException; /** @@ -49,7 +108,9 @@ public interface OutputCodec { * * @param outputStream outputStream param for wrapping * @throws IOException throws IOException when invalid input is received or not able to create wrapping + * @deprecated @deprecated Use {@link Writer#complete()} instead. */ + @Deprecated void complete(OutputStream outputStream) throws IOException; /** diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java index f1038b5b6c..51386e89cf 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java @@ -1,8 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.model.codec; import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.event.DefaultEventMetadata; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; @@ -20,11 +29,16 @@ import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +@ExtendWith(MockitoExtension.class) public class OutputCodecTest { @Test void isCompressionInternal_returns_false() { @@ -85,4 +99,61 @@ private static Map generateJson() { UUID.randomUUID().toString(), UUID.randomUUID().toString())); return jsonObject; } + + @Nested + class DefaultWriter { + @Mock + private OutputStream outputStream; + @Mock + private Event event; + @Mock + private OutputCodecContext outputCodecContext; + + @Test + void createWriter_returns_new_instance() throws IOException { + final OutputCodec objectUnderTest = mock(OutputCodec.class); + + doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext); + + assertThat(objectUnderTest.createWriter(outputStream, event, outputCodecContext), + not(sameInstance(objectUnderTest.createWriter(outputStream, event, outputCodecContext)))); + } + + @Test + void createWriter_calls_start() throws IOException { + final OutputCodec objectUnderTest = mock(OutputCodec.class); + + doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext); + + objectUnderTest.createWriter(outputStream, event, outputCodecContext); + + verify(objectUnderTest).start(outputStream, event, outputCodecContext); + } + + @Test + void writer_writeEvent_calls_writeEvent_on_OutputCodec() throws IOException { + final OutputCodec objectUnderTest = mock(OutputCodec.class); + + doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext); + + OutputCodec.Writer writer = objectUnderTest.createWriter(outputStream, event, outputCodecContext); + + writer.writeEvent(event); + + verify(objectUnderTest).writeEvent(event, outputStream); + } + + @Test + void writer_complete_calls_complete_on_OutputCodec() throws IOException { + final OutputCodec objectUnderTest = mock(OutputCodec.class); + + doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext); + + OutputCodec.Writer writer = objectUnderTest.createWriter(outputStream, event, outputCodecContext); + + writer.complete(); + + verify(objectUnderTest).complete(outputStream); + } + } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java index 9e31a368d9..521b373d65 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodec.java @@ -30,10 +30,9 @@ public class JsonOutputCodec implements OutputCodec { private final ObjectMapper objectMapper = new ObjectMapper(); private static final String JSON = "json"; - private static final JsonFactory factory = new JsonFactory(); + private static final JsonFactory JSON_FACTORY = new JsonFactory(); private final JsonOutputCodecConfig config; - private JsonGenerator generator; - private OutputCodecContext codecContext; + private JsonWriter deprecatedSupportWriter; @DataPrepperPluginConstructor public JsonOutputCodec(final JsonOutputCodecConfig config) { @@ -41,58 +40,87 @@ public JsonOutputCodec(final JsonOutputCodecConfig config) { this.config = config; } + private class JsonWriter implements Writer { + private final JsonGenerator generator; + private final OutputStream outputStream; + private final OutputCodecContext codecContext; + + private JsonWriter(final OutputStream outputStream, final OutputCodecContext codecContext) throws IOException { + this.outputStream = outputStream; + this.codecContext = codecContext; + generator = JSON_FACTORY.createGenerator(outputStream, JsonEncoding.UTF8); + generator.writeStartObject(); + generator.writeFieldName(config.getKeyName()); + generator.writeStartArray(); + } + + @Override + public void writeEvent(final Event event) throws IOException { + Objects.requireNonNull(event); + final Map dataMap = getDataMapToSerialize(event); + objectMapper.writeValue(generator, dataMap); + generator.flush(); + } + + @Override + public void complete() throws IOException { + generator.writeEndArray(); + generator.writeEndObject(); + generator.close(); + outputStream.flush(); + outputStream.close(); + } + + private Map getDataMapToSerialize(final Event event) throws JsonProcessingException { + final Event modifiedEvent; + if (codecContext.getTagsTargetKey() != null) { + modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey()); + } else { + modifiedEvent = event; + } + Map dataMap = modifiedEvent.toMap(); + + if ((codecContext.getIncludeKeys() != null && !codecContext.getIncludeKeys().isEmpty()) || + (codecContext.getExcludeKeys() != null && !codecContext.getExcludeKeys().isEmpty())) { + + final Map finalDataMap = dataMap; + dataMap = dataMap.keySet() + .stream() + .filter(codecContext::shouldIncludeKey) + .collect(Collectors.toMap(Function.identity(), finalDataMap::get)); + } + return dataMap; + } + } + @Override public String getExtension() { return JSON; } @Override - public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException { + public Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException { Objects.requireNonNull(outputStream); Objects.requireNonNull(codecContext); - this.codecContext = codecContext; - generator = factory.createGenerator(outputStream, JsonEncoding.UTF8); - generator.writeStartObject(); - generator.writeFieldName(config.getKeyName()); - generator.writeStartArray(); + + return new JsonWriter(outputStream, codecContext); } @Override - public void complete(final OutputStream outputStream) throws IOException { - generator.writeEndArray(); - generator.writeEndObject(); - generator.close(); - outputStream.flush(); - outputStream.close(); + public void start(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext) throws IOException { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(codecContext); + deprecatedSupportWriter = new JsonWriter(outputStream, codecContext); } @Override - public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException { - Objects.requireNonNull(event); - Map dataMap = getDataMapToSerialize(event); - objectMapper.writeValue(generator, dataMap); - generator.flush(); + public void complete(final OutputStream outputStream) throws IOException { + deprecatedSupportWriter.complete(); } - private Map getDataMapToSerialize(Event event) throws JsonProcessingException { - final Event modifiedEvent; - if (codecContext.getTagsTargetKey() != null) { - modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey()); - } else { - modifiedEvent = event; - } - Map dataMap = modifiedEvent.toMap(); - - if ((codecContext.getIncludeKeys() != null && !codecContext.getIncludeKeys().isEmpty()) || - (codecContext.getExcludeKeys() != null && !codecContext.getExcludeKeys().isEmpty())) { - - Map finalDataMap = dataMap; - dataMap = dataMap.keySet() - .stream() - .filter(codecContext::shouldIncludeKey) - .collect(Collectors.toMap(Function.identity(), finalDataMap::get)); - } - return dataMap; + @Override + public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException { + deprecatedSupportWriter.writeEvent(event); } } diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodec.java index f179d1a3fe..0d92f6ef57 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodec.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodec.java @@ -4,7 +4,6 @@ */ package org.opensearch.dataprepper.plugins.codec.json; -import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.codec.OutputCodec; @@ -22,21 +21,46 @@ @DataPrepperPlugin(name = "ndjson", pluginType = OutputCodec.class, pluginConfigurationType = NdjsonOutputConfig.class) public class NdjsonOutputCodec implements OutputCodec { private static final String NDJSON = "ndjson"; - private static final ObjectMapper objectMapper = new ObjectMapper(); - private final NdjsonOutputConfig config; - private OutputCodecContext codecContext; + private OutputCodecContext deprecatedSupportCodecContext; @DataPrepperPluginConstructor public NdjsonOutputCodec(final NdjsonOutputConfig config) { Objects.requireNonNull(config); - this.config = config; + } + + private static class NdjsonWriter implements Writer { + private final OutputStream outputStream; + private final OutputCodecContext codecContext; + + private NdjsonWriter(final OutputStream outputStream, final OutputCodecContext codecContext) { + this.outputStream = outputStream; + this.codecContext = codecContext; + } + + @Override + public void writeEvent(final Event event) throws IOException { + doWriteEvent(outputStream, event, codecContext); + } + + @Override + public void complete() throws IOException { + outputStream.close(); + } + } + + @Override + public Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) { + Objects.requireNonNull(outputStream); + Objects.requireNonNull(codecContext); + + return new NdjsonWriter(outputStream, codecContext); } @Override public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException { Objects.requireNonNull(outputStream); Objects.requireNonNull(codecContext); - this.codecContext = codecContext; + this.deprecatedSupportCodecContext = codecContext; } @Override @@ -44,9 +68,9 @@ public void writeEvent(final Event event, final OutputStream outputStream) throw Objects.requireNonNull(event); String json = event.jsonBuilder() - .includeKeys(codecContext.getIncludeKeys()) - .excludeKeys(codecContext.getExcludeKeys()) - .includeTags(codecContext.getTagsTargetKey()) + .includeKeys(deprecatedSupportCodecContext.getIncludeKeys()) + .excludeKeys(deprecatedSupportCodecContext.getExcludeKeys()) + .includeTags(deprecatedSupportCodecContext.getTagsTargetKey()) .toJsonString(); outputStream.write(json.getBytes()); outputStream.write(System.lineSeparator().getBytes()); @@ -61,4 +85,16 @@ public void complete(final OutputStream outputStream) throws IOException { public String getExtension() { return NDJSON; } + + private static void doWriteEvent(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext) throws IOException { + Objects.requireNonNull(event); + + String json = event.jsonBuilder() + .includeKeys(codecContext.getIncludeKeys()) + .excludeKeys(codecContext.getExcludeKeys()) + .includeTags(codecContext.getTagsTargetKey()) + .toJsonString(); + outputStream.write(json.getBytes()); + outputStream.write(System.lineSeparator().getBytes()); + } } diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java index 82d2f4130d..7189f67467 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/JsonOutputCodecTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.dataprepper.model.codec.OutputCodec; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.sink.OutputCodecContext; @@ -148,6 +149,120 @@ void writeEvent_with_exclude_keys(final int numberOfRecords) throws IOException assertThat(index, equalTo(numberOfRecords)); } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void writer_happy_case(final int numberOfRecords) throws IOException { + JsonOutputCodec jsonOutputCodec = createObjectUnderTest(); + outputStream = new ByteArrayOutputStream(); + OutputCodecContext codecContext = new OutputCodecContext(); + OutputCodec.Writer objectUnderTest = jsonOutputCodec.createWriter(outputStream, null, codecContext); + + final List> expectedData = generateRecords(numberOfRecords); + for (int index = 0; index < numberOfRecords; index++) { + final Event event = convertToEvent(expectedData.get(index)); + objectUnderTest.writeEvent(event); + } + objectUnderTest.complete(); + + int index = 0; + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree(outputStream.toByteArray()); + assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + Map.Entry nextField = jsonNode.fields().next(); + assertThat(nextField, notNullValue()); + assertThat(nextField.getKey(), equalTo(JsonOutputCodecConfig.DEFAULT_KEY_NAME)); + jsonNode = nextField.getValue(); + assertThat(jsonNode, notNullValue()); + assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.ARRAY)); + for (JsonNode actualElement : jsonNode) { + Map expectedMap = expectedData.get(index); + Set keys = expectedMap.keySet(); + Map actualMap = new HashMap<>(); + for (String key : keys) { + actualMap.put(key, getValue(actualElement.get(key))); + } + assertThat(actualMap, equalTo(expectedMap)); + index++; + } + + assertThat(index, equalTo(numberOfRecords)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void writer_writeEvent_with_include_keys(final int numberOfRecords) throws IOException { + JsonOutputCodec jsonOutputCodec = createObjectUnderTest(); + outputStream = new ByteArrayOutputStream(); + OutputCodecContext codecContext = new OutputCodecContext(null, List.of("name"), null); + OutputCodec.Writer objectUnderTest = jsonOutputCodec.createWriter(outputStream, null, codecContext); + + final List> expectedData = generateRecords(numberOfRecords); + for (int index = 0; index < numberOfRecords; index++) { + final Event event = convertToEvent(expectedData.get(index)); + objectUnderTest.writeEvent(event); + } + objectUnderTest.complete(); + + int index = 0; + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree(outputStream.toByteArray()); + assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + Map.Entry nextField = jsonNode.fields().next(); + assertThat(nextField, notNullValue()); + assertThat(nextField.getKey(), equalTo(JsonOutputCodecConfig.DEFAULT_KEY_NAME)); + jsonNode = nextField.getValue(); + assertThat(jsonNode, notNullValue()); + assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.ARRAY)); + for (JsonNode actualElement : jsonNode) { + Map expectedMap = expectedData.get(index); + assertThat(actualElement.has("age"), equalTo(false)); + assertThat(actualElement.has("name"), equalTo(true)); + assertThat(actualElement.get("name").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(actualElement.get("name").asText(), equalTo(expectedMap.get("name"))); + index++; + } + + assertThat(index, equalTo(numberOfRecords)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 10, 100}) + void writer_writeEvent_with_exclude_keys(final int numberOfRecords) throws IOException { + JsonOutputCodec jsonOutputCodec = createObjectUnderTest(); + outputStream = new ByteArrayOutputStream(); + OutputCodecContext codecContext = new OutputCodecContext(null, null, List.of("age")); + OutputCodec.Writer objectUnderTest = jsonOutputCodec.createWriter(outputStream, null, codecContext); + + final List> expectedData = generateRecords(numberOfRecords); + for (int index = 0; index < numberOfRecords; index++) { + final Event event = convertToEvent(expectedData.get(index)); + objectUnderTest.writeEvent(event); + } + objectUnderTest.complete(); + + int index = 0; + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree(outputStream.toByteArray()); + assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.OBJECT)); + Map.Entry nextField = jsonNode.fields().next(); + assertThat(nextField, notNullValue()); + assertThat(nextField.getKey(), equalTo(JsonOutputCodecConfig.DEFAULT_KEY_NAME)); + jsonNode = nextField.getValue(); + assertThat(jsonNode, notNullValue()); + assertThat(jsonNode.getNodeType(), equalTo(JsonNodeType.ARRAY)); + for (JsonNode actualElement : jsonNode) { + Map expectedMap = expectedData.get(index); + assertThat(actualElement.has("age"), equalTo(false)); + assertThat(actualElement.has("name"), equalTo(true)); + assertThat(actualElement.get("name").getNodeType(), equalTo(JsonNodeType.STRING)); + assertThat(actualElement.get("name").asText(), equalTo(expectedMap.get("name"))); + index++; + } + + assertThat(index, equalTo(numberOfRecords)); + } + @Test void testGetEstimatedSize() throws Exception { int numberOfRecords = 1; diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodecTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodecTest.java new file mode 100644 index 0000000000..997e08d011 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonOutputCodecTest.java @@ -0,0 +1,209 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.codec.json; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.event.TestEventFactory; +import org.opensearch.dataprepper.model.codec.OutputCodec; +import org.opensearch.dataprepper.model.event.EventBuilder; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.sink.OutputCodecContext; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoInteractions; + +@ExtendWith(MockitoExtension.class) +class NdjsonOutputCodecTest { + @Mock + private NdjsonOutputConfig config; + + @Mock + private OutputCodecContext codecContext; + + private EventFactory eventFactory; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference<>() { + }; + + @BeforeEach + void setUp() { + eventFactory = TestEventFactory.getTestEventFactory(); + } + + private NdjsonOutputCodec createObjectUnderTest() { + return new NdjsonOutputCodec(config); + } + + @Test + void start_does_not_write_to_OutputStream() throws IOException { + final NdjsonOutputCodec objectUnderTest = createObjectUnderTest(); + + final OutputStream outputStream = mock(OutputStream.class); + + objectUnderTest.start(outputStream, null, codecContext); + + verifyNoInteractions(outputStream); + } + + @Test + void writer_does_not_write_to_OutputStream() throws IOException { + final NdjsonOutputCodec objectUnderTest = createObjectUnderTest(); + + final OutputStream outputStream = mock(OutputStream.class); + + objectUnderTest.createWriter(outputStream, null, codecContext); + + verifyNoInteractions(outputStream); + } + + @Test + void write_single() throws IOException { + final NdjsonOutputCodec objectUnderTest = createObjectUnderTest(); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + final Map eventMap = generateEventMap(); + objectUnderTest.start(outputStream, null, codecContext); + objectUnderTest.writeEvent(eventFactory.eventBuilder(EventBuilder.class).withData(eventMap).build(), outputStream); + objectUnderTest.complete(outputStream); + + final Map serializedMap = OBJECT_MAPPER.readValue(outputStream.toByteArray(), Map.class); + + assertThat(serializedMap, equalTo(eventMap)); + } + + @Test + void write_single_using_writer() throws IOException { + final NdjsonOutputCodec objectUnderTest = createObjectUnderTest(); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + final Map eventMap = generateEventMap(); + final OutputCodec.Writer writer = objectUnderTest.createWriter(outputStream, null, codecContext); + writer.writeEvent(eventFactory.eventBuilder(EventBuilder.class).withData(eventMap).build()); + writer.complete(); + + final Map serializedMap = OBJECT_MAPPER.readValue(outputStream.toByteArray(), Map.class); + + assertThat(serializedMap, equalTo(eventMap)); + } + + @ParameterizedTest + @ValueSource(ints = {2, 100}) + void write_multiple(final int numberOfEvents) throws IOException { + final NdjsonOutputCodec objectUnderTest = createObjectUnderTest(); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + final List> eventMaps = IntStream.range(0, numberOfEvents) + .mapToObj(i -> generateEventMap()) + .collect(Collectors.toList()); + objectUnderTest.start(outputStream, null, codecContext); + + eventMaps.stream() + .map(eventMap -> eventFactory.eventBuilder(EventBuilder.class).withData(eventMap).build()) + .forEach(event -> { + try { + objectUnderTest.writeEvent(event, outputStream); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + objectUnderTest.complete(outputStream); + + final String jsonLinesCombined = new String(outputStream.toByteArray()); + + final String[] jsonLines = jsonLinesCombined.split("\n"); + + assertThat(jsonLines.length, equalTo(numberOfEvents)); + + for (int i = 0; i < numberOfEvents; i++) { + final Map eventMap = eventMaps.get(i); + final String jsonLine = jsonLines[i]; + final Map serializedMap = OBJECT_MAPPER.readValue(jsonLine, Map.class); + + assertThat(serializedMap, equalTo(eventMap)); + } + } + + @ParameterizedTest + @ValueSource(ints = {2, 100}) + void write_multiple_using_writer(final int numberOfEvents) throws IOException { + final NdjsonOutputCodec objectUnderTest = createObjectUnderTest(); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + final List> eventMaps = IntStream.range(0, numberOfEvents) + .mapToObj(i -> generateEventMap()) + .collect(Collectors.toList()); + final OutputCodec.Writer writer = objectUnderTest.createWriter(outputStream, null, codecContext); + + eventMaps.stream() + .map(eventMap -> eventFactory.eventBuilder(EventBuilder.class).withData(eventMap).build()) + .forEach(event -> { + try { + writer.writeEvent(event); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + writer.complete(); + + final String jsonLinesCombined = new String(outputStream.toByteArray()); + + final String[] jsonLines = jsonLinesCombined.split("\n"); + + assertThat(jsonLines.length, equalTo(numberOfEvents)); + + for (int i = 0; i < numberOfEvents; i++) { + final Map eventMap = eventMaps.get(i); + final String jsonLine = jsonLines[i]; + final Map serializedMap = OBJECT_MAPPER.readValue(jsonLine, Map.class); + + assertThat(serializedMap, equalTo(eventMap)); + } + } + + + private static Map generateEventMap() { + final Map jsonObject = new LinkedHashMap<>(); + for (int i = 0; i < 1; i++) { + jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString())); + + return jsonObject; + } + +} \ No newline at end of file