Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,61 @@ public interface OutputCodec {

static final ObjectMapper objectMapper = new ObjectMapper();

/**
* A writer specific to a single buffer.
*
* @since 2.12
*/
interface Writer {
/**
* Writes a single event to the {@link OutputStream}.
*
* @param event A Data Prepper {@link Event}
* @throws IOException An IO exception writing to the stream
*
* @since 2.12
*/
void writeEvent(Event event) throws IOException;

/**
* Completes a writer.
*
* @throws IOException An IO exception completing the stream
*
* @since 2.12
*/
void complete() throws IOException;
}

/**
* Creates a new {@link Writer} for a given {@link OutputStream}.
* Typically, you create one per buffer.
*
* @param outputStream The {@link OutputStream} to write to
* @param sampleEvent A sample Data Prepper {@link Event}.
* It is not written to the stream, but may be used for metadata.
* @param codecContext The {@link OutputCodecContext}
* @return A {@link Writer} to use for this buffer.
* @throws IOException An IO exception occurs initializing the writer or stream
*
* @since 2.12
*/
default Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException {
final OutputCodec codec = this;
codec.start(outputStream, sampleEvent, codecContext);
return new Writer() {
@Override
public void writeEvent(final Event event) throws IOException {
codec.writeEvent(event, outputStream);
}

@Override
public void complete() throws IOException {
codec.complete(outputStream);
}
};
}

/**
* this method get called from {@link Sink} to do initial wrapping in {@link OutputStream}
* Implementors should do initial wrapping according to the implementation
Expand All @@ -30,7 +85,9 @@ public interface OutputCodec {
* @param event Event to auto-generate schema
* @param context Extra Context used in Codec.
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
* @deprecated Use {@link OutputCodec#createWriter(OutputStream, Event, OutputCodecContext)} instead.
*/
@Deprecated
void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException;

/**
Expand All @@ -40,7 +97,9 @@ public interface OutputCodec {
* @param event event Record event
* @param outputStream outputStream param to hold the event data
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
* @deprecated @deprecated Use {@link OutputCodec.Writer#writeEvent(Event)} instead.
*/
@Deprecated
void writeEvent(Event event, OutputStream outputStream) throws IOException;

/**
Expand All @@ -49,7 +108,9 @@ public interface OutputCodec {
*
* @param outputStream outputStream param for wrapping
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
* @deprecated @deprecated Use {@link Writer#complete()} instead.
*/
@Deprecated
void complete(OutputStream outputStream) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.codec;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
Expand All @@ -20,11 +29,16 @@
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

@ExtendWith(MockitoExtension.class)
public class OutputCodecTest {
@Test
void isCompressionInternal_returns_false() {
Expand Down Expand Up @@ -85,4 +99,61 @@ private static Map<String, Object> generateJson() {
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
return jsonObject;
}

@Nested
class DefaultWriter {
@Mock
private OutputStream outputStream;
@Mock
private Event event;
@Mock
private OutputCodecContext outputCodecContext;

@Test
void createWriter_returns_new_instance() throws IOException {
final OutputCodec objectUnderTest = mock(OutputCodec.class);

doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);

assertThat(objectUnderTest.createWriter(outputStream, event, outputCodecContext),
not(sameInstance(objectUnderTest.createWriter(outputStream, event, outputCodecContext))));
}

@Test
void createWriter_calls_start() throws IOException {
final OutputCodec objectUnderTest = mock(OutputCodec.class);

doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);

objectUnderTest.createWriter(outputStream, event, outputCodecContext);

verify(objectUnderTest).start(outputStream, event, outputCodecContext);
}

@Test
void writer_writeEvent_calls_writeEvent_on_OutputCodec() throws IOException {
final OutputCodec objectUnderTest = mock(OutputCodec.class);

doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);

OutputCodec.Writer writer = objectUnderTest.createWriter(outputStream, event, outputCodecContext);

writer.writeEvent(event);

verify(objectUnderTest).writeEvent(event, outputStream);
}

@Test
void writer_complete_calls_complete_on_OutputCodec() throws IOException {
final OutputCodec objectUnderTest = mock(OutputCodec.class);

doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);

OutputCodec.Writer writer = objectUnderTest.createWriter(outputStream, event, outputCodecContext);

writer.complete();

verify(objectUnderTest).complete(outputStream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,69 +30,97 @@
public class JsonOutputCodec implements OutputCodec {
private final ObjectMapper objectMapper = new ObjectMapper();
private static final String JSON = "json";
private static final JsonFactory factory = new JsonFactory();
private static final JsonFactory JSON_FACTORY = new JsonFactory();
private final JsonOutputCodecConfig config;
private JsonGenerator generator;
private OutputCodecContext codecContext;
private JsonWriter deprecatedSupportWriter;

@DataPrepperPluginConstructor
public JsonOutputCodec(final JsonOutputCodecConfig config) {
Objects.requireNonNull(config);
this.config = config;
}

private class JsonWriter implements Writer {
private final JsonGenerator generator;
private final OutputStream outputStream;
private final OutputCodecContext codecContext;

private JsonWriter(final OutputStream outputStream, final OutputCodecContext codecContext) throws IOException {
this.outputStream = outputStream;
this.codecContext = codecContext;
generator = JSON_FACTORY.createGenerator(outputStream, JsonEncoding.UTF8);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getEstimatedSize() is called millions of times. Is it a good idea to create a new generator each time?

generator.writeStartObject();
generator.writeFieldName(config.getKeyName());
generator.writeStartArray();
}

@Override
public void writeEvent(final Event event) throws IOException {
Objects.requireNonNull(event);
final Map<String, Object> dataMap = getDataMapToSerialize(event);
objectMapper.writeValue(generator, dataMap);
generator.flush();
}

@Override
public void complete() throws IOException {
generator.writeEndArray();
generator.writeEndObject();
generator.close();
outputStream.flush();
outputStream.close();
}

private Map<String, Object> getDataMapToSerialize(final Event event) throws JsonProcessingException {
final Event modifiedEvent;
if (codecContext.getTagsTargetKey() != null) {
modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey());
} else {
modifiedEvent = event;
}
Map<String, Object> dataMap = modifiedEvent.toMap();

if ((codecContext.getIncludeKeys() != null && !codecContext.getIncludeKeys().isEmpty()) ||
(codecContext.getExcludeKeys() != null && !codecContext.getExcludeKeys().isEmpty())) {

final Map<String, Object> finalDataMap = dataMap;
dataMap = dataMap.keySet()
.stream()
.filter(codecContext::shouldIncludeKey)
.collect(Collectors.toMap(Function.identity(), finalDataMap::get));
}
return dataMap;
}
}

@Override
public String getExtension() {
return JSON;
}

@Override
public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException {
public Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException {
Objects.requireNonNull(outputStream);
Objects.requireNonNull(codecContext);
this.codecContext = codecContext;
generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
generator.writeStartObject();
generator.writeFieldName(config.getKeyName());
generator.writeStartArray();

return new JsonWriter(outputStream, codecContext);
}

@Override
public void complete(final OutputStream outputStream) throws IOException {
generator.writeEndArray();
generator.writeEndObject();
generator.close();
outputStream.flush();
outputStream.close();
public void start(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext) throws IOException {
Objects.requireNonNull(outputStream);
Objects.requireNonNull(codecContext);
deprecatedSupportWriter = new JsonWriter(outputStream, codecContext);
}

@Override
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
Objects.requireNonNull(event);
Map<String, Object> dataMap = getDataMapToSerialize(event);
objectMapper.writeValue(generator, dataMap);
generator.flush();
public void complete(final OutputStream outputStream) throws IOException {
deprecatedSupportWriter.complete();
}

private Map<String, Object> getDataMapToSerialize(Event event) throws JsonProcessingException {
final Event modifiedEvent;
if (codecContext.getTagsTargetKey() != null) {
modifiedEvent = addTagsToEvent(event, codecContext.getTagsTargetKey());
} else {
modifiedEvent = event;
}
Map<String, Object> dataMap = modifiedEvent.toMap();

if ((codecContext.getIncludeKeys() != null && !codecContext.getIncludeKeys().isEmpty()) ||
(codecContext.getExcludeKeys() != null && !codecContext.getExcludeKeys().isEmpty())) {

Map<String, Object> finalDataMap = dataMap;
dataMap = dataMap.keySet()
.stream()
.filter(codecContext::shouldIncludeKey)
.collect(Collectors.toMap(Function.identity(), finalDataMap::get));
}
return dataMap;
@Override
public synchronized void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
deprecatedSupportWriter.writeEvent(event);
}
}

Expand Down
Loading
Loading