Skip to content

Commit dd55aaf

Browse files
Merge branch 'main' into fix-issue-5530
Signed-off-by: Saketh Pallempati <pallempati.saketh@fmr.com>
2 parents 8a8b864 + 14f53cd commit dd55aaf

551 files changed

Lines changed: 29570 additions & 2780 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

RELEASING.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,12 @@ You can also deny a release by using _deny_ or _denied_ in the comment.
170170

171171
**NOTE** The smoke tests currently report a build failure, even when they succeed. Thus, you need to manually verify the output.
172172

173+
### OpenSearch CI server build
174+
175+
After two maintainers approve the GitHub issue, the [OpenSearch CI server](https://build.ci.opensearch.org/) will start a build.
176+
This is a Jenkins server that promotes our releases.
177+
You can check the promotion status by checking the [release-data-prepper](https://build.ci.opensearch.org/job/release-data-prepper/) job.
178+
173179
### Further details
174180

175181
For more details on the release build, or to setup your own GitHub repository, see [release/README.md](release/README.md).

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.model.buffer;
77

88
import org.opensearch.dataprepper.model.CheckpointState;
9+
import org.opensearch.dataprepper.model.plugin.PluginComponentType;
910
import org.opensearch.dataprepper.model.record.Record;
1011

1112
import java.time.Duration;
@@ -18,6 +19,7 @@
1819
* Buffer queues the records between TI components and acts as a layer between source and processor/sink. Buffer can
1920
* be in-memory, disk based or other a standalone implementation.
2021
*/
22+
@PluginComponentType("buffer")
2123
public interface Buffer<T extends Record<?>> {
2224
/**
2325
* writes the record to the buffer

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/JsonDecoder.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.fasterxml.jackson.core.JsonFactory;
99
import com.fasterxml.jackson.core.JsonParser;
1010
import com.fasterxml.jackson.core.JsonToken;
11+
import com.fasterxml.jackson.core.StreamReadConstraints;
1112
import com.fasterxml.jackson.databind.ObjectMapper;
1213

1314
import org.opensearch.dataprepper.model.event.Event;
@@ -30,10 +31,15 @@ public class JsonDecoder implements ByteDecoder {
3031
private Collection<String> includeKeys;
3132
private Collection<String> includeKeysMetadata;
3233

33-
public JsonDecoder(String keyName, Collection<String> includeKeys, Collection<String> includeKeysMetadata) {
34+
public JsonDecoder(String keyName, Collection<String> includeKeys, Collection<String> includeKeysMetadata, Integer maxEventLength) {
3435
this.keyName = keyName;
3536
this.includeKeys = includeKeys;
3637
this.includeKeysMetadata = includeKeysMetadata;
38+
if (maxEventLength != null) {
39+
jsonFactory.setStreamReadConstraints(StreamReadConstraints.builder()
40+
.maxStringLength(maxEventLength)
41+
.build());
42+
}
3743
}
3844

3945
public JsonDecoder() {
@@ -66,7 +72,7 @@ public void parse(InputStream inputStream, Instant timeReceived, Consumer<Record
6672
}
6773

6874
if (jsonParser.getCurrentToken() == JsonToken.START_ARRAY) {
69-
if (keyName != null && !nodeName.equals(keyName)) {
75+
if (keyName != null && !keyName.equals(nodeName)) {
7076
continue;
7177
}
7278
parseRecordsArray(jsonParser, timeReceived, eventConsumer, includeKeysMap, includeMetadataKeysMap);

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/codec/OutputCodec.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,79 @@
1515

1616
import java.io.IOException;
1717
import java.io.OutputStream;
18+
import java.io.ByteArrayOutputStream;
1819
import java.util.Map;
1920

2021
public interface OutputCodec {
2122

2223
static final ObjectMapper objectMapper = new ObjectMapper();
2324

25+
/**
26+
* A writer specific to a single buffer.
27+
*
28+
* @since 2.12
29+
*/
30+
interface Writer {
31+
/**
32+
* Writes a single event to the {@link OutputStream}.
33+
*
34+
* @param event A Data Prepper {@link Event}
35+
* @throws IOException An IO exception writing to the stream
36+
*
37+
* @since 2.12
38+
*/
39+
void writeEvent(Event event) throws IOException;
40+
41+
/**
42+
* Completes a writer.
43+
*
44+
* @throws IOException An IO exception completing the stream
45+
*
46+
* @since 2.12
47+
*/
48+
void complete() throws IOException;
49+
}
50+
51+
/**
52+
* Creates a new {@link Writer} for a given {@link OutputStream}.
53+
* Typically, you create one per buffer.
54+
*
55+
* @param outputStream The {@link OutputStream} to write to
56+
* @param sampleEvent A sample Data Prepper {@link Event}.
57+
* It is not written to the stream, but may be used for metadata.
58+
* @param codecContext The {@link OutputCodecContext}
59+
* @return A {@link Writer} to use for this buffer.
60+
* @throws IOException An IO exception occurs initializing the writer or stream
61+
*
62+
* @since 2.12
63+
*/
64+
default Writer createWriter(final OutputStream outputStream, final Event sampleEvent, final OutputCodecContext codecContext) throws IOException {
65+
final OutputCodec codec = this;
66+
codec.start(outputStream, sampleEvent, codecContext);
67+
return new Writer() {
68+
@Override
69+
public void writeEvent(final Event event) throws IOException {
70+
codec.writeEvent(event, outputStream);
71+
}
72+
73+
@Override
74+
public void complete() throws IOException {
75+
codec.complete(outputStream);
76+
}
77+
};
78+
}
79+
2480
/**
2581
* this method get called from {@link Sink} to do initial wrapping in {@link OutputStream}
2682
* Implementors should do initial wrapping according to the implementation
2783
*
2884
* @param outputStream outputStream param for wrapping
2985
* @param event Event to auto-generate schema
3086
* @param context Extra Context used in Codec.
31-
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
87+
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
88+
* @deprecated Use {@link OutputCodec#createWriter(OutputStream, Event, OutputCodecContext)} instead.
3289
*/
90+
@Deprecated
3391
void start(OutputStream outputStream, Event event, OutputCodecContext context) throws IOException;
3492

3593
/**
@@ -39,7 +97,9 @@ public interface OutputCodec {
3997
* @param event event Record event
4098
* @param outputStream outputStream param to hold the event data
4199
* @throws IOException throws IOException when not able to write data to {@link OutputStream}
100+
* @deprecated @deprecated Use {@link OutputCodec.Writer#writeEvent(Event)} instead.
42101
*/
102+
@Deprecated
43103
void writeEvent(Event event, OutputStream outputStream) throws IOException;
44104

45105
/**
@@ -48,9 +108,26 @@ public interface OutputCodec {
48108
*
49109
* @param outputStream outputStream param for wrapping
50110
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
111+
* @deprecated @deprecated Use {@link Writer#complete()} instead.
51112
*/
113+
@Deprecated
52114
void complete(OutputStream outputStream) throws IOException;
53115

116+
/**
117+
* this method get called from {@link Sink} to estimate size of event in {@link OutputStream}
118+
*
119+
* @param event event Record event
120+
* @return long size of the serialized event
121+
* @throws IOException throws IOException when invalid input is received or not able to create wrapping
122+
*/
123+
default long getEstimatedSize(Event event, OutputCodecContext codecContext) throws IOException {
124+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
125+
start(outputStream, event, codecContext);
126+
writeEvent(event, outputStream);
127+
complete(outputStream);
128+
return outputStream.toByteArray().length;
129+
}
130+
54131
/**
55132
* used to get extension of file
56133
*

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/DataPrepperVersion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import java.util.regex.Pattern;
77

88
public class DataPrepperVersion {
9-
private static final String CURRENT_VERSION = "2.11";
9+
private static final String CURRENT_VERSION = "2.12";
1010

1111
private static final String FULL_FORMAT = "%d.%d";
1212
private static final String SHORTHAND_FORMAT = "%d";
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.encryption;
7+
8+
public interface EncryptionEngine {
9+
/**
10+
* Encrypts raw data into {@link EncryptionEnvelope}.
11+
*
12+
* @param data the raw data in bytes
13+
* @return returns the encryption envelope
14+
*/
15+
EncryptionEnvelope encrypt(byte[] data);
16+
17+
/**
18+
* Decrypts the encryption envelope into raw data.
19+
*
20+
* @param encryptionEnvelope the encryption envelope
21+
* @return returns the raw data in bytes
22+
*/
23+
byte[] decrypt(EncryptionEnvelope encryptionEnvelope);
24+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.encryption;
7+
8+
public interface EncryptionEnvelope {
9+
/**
10+
* The encrypted data.
11+
*/
12+
byte[] getEncryptedData();
13+
14+
/**
15+
* The encrypted data key.
16+
*/
17+
String getEncryptedDataKey();
18+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.model.encryption;
7+
8+
@FunctionalInterface
9+
public interface KeyProvider {
10+
byte[] decryptKey(byte[] encryptedKey);
11+
}

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class JacksonEvent implements Event {
6161

6262
private static final Logger LOG = LoggerFactory.getLogger(JacksonEvent.class);
6363

64+
private static final int FILL_OUT_OF_BOUNDS_ELEMENTS_LIMIT = 0;
65+
6466
private static final String SEPARATOR = "/";
6567

6668
private static final ObjectMapper mapper = JsonMapper.builder()
@@ -195,8 +197,31 @@ private JsonNode getOrCreateNode(final JsonNode node, final String key) {
195197
JsonNode childNode = node.get(key);
196198
if (childNode == null) {
197199
childNode = mapper.createObjectNode();
198-
((ObjectNode) node).set(key, childNode);
200+
if (node.isArray()) {
201+
int index = Integer.parseInt(key);
202+
ArrayNode arrayNode = (ArrayNode) node;
203+
204+
int distanceFromArrayEnd = index - arrayNode.size();
205+
if (distanceFromArrayEnd >= FILL_OUT_OF_BOUNDS_ELEMENTS_LIMIT + 1) {
206+
throw new IndexOutOfBoundsException(
207+
String.format("Cannot expand array past the limit of size %s to reach index %s", arrayNode.size(), index));
208+
}
209+
while (arrayNode.size() <= index) {
210+
arrayNode.addNull();
211+
}
212+
213+
JsonNode existing = arrayNode.get(index);
214+
if (existing == null || !existing.isObject()) {
215+
childNode = mapper.createObjectNode();
216+
arrayNode.set(index, childNode);
217+
} else {
218+
childNode = existing;
219+
}
220+
} else {
221+
((ObjectNode) node).set(key, childNode);
222+
}
199223
}
224+
200225
return childNode;
201226
}
202227

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/DlqObject.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.time.ZoneId;
1616
import java.time.format.DateTimeFormatter;
1717
import java.util.Objects;
18+
import java.util.List;
19+
import java.util.ArrayList;
1820

1921
import static com.google.common.base.Preconditions.checkArgument;
2022
import static com.google.common.base.Preconditions.checkNotNull;
@@ -43,8 +45,11 @@ public class DlqObject {
4345
@JsonIgnore
4446
private final EventHandle eventHandle;
4547

48+
@JsonIgnore
49+
private final List<EventHandle> eventHandles;
50+
4651
private DlqObject(final String pluginId, final String pluginName, final String pipelineName,
47-
final String timestamp, final Object failedData, final EventHandle eventHandle) {
52+
final String timestamp, final Object failedData, final List<EventHandle> eventHandles) {
4853

4954
checkNotNull(pluginId, "pluginId cannot be null");
5055
checkArgument(!pluginId.isEmpty(), "pluginId cannot be an empty string");
@@ -58,7 +63,8 @@ private DlqObject(final String pluginId, final String pluginName, final String p
5863
this.pluginName = pluginName;
5964
this.pipelineName = pipelineName;
6065
this.failedData = failedData;
61-
this.eventHandle = eventHandle;
66+
this.eventHandles = eventHandles;
67+
this.eventHandle = null;
6268

6369
this.timestamp = StringUtils.isEmpty(timestamp) ? FORMATTER.format(Instant.now()) : timestamp;
6470
}
@@ -83,12 +89,18 @@ public String getTimestamp() {
8389
return timestamp;
8490
}
8591

86-
public EventHandle getEventHandle() {
87-
return eventHandle;
92+
public List<EventHandle> getEventHandles() {
93+
return eventHandles;
8894
}
8995

9096
public void releaseEventHandle(boolean result) {
91-
if (eventHandle != null) {
97+
if (eventHandles != null && eventHandles.size() == 1) {
98+
eventHandles.get(0).release(result);
99+
}
100+
}
101+
102+
public void releaseEventHandles(boolean result) {
103+
for (final EventHandle eventHandle: eventHandles) {
92104
eventHandle.release(result);
93105
}
94106
}
@@ -102,7 +114,7 @@ public boolean equals(final Object o) {
102114
&& Objects.equals(pluginId, that.pluginId)
103115
&& Objects.equals(pluginName, that.pluginName)
104116
&& Objects.equals(pipelineName, that.pipelineName)
105-
&& Objects.equals(eventHandle, that.eventHandle)
117+
&& Objects.equals(eventHandles, that.eventHandles)
106118
&& Objects.equals(timestamp, that.getTimestamp());
107119
}
108120

@@ -122,9 +134,9 @@ public String toString() {
122134
'}';
123135
}
124136

125-
public static DlqObject createDlqObject(PluginSetting pluginSetting, EventHandle eventHandle, Object failedData) {
137+
public static DlqObject createDlqObject(PluginSetting pluginSetting, List<EventHandle> eventHandles, Object failedData) {
126138
return DlqObject.builder()
127-
.withEventHandle(eventHandle)
139+
.withEventHandles(eventHandles)
128140
.withFailedData(failedData)
129141
.withPluginName(pluginSetting.getName())
130142
.withPipelineName(pluginSetting.getPipelineName())
@@ -142,7 +154,7 @@ public static class Builder {
142154
private String pluginName;
143155
private String pipelineName;
144156
private Object failedData;
145-
private EventHandle eventHandle;
157+
private List<EventHandle> eventHandles;
146158

147159
private String timestamp;
148160

@@ -171,8 +183,14 @@ public Builder withTimestamp(final String timestamp) {
171183
return this;
172184
}
173185

186+
public Builder withEventHandles(final List<EventHandle> eventHandles) {
187+
this.eventHandles = eventHandles;
188+
return this;
189+
}
190+
174191
public Builder withEventHandle(final EventHandle eventHandle) {
175-
this.eventHandle = eventHandle;
192+
this.eventHandles = new ArrayList<>();
193+
this.eventHandles.add(eventHandle);
176194
return this;
177195
}
178196

@@ -182,7 +200,7 @@ public Builder withTimestamp(final Instant instant) {
182200
}
183201

184202
public DlqObject build() {
185-
return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData, this.eventHandle);
203+
return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData, this.eventHandles);
186204
}
187205

188206
}

0 commit comments

Comments
 (0)