Skip to content

Commit ab3c85f

Browse files
config to write empty events
Signed-off-by: Subrahmanyam-Gollapalli <subrahmanyam.gollapalli@freshworks.com>
1 parent ae66059 commit ab3c85f

3 files changed

Lines changed: 87 additions & 29 deletions

File tree

data-prepper-plugins/newline-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodec.java

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510
package org.opensearch.dataprepper.plugins.codec.newline;
611

@@ -25,24 +30,28 @@ public class NewlineDelimitedOutputCodec implements OutputCodec {
2530
private static final String MESSAGE_FIELD = "message";
2631
@SuppressWarnings("unused")
2732
private OutputCodecContext deprecatedSupportCodecContext;
33+
private final boolean includeEmptyObjects;
2834

2935
@DataPrepperPluginConstructor
3036
public NewlineDelimitedOutputCodec(final NewlineDelimitedOutputConfig config) {
3137
Objects.requireNonNull(config);
38+
this.includeEmptyObjects = config.isIncludeEmptyObjects();
3239
}
3340

3441
private static class NewlineWriter implements Writer {
3542
private final OutputStream outputStream;
3643
private final OutputCodecContext codecContext;
44+
private final boolean includeEmptyObjects;
3745

38-
private NewlineWriter(final OutputStream outputStream, final OutputCodecContext codecContext) {
46+
private NewlineWriter(final OutputStream outputStream, final OutputCodecContext codecContext, final boolean includeEmptyObjects) {
3947
this.outputStream = outputStream;
4048
this.codecContext = codecContext;
49+
this.includeEmptyObjects = includeEmptyObjects;
4150
}
4251

4352
@Override
4453
public void writeEvent(final Event event) throws IOException {
45-
doWriteEvent(outputStream, event, codecContext);
54+
doWriteEvent(outputStream, event, codecContext, includeEmptyObjects);
4655
}
4756

4857
@Override
@@ -56,7 +65,7 @@ public Writer createWriter(final OutputStream outputStream, final Event sampleEv
5665
Objects.requireNonNull(outputStream);
5766
Objects.requireNonNull(codecContext);
5867

59-
return new NewlineWriter(outputStream, codecContext);
68+
return new NewlineWriter(outputStream, codecContext, includeEmptyObjects);
6069
}
6170

6271
@Override
@@ -68,25 +77,7 @@ public void start(final OutputStream outputStream, Event event, final OutputCode
6877

6978
@Override
7079
public void writeEvent(final Event event, final OutputStream outputStream) throws IOException {
71-
Objects.requireNonNull(event);
72-
73-
// Extract the message field and write it as plain text
74-
String message = null;
75-
if (event.containsKey(MESSAGE_FIELD)) {
76-
Object messageObj = event.get(MESSAGE_FIELD, Object.class);
77-
if (messageObj != null) {
78-
message = messageObj.toString();
79-
}
80-
}
81-
82-
// If message is null or empty, write empty string
83-
if (message == null) {
84-
message = "";
85-
}
86-
87-
// Write the message as plain text followed by a newline
88-
outputStream.write(message.getBytes());
89-
outputStream.write(System.lineSeparator().getBytes());
80+
doWriteEvent(outputStream, event, deprecatedSupportCodecContext, includeEmptyObjects);
9081
}
9182

9283
@Override
@@ -99,7 +90,7 @@ public String getExtension() {
9990
return NEWLINE;
10091
}
10192

102-
private static void doWriteEvent(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext) throws IOException {
93+
private static void doWriteEvent(final OutputStream outputStream, final Event event, final OutputCodecContext codecContext, final boolean includeEmptyObjects) throws IOException {
10394
Objects.requireNonNull(event);
10495

10596
// Extract the message field and write it as plain text
@@ -111,8 +102,11 @@ private static void doWriteEvent(final OutputStream outputStream, final Event ev
111102
}
112103
}
113104

114-
// If message is null or empty, write empty string
115-
if (message == null) {
105+
// Default: do not write anything if message is null or empty
106+
if (message == null || message.isEmpty()) {
107+
if (!includeEmptyObjects) {
108+
return;
109+
}
116110
message = "";
117111
}
118112

Original file line numberDiff line numberDiff line change
@@ -1,12 +1,34 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

611
package org.opensearch.dataprepper.plugins.codec.newline;
712

13+
import com.fasterxml.jackson.annotation.JsonProperty;
14+
815
/**
916
* Configuration class for the newline delimited output codec.
1017
*/
1118
public class NewlineDelimitedOutputConfig {
19+
20+
/**
21+
* When false (default), events with no message or an empty message are not written.
22+
* When true, empty messages are written as blank lines, similar to ndjson input codec.
23+
*/
24+
@JsonProperty("include_empty_objects")
25+
private boolean includeEmptyObjects = false;
26+
27+
public boolean isIncludeEmptyObjects() {
28+
return includeEmptyObjects;
29+
}
30+
31+
public void setIncludeEmptyObjects(final boolean includeEmptyObjects) {
32+
this.includeEmptyObjects = includeEmptyObjects;
33+
}
1234
}

data-prepper-plugins/newline-codecs/src/test/java/org/opensearch/dataprepper/plugins/codec/newline/NewlineDelimitedOutputCodecTest.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.nio.charset.StandardCharsets;
2020
import java.util.HashMap;
2121
import java.util.Map;
22+
import java.util.UUID;
2223

2324
import static org.hamcrest.CoreMatchers.equalTo;
2425
import static org.hamcrest.MatcherAssert.assertThat;
@@ -44,23 +45,61 @@ void constructor_throws_if_config_is_null() {
4445

4546
@Test
4647
void writeEvent_writes_message_field_as_plain_text() throws IOException {
48+
final String message = UUID.randomUUID().toString();
4749
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
4850
final OutputCodecContext codecContext = new OutputCodecContext();
4951

5052
final Map<String, Object> eventData = new HashMap<>();
51-
eventData.put("message", "r_id=2bb2bd0aeece11f0bea286fb87d48915-ticket_update,tp=00-13a3bb055e6b589dbc0f952e0d75020a-1f2e986790c19742-01");
53+
eventData.put("message", message);
5254
final Event event = eventFactory.eventBuilder(EventBuilder.class).withData(eventData).build();
5355

5456
codec.start(outputStream, event, codecContext);
5557
codec.writeEvent(event, outputStream);
5658
codec.complete(outputStream);
5759

5860
final String output = outputStream.toString(StandardCharsets.UTF_8);
59-
assertThat(output, equalTo("r_id=2bb2bd0aeece11f0bea286fb87d48915-ticket_update,tp=00-13a3bb055e6b589dbc0f952e0d75020a-1f2e986790c19742-01" + System.lineSeparator()));
61+
assertThat(output, equalTo(message + System.lineSeparator()));
6062
}
6163

6264
@Test
63-
void writeEvent_writes_empty_string_when_message_is_missing() throws IOException {
65+
void writeEvent_writes_nothing_when_message_is_missing_and_include_empty_objects_false() throws IOException {
66+
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
67+
final OutputCodecContext codecContext = new OutputCodecContext();
68+
69+
final Map<String, Object> eventData = new HashMap<>();
70+
eventData.put("other_field", "some value");
71+
final Event event = eventFactory.eventBuilder(EventBuilder.class).withData(eventData).build();
72+
73+
codec.start(outputStream, event, codecContext);
74+
codec.writeEvent(event, outputStream);
75+
codec.complete(outputStream);
76+
77+
final String output = outputStream.toString(StandardCharsets.UTF_8);
78+
assertThat(output, equalTo(""));
79+
}
80+
81+
@Test
82+
void writeEvent_writes_nothing_when_message_is_null_and_include_empty_objects_false() throws IOException {
83+
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
84+
final OutputCodecContext codecContext = new OutputCodecContext();
85+
86+
final Map<String, Object> eventData = new HashMap<>();
87+
eventData.put("message", null);
88+
final Event event = eventFactory.eventBuilder(EventBuilder.class).withData(eventData).build();
89+
90+
codec.start(outputStream, event, codecContext);
91+
codec.writeEvent(event, outputStream);
92+
codec.complete(outputStream);
93+
94+
final String output = outputStream.toString(StandardCharsets.UTF_8);
95+
assertThat(output, equalTo(""));
96+
}
97+
98+
@Test
99+
void writeEvent_writes_empty_line_when_message_is_missing_and_include_empty_objects_true() throws IOException {
100+
config.setIncludeEmptyObjects(true);
101+
codec = new NewlineDelimitedOutputCodec(config);
102+
64103
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
65104
final OutputCodecContext codecContext = new OutputCodecContext();
66105

@@ -77,7 +116,10 @@ void writeEvent_writes_empty_string_when_message_is_missing() throws IOException
77116
}
78117

79118
@Test
80-
void writeEvent_writes_empty_string_when_message_is_null() throws IOException {
119+
void writeEvent_writes_empty_line_when_message_is_null_and_include_empty_objects_true() throws IOException {
120+
config.setIncludeEmptyObjects(true);
121+
codec = new NewlineDelimitedOutputCodec(config);
122+
81123
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
82124
final OutputCodecContext codecContext = new OutputCodecContext();
83125

0 commit comments

Comments
 (0)