Skip to content

Commit 66e6e3e

Browse files
committed
Merge branch 'main' into raw-string
2 parents 3acd31b + b422250 commit 66e6e3e

107 files changed

Lines changed: 6590 additions & 819 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record
7272
}
7373

7474
if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) {
75-
if (keyName != null && !nodeName.equals(keyName)) {
75+
if (keyName != null && !keyName.equals(nodeName)) {
7676
continue;
7777
}
7878
parseRecordsArray(jsonParser, timeReceived, eventConsumer, includeKeysMap, includeMetadataKeysMap);

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,61 @@ public interface OutputCodec {
2222

2323
static final ObjectMapper objectMapper = new ObjectMapper();
2424

25+
/**
26+
* A writer specific to a single buffer.
27+
*
28+
* @since 2.12
29+
*/
30+
interface Writer {
31+
/**
32+
* Writes a single event to the {@link OutputStream}.
33+
*
34+
* @param event A Data Prepper {@link Event}
35+
* @throws IOException An IO exception writing to the stream
36+
*
37+
* @since 2.12
38+
*/
39+
void writeEvent(Event event) throws IOException;
40+
41+
/**
42+
* Completes a writer.
43+
*
44+
* @throws IOException An IO exception completing the stream
45+
*
46+
* @since 2.12
47+
*/
48+
void complete() throws IOException;
49+
}
50+
51+
/**
52+
* Creates a new {@link Writer} for a given {@link OutputStream}.
53+
* Typically, you create one per buffer.
54+
*
55+
* @param outputStream The {@link OutputStream} to write to
56+
* @param sampleEvent A sample Data Prepper {@link Event}.
57+
* It is not written to the stream, but may be used for metadata.
58+
* @param codecContext The {@link OutputCodecContext}
59+
* @return A {@link Writer} to use for this buffer.
60+
* @throws IOException An IO exception occurs initializing the writer or stream
61+
*
62+
* @since 2.12
63+
*/
64+
default Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException {
65+
final OutputCodec codec = this;
66+
codec.start(outputStream, sampleEvent, codecContext);
67+
return new Writer() {
68+
@Override
69+
public void writeEvent(final Event event) throws IOException {
70+
codec.writeEvent(event, outputStream);
71+
}
72+
73+
@Override
74+
public void complete() throws IOException {
75+
codec.complete(outputStream);
76+
}
77+
};
78+
}
79+
2580
/**
2681
* this method get called from {@link Sink} to do initial wrapping in {@link OutputStream}
2782
* Implementors should do initial wrapping according to the implementation
@@ -30,7 +85,9 @@ public interface OutputCodec {
3085
* @param event Event to auto-generate schema
3186
* @param context Extra Context used in Codec.
3287
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
88+
* @deprecated Use {@link OutputCodec#createWriter(OutputStream, Event, OutputCodecContext)} instead.
3389
*/
90+
@Deprecated
3491
void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException;
3592

3693
/**
@@ -40,7 +97,9 @@ public interface OutputCodec {
4097
* @param event event Record event
4198
* @param outputStream outputStream param to hold the event data
4299
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
100+
* @deprecated @deprecated Use {@link OutputCodec.Writer#writeEvent(Event)} instead.
43101
*/
102+
@Deprecated
44103
void writeEvent(Event event, OutputStream outputStream) throws IOException;
45104

46105
/**
@@ -49,7 +108,9 @@ public interface OutputCodec {
49108
*
50109
* @param outputStream outputStream param for wrapping
51110
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
111+
* @deprecated @deprecated Use {@link Writer#complete()} instead.
52112
*/
113+
@Deprecated
53114
void complete(OutputStream outputStream) throws IOException;
54115

55116
/**

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/DlqObject.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.time.ZoneId;
1616
import java.time.format.DateTimeFormatter;
1717
import java.util.Objects;
18+
import java.util.List;
19+
import java.util.ArrayList;
1820

1921
import static com.google.common.base.Preconditions.checkArgument;
2022
import static com.google.common.base.Preconditions.checkNotNull;
@@ -43,8 +45,11 @@ public class DlqObject {
4345
@JsonIgnore
4446
private final EventHandle eventHandle;
4547

48+
@JsonIgnore
49+
private final List<EventHandle> eventHandles;
50+
4651
private DlqObject(final String pluginId, final String pluginName, final String pipelineName,
47-
final String timestamp, final Object failedData, final EventHandle eventHandle) {
52+
final String timestamp, final Object failedData, final List<EventHandle> eventHandles) {
4853

4954
checkNotNull(pluginId, "pluginId cannot be null");
5055
checkArgument(!pluginId.isEmpty(), "pluginId cannot be an empty string");
@@ -58,7 +63,8 @@ private DlqObject(final String pluginId, final String pluginName, final String p
5863
this.pluginName = pluginName;
5964
this.pipelineName = pipelineName;
6065
this.failedData = failedData;
61-
this.eventHandle = eventHandle;
66+
this.eventHandles = eventHandles;
67+
this.eventHandle = null;
6268

6369
this.timestamp = StringUtils.isEmpty(timestamp) ? FORMATTER.format(Instant.now()) : timestamp;
6470
}
@@ -83,12 +89,18 @@ public String getTimestamp() {
8389
return timestamp;
8490
}
8591

86-
public EventHandle getEventHandle() {
87-
return eventHandle;
92+
public List<EventHandle> getEventHandles() {
93+
return eventHandles;
8894
}
8995

9096
public void releaseEventHandle(boolean result) {
91-
if (eventHandle != null) {
97+
if (eventHandles != null && eventHandles.size() == 1) {
98+
eventHandles.get(0).release(result);
99+
}
100+
}
101+
102+
public void releaseEventHandles(boolean result) {
103+
for (final EventHandle eventHandle: eventHandles) {
92104
eventHandle.release(result);
93105
}
94106
}
@@ -102,7 +114,7 @@ public boolean equals(final Object o) {
102114
&& Objects.equals(pluginId, that.pluginId)
103115
&& Objects.equals(pluginName, that.pluginName)
104116
&& Objects.equals(pipelineName, that.pipelineName)
105-
&& Objects.equals(eventHandle, that.eventHandle)
117+
&& Objects.equals(eventHandles, that.eventHandles)
106118
&& Objects.equals(timestamp, that.getTimestamp());
107119
}
108120

@@ -122,9 +134,9 @@ public String toString() {
122134
'}';
123135
}
124136

125-
public static DlqObject createDlqObject(PluginSetting pluginSetting, EventHandle eventHandle, Object failedData) {
137+
public static DlqObject createDlqObject(PluginSetting pluginSetting, List<EventHandle> eventHandles, Object failedData) {
126138
return DlqObject.builder()
127-
.withEventHandle(eventHandle)
139+
.withEventHandles(eventHandles)
128140
.withFailedData(failedData)
129141
.withPluginName(pluginSetting.getName())
130142
.withPipelineName(pluginSetting.getPipelineName())
@@ -142,7 +154,7 @@ public static class Builder {
142154
private String pluginName;
143155
private String pipelineName;
144156
private Object failedData;
145-
private EventHandle eventHandle;
157+
private List<EventHandle> eventHandles;
146158

147159
private String timestamp;
148160

@@ -171,8 +183,14 @@ public Builder withTimestamp(final String timestamp) {
171183
return this;
172184
}
173185

186+
public Builder withEventHandles(final List<EventHandle> eventHandles) {
187+
this.eventHandles = eventHandles;
188+
return this;
189+
}
190+
174191
public Builder withEventHandle(final EventHandle eventHandle) {
175-
this.eventHandle = eventHandle;
192+
this.eventHandles = new ArrayList<>();
193+
this.eventHandles.add(eventHandle);
176194
return this;
177195
}
178196

@@ -182,7 +200,7 @@ public Builder withTimestamp(final Instant instant) {
182200
}
183201

184202
public DlqObject build() {
185-
return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData, this.eventHandle);
203+
return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData, this.eventHandles);
186204
}
187205

188206
}

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/JsonDecoderTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,20 @@ void test_basicJsonDecoder_withInputConfig() throws IOException {
171171
assertThat(receivedTime, equalTo(now));
172172
}
173173

174+
@Test
175+
void test_JsonDecoder_withKeyName_when_parsing_json_array_should_skip() throws IOException {
176+
final Instant now = Instant.now();
177+
String inputString = "[]";
178+
List<Record<Event>> records = new ArrayList<>();
179+
jsonDecoder = new JsonDecoder(key_name, null, null, maxEventLength);
180+
jsonDecoder.parse(new ByteArrayInputStream(inputString.getBytes()), now, (record) -> {
181+
records.add(record);
182+
receivedTime = record.getData().getEventHandle().getInternalOriginationTime();
183+
});
184+
185+
assertTrue(records.isEmpty());
186+
}
187+
174188
@Test
175189
void test_basicJsonDecoder_withInputConfig_withoutEvents_empty_metadata_keys() throws IOException {
176190
final Instant now = Instant.now();

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/codec/OutputCodecTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
16
package org.opensearch.dataprepper.model.codec;
27

38
import com.fasterxml.jackson.core.JsonProcessingException;
9+
import org.junit.jupiter.api.Nested;
410
import org.junit.jupiter.api.Test;
11+
import org.junit.jupiter.api.extension.ExtendWith;
12+
import org.mockito.Mock;
513
import org.mockito.invocation.InvocationOnMock;
14+
import org.mockito.junit.jupiter.MockitoExtension;
615
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
716
import org.opensearch.dataprepper.model.event.Event;
817
import org.opensearch.dataprepper.model.event.EventMetadata;
@@ -20,11 +29,16 @@
2029
import java.util.UUID;
2130

2231
import static org.hamcrest.CoreMatchers.equalTo;
32+
import static org.hamcrest.CoreMatchers.not;
33+
import static org.hamcrest.CoreMatchers.sameInstance;
2334
import static org.hamcrest.MatcherAssert.assertThat;
2435
import static org.junit.jupiter.api.Assertions.assertNotEquals;
36+
import static org.mockito.Mockito.doCallRealMethod;
2537
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.verify;
2639
import static org.mockito.Mockito.verifyNoInteractions;
2740

41+
@ExtendWith(MockitoExtension.class)
2842
public class OutputCodecTest {
2943
@Test
3044
void isCompressionInternal_returns_false() {
@@ -85,4 +99,61 @@ private static Map<String, Object> generateJson() {
8599
UUID.randomUUID().toString(), UUID.randomUUID().toString()));
86100
return jsonObject;
87101
}
102+
103+
@Nested
104+
class DefaultWriter {
105+
@Mock
106+
private OutputStream outputStream;
107+
@Mock
108+
private Event event;
109+
@Mock
110+
private OutputCodecContext outputCodecContext;
111+
112+
@Test
113+
void createWriter_returns_new_instance() throws IOException {
114+
final OutputCodec objectUnderTest = mock(OutputCodec.class);
115+
116+
doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);
117+
118+
assertThat(objectUnderTest.createWriter(outputStream, event, outputCodecContext),
119+
not(sameInstance(objectUnderTest.createWriter(outputStream, event, outputCodecContext))));
120+
}
121+
122+
@Test
123+
void createWriter_calls_start() throws IOException {
124+
final OutputCodec objectUnderTest = mock(OutputCodec.class);
125+
126+
doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);
127+
128+
objectUnderTest.createWriter(outputStream, event, outputCodecContext);
129+
130+
verify(objectUnderTest).start(outputStream, event, outputCodecContext);
131+
}
132+
133+
@Test
134+
void writer_writeEvent_calls_writeEvent_on_OutputCodec() throws IOException {
135+
final OutputCodec objectUnderTest = mock(OutputCodec.class);
136+
137+
doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);
138+
139+
OutputCodec.Writer writer = objectUnderTest.createWriter(outputStream, event, outputCodecContext);
140+
141+
writer.writeEvent(event);
142+
143+
verify(objectUnderTest).writeEvent(event, outputStream);
144+
}
145+
146+
@Test
147+
void writer_complete_calls_complete_on_OutputCodec() throws IOException {
148+
final OutputCodec objectUnderTest = mock(OutputCodec.class);
149+
150+
doCallRealMethod().when(objectUnderTest).createWriter(outputStream, event, outputCodecContext);
151+
152+
OutputCodec.Writer writer = objectUnderTest.createWriter(outputStream, event, outputCodecContext);
153+
154+
writer.complete();
155+
156+
verify(objectUnderTest).complete(outputStream);
157+
}
158+
}
88159
}

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/failures/DlqObjectTest.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.List;
2425
import static java.util.UUID.randomUUID;
2526
import static org.hamcrest.CoreMatchers.allOf;
2627
import static org.hamcrest.CoreMatchers.containsString;
@@ -70,6 +71,22 @@ public void test_build_with_timestamp() {
7071
assertThat(testObject, is(notNullValue()));
7172
}
7273

74+
@Test
75+
public void test_build_with_timestamp_with_event_handles() {
76+
77+
final DlqObject testObject = DlqObject.builder()
78+
.withPluginId(pluginId)
79+
.withPluginName(pluginName)
80+
.withPipelineName(pipelineName)
81+
.withFailedData(failedData)
82+
.withEventHandles(List.of(eventHandle))
83+
.withTimestamp(randomUUID().toString())
84+
.build();
85+
86+
assertThat(testObject, is(notNullValue()));
87+
}
88+
89+
7390
@Test
7491
public void test_build_without_timestamp() {
7592

@@ -133,9 +150,9 @@ public void test_createDlqObject() {
133150
when(pluginSetting.getPipelineName()).thenReturn(testPipelineName);
134151
eventHandle = mock(EventHandle.class);
135152
Map<String, Object> data = new HashMap<>();
136-
DlqObject dlqObject = DlqObject.createDlqObject(pluginSetting, eventHandle, data);
153+
DlqObject dlqObject = DlqObject.createDlqObject(pluginSetting, List.of(eventHandle), data);
137154
assertThat(dlqObject, is(notNullValue()));
138-
assertThat(dlqObject.getEventHandle(), is(eventHandle));
155+
assertThat(dlqObject.getEventHandles(), is(List.of(eventHandle)));
139156
assertThat(dlqObject.getFailedData(), is(data));
140157
assertThat(dlqObject.getPluginName(), is(testName));
141158
assertThat(dlqObject.getPipelineName(), is(testPipelineName));
@@ -191,13 +208,23 @@ public void test_get_failedData() {
191208
@Test
192209
public void test_get_release_eventHandle() {
193210
doAnswer(a -> { return null; }).when(eventHandle).release(any(Boolean.class));
194-
final Object actualEventHandle = testObject.getEventHandle();
195-
assertThat(actualEventHandle, is(notNullValue()));
196-
assertThat(actualEventHandle, is(eventHandle));
211+
final List<EventHandle> actualEventHandles = testObject.getEventHandles();
212+
assertThat(actualEventHandles, is(notNullValue()));
213+
assertThat(actualEventHandles, is(List.of(eventHandle)));
197214
testObject.releaseEventHandle(true);
198215
verify(eventHandle).release(any(Boolean.class));
199216
}
200217

218+
@Test
219+
public void test_get_release_eventHandles() {
220+
doAnswer(a -> { return null; }).when(eventHandle).release(any(Boolean.class));
221+
final List<EventHandle> actualEventHandles = testObject.getEventHandles();
222+
assertThat(actualEventHandles, is(notNullValue()));
223+
assertThat(actualEventHandles, is(List.of(eventHandle)));
224+
testObject.releaseEventHandles(true);
225+
verify(eventHandle).release(any(Boolean.class));
226+
}
227+
201228
@Test
202229
public void test_get_timestamp() {
203230
final String string = testObject.getTimestamp();

0 commit comments

Comments
 (0)