Skip to content

Commit 00d41cb

Browse files
Added support of newline in output codec (#6423)
* Added support of newline in the output codec Signed-off-by: Subrahmanyam-Gollapalli <subrahmanyam.gollapalli@freshworks.com> * config to write empty events Signed-off-by: Subrahmanyam-Gollapalli <subrahmanyam.gollapalli@freshworks.com> * updated licence header Signed-off-by: Subrahmanyam-Gollapalli <subrahmanyam.gollapalli@freshworks.com> --------- Signed-off-by: Subrahmanyam-Gollapalli <subrahmanyam.gollapalli@freshworks.com>
1 parent 22177f0 commit 00d41cb

3 files changed

Lines changed: 337 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
package org.opensearch.dataprepper.plugins.codec.newline;
11+
12+
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
13+
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
14+
import org.opensearch.dataprepper.model.codec.OutputCodec;
15+
import org.opensearch.dataprepper.model.event.Event;
16+
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
17+
18+
import java.io.IOException;
19+
import java.io.OutputStream;
20+
import java.util.Objects;
21+
22+
/**
23+
* An implementation of {@link OutputCodec} which writes the "message" field
24+
* from Data Prepper events as plain text, one message per line.
25+
* This matches Logstash's line format behavior.
26+
*/
27+
@DataPrepperPlugin(name = "newline", pluginType = OutputCodec.class, pluginConfigurationType = NewlineDelimitedOutputConfig.class)
28+
public class NewlineDelimitedOutputCodec implements OutputCodec {
29+
private static final String NEWLINE = "txt";
30+
private static final String MESSAGE_FIELD = "message";
31+
@SuppressWarnings("unused")
32+
private OutputCodecContext deprecatedSupportCodecContext;
33+
private final boolean includeEmptyObjects;
34+
35+
@DataPrepperPluginConstructor
36+
public NewlineDelimitedOutputCodec(final NewlineDelimitedOutputConfig config) {
37+
Objects.requireNonNull(config);
38+
this.includeEmptyObjects = config.isIncludeEmptyObjects();
39+
}
40+
41+
private static class NewlineWriter implements Writer {
42+
private final OutputStream outputStream;
43+
private final OutputCodecContext codecContext;
44+
private final boolean includeEmptyObjects;
45+
46+
private NewlineWriter(final OutputStream outputStream, final OutputCodecContext codecContext, final boolean includeEmptyObjects) {
47+
this.outputStream = outputStream;
48+
this.codecContext = codecContext;
49+
this.includeEmptyObjects = includeEmptyObjects;
50+
}
51+
52+
@Override
53+
public void writeEvent(final Event event) throws IOException {
54+
doWriteEvent(outputStream, event, codecContext, includeEmptyObjects);
55+
}
56+
57+
@Override
58+
public void complete() throws IOException {
59+
outputStream.close();
60+
}
61+
}
62+
63+
@Override
64+
public Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException {
65+
Objects.requireNonNull(outputStream);
66+
Objects.requireNonNull(codecContext);
67+
68+
return new NewlineWriter(outputStream, codecContext, includeEmptyObjects);
69+
}
70+
71+
@Override
72+
public void start(final OutputStream outputStream, Event event, final OutputCodecContext codecContext) throws IOException {
73+
Objects.requireNonNull(outputStream);
74+
Objects.requireNonNull(codecContext);
75+
this.deprecatedSupportCodecContext = codecContext;
76+
}
77+
78+
@Override
79+
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
80+
doWriteEvent(outputStream, event, deprecatedSupportCodecContext, includeEmptyObjects);
81+
}
82+
83+
@Override
84+
public void complete(final OutputStream outputStream) throws IOException {
85+
outputStream.close();
86+
}
87+
88+
@Override
89+
public String getExtension() {
90+
return NEWLINE;
91+
}
92+
93+
private static void doWriteEvent(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext, final boolean includeEmptyObjects) throws IOException {
94+
Objects.requireNonNull(event);
95+
96+
// Extract the message field and write it as plain text
97+
String message = null;
98+
if (event.containsKey(MESSAGE_FIELD)) {
99+
Object messageObj = event.get(MESSAGE_FIELD, Object.class);
100+
if (messageObj != null) {
101+
message = messageObj.toString();
102+
}
103+
}
104+
105+
// Default: do not write anything if message is null or empty
106+
if (message == null || message.isEmpty()) {
107+
if (!includeEmptyObjects) {
108+
return;
109+
}
110+
message = "";
111+
}
112+
113+
// Write the message as plain text followed by a newline
114+
outputStream.write(message.getBytes());
115+
outputStream.write(System.lineSeparator().getBytes());
116+
}
117+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.codec.newline;
12+
13+
import com.fasterxml.jackson.annotation.JsonProperty;
14+
15+
/**
16+
* Configuration class for the newline delimited output codec.
17+
*/
18+
public class NewlineDelimitedOutputConfig {
19+
20+
/**
21+
* When false (default), events with no message or an empty message are not written.
22+
* When true, empty messages are written as blank lines, similar to ndjson input codec.
23+
*/
24+
@JsonProperty("include_empty_objects")
25+
private boolean includeEmptyObjects = false;
26+
27+
public boolean isIncludeEmptyObjects() {
28+
return includeEmptyObjects;
29+
}
30+
31+
public void setIncludeEmptyObjects(final boolean includeEmptyObjects) {
32+
this.includeEmptyObjects = includeEmptyObjects;
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.codec.newline;
12+
13+
import org.junit.jupiter.api.BeforeEach;
14+
import org.junit.jupiter.api.Test;
15+
import org.opensearch.dataprepper.event.TestEventFactory;
16+
import org.opensearch.dataprepper.model.codec.OutputCodec;
17+
import org.opensearch.dataprepper.model.event.Event;
18+
import org.opensearch.dataprepper.model.event.EventBuilder;
19+
import org.opensearch.dataprepper.model.event.EventFactory;
20+
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
21+
22+
import java.io.ByteArrayOutputStream;
23+
import java.io.IOException;
24+
import java.nio.charset.StandardCharsets;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.UUID;
28+
29+
import static org.hamcrest.CoreMatchers.equalTo;
30+
import static org.hamcrest.MatcherAssert.assertThat;
31+
import static org.junit.jupiter.api.Assertions.assertThrows;
32+
33+
class NewlineDelimitedOutputCodecTest {
34+
35+
private NewlineDelimitedOutputCodec codec;
36+
private NewlineDelimitedOutputConfig config;
37+
private EventFactory eventFactory;
38+
39+
@BeforeEach
40+
void setUp() {
41+
config = new NewlineDelimitedOutputConfig();
42+
codec = new NewlineDelimitedOutputCodec(config);
43+
eventFactory = TestEventFactory.getTestEventFactory();
44+
}
45+
46+
@Test
47+
void constructor_throws_if_config_is_null() {
48+
assertThrows(NullPointerException.class, () -> new NewlineDelimitedOutputCodec(null));
49+
}
50+
51+
@Test
52+
void writeEvent_writes_message_field_as_plain_text() throws IOException {
53+
final String message = UUID.randomUUID().toString();
54+
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
55+
final OutputCodecContext codecContext = new OutputCodecContext();
56+
57+
final Map<String, Object> eventData = new HashMap<>();
58+
eventData.put("message", message);
59+
final Event event = eventFactory.eventBuilder(EventBuilder.class).withData(eventData).build();
60+
61+
codec.start(outputStream, event, codecContext);
62+
codec.writeEvent(event, outputStream);
63+
codec.complete(outputStream);
64+
65+
final String output = outputStream.toString(StandardCharsets.UTF_8);
66+
assertThat(output, equalTo(message + System.lineSeparator()));
67+
}
68+
69+
@Test
70+
void writeEvent_writes_nothing_when_message_is_missing_and_include_empty_objects_false() throws IOException {
71+
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
72+
final OutputCodecContext codecContext = new OutputCodecContext();
73+
74+
final Map<String, Object> eventData = new HashMap<>();
75+
eventData.put("other_field", "some value");
76+
final Event event = eventFactory.eventBuilder(EventBuilder.class).withData(eventData).build();
77+
78+
codec.start(outputStream, event, codecContext);
79+
codec.writeEvent(event, outputStream);
80+
codec.complete(outputStream);
81+
82+
final String output = outputStream.toString(StandardCharsets.UTF_8);
83+
assertThat(output, equalTo(""));
84+
}
85+
86+
@Test
87+
void writeEvent_writes_nothing_when_message_is_null_and_include_empty_objects_false() throws IOException {
88+
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
89+
final OutputCodecContext codecContext = new OutputCodecContext();
90+
91+
final Map<String, Object> eventData = new HashMap<>();
92+
eventData.put("message", null);
93+
final Event event = eventFactory.eventBuilder(EventBuilder.class).withData(eventData).build();
94+
95+
codec.start(outputStream, event, codecContext);
96+
codec.writeEvent(event, outputStream);
97+
codec.complete(outputStream);
98+
99+
final String output = outputStream.toString(StandardCharsets.UTF_8);
100+
assertThat(output, equalTo(""));
101+
}
102+
103+
@Test
104+
void writeEvent_writes_empty_line_when_message_is_missing_and_include_empty_objects_true() throws IOException {
105+
config.setIncludeEmptyObjects(true);
106+
codec = new NewlineDelimitedOutputCodec(config);
107+
108+
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
109+
final OutputCodecContext codecContext = new OutputCodecContext();
110+
111+
final Map<String, Object> eventData = new HashMap<>();
112+
eventData.put("other_field", "some value");
113+
final Event event = eventFactory.eventBuilder(EventBuilder.class).withData(eventData).build();
114+
115+
codec.start(outputStream, event, codecContext);
116+
codec.writeEvent(event, outputStream);
117+
codec.complete(outputStream);
118+
119+
final String output = outputStream.toString(StandardCharsets.UTF_8);
120+
assertThat(output, equalTo("" + System.lineSeparator()));
121+
}
122+
123+
@Test
124+
void writeEvent_writes_empty_line_when_message_is_null_and_include_empty_objects_true() throws IOException {
125+
config.setIncludeEmptyObjects(true);
126+
codec = new NewlineDelimitedOutputCodec(config);
127+
128+
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
129+
final OutputCodecContext codecContext = new OutputCodecContext();
130+
131+
final Map<String, Object> eventData = new HashMap<>();
132+
eventData.put("message", null);
133+
final Event event = eventFactory.eventBuilder(EventBuilder.class).withData(eventData).build();
134+
135+
codec.start(outputStream, event, codecContext);
136+
codec.writeEvent(event, outputStream);
137+
codec.complete(outputStream);
138+
139+
final String output = outputStream.toString(StandardCharsets.UTF_8);
140+
assertThat(output, equalTo("" + System.lineSeparator()));
141+
}
142+
143+
@Test
144+
void writeEvent_writes_multiple_events_on_separate_lines() throws IOException {
145+
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
146+
final OutputCodecContext codecContext = new OutputCodecContext();
147+
148+
final Map<String, Object> eventData1 = new HashMap<>();
149+
eventData1.put("message", "First message");
150+
final Event event1 = eventFactory.eventBuilder(EventBuilder.class).withData(eventData1).build();
151+
152+
final Map<String, Object> eventData2 = new HashMap<>();
153+
eventData2.put("message", "Second message");
154+
final Event event2 = eventFactory.eventBuilder(EventBuilder.class).withData(eventData2).build();
155+
156+
codec.start(outputStream, event1, codecContext);
157+
codec.writeEvent(event1, outputStream);
158+
codec.writeEvent(event2, outputStream);
159+
codec.complete(outputStream);
160+
161+
final String output = outputStream.toString(StandardCharsets.UTF_8);
162+
assertThat(output, equalTo("First message" + System.lineSeparator() + "Second message" + System.lineSeparator()));
163+
}
164+
165+
@Test
166+
void getExtension_returns_txt() {
167+
assertThat(codec.getExtension(), equalTo("txt"));
168+
}
169+
170+
@Test
171+
void createWriter_writes_message_field_as_plain_text() throws IOException {
172+
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
173+
final OutputCodecContext codecContext = new OutputCodecContext();
174+
175+
final Map<String, Object> eventData = new HashMap<>();
176+
eventData.put("message", "Test message content");
177+
final Event event = eventFactory.eventBuilder(EventBuilder.class).withData(eventData).build();
178+
179+
final OutputCodec.Writer writer = codec.createWriter(outputStream, event, codecContext);
180+
writer.writeEvent(event);
181+
writer.complete();
182+
183+
final String output = outputStream.toString(StandardCharsets.UTF_8);
184+
assertThat(output, equalTo("Test message content" + System.lineSeparator()));
185+
}
186+
}

0 commit comments

Comments
 (0)