Skip to content

Commit d40a066

Browse files
authored
* Addressed review comments and updated to use latest codec changes Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Removed failing testcase and added other tests Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Fix checkstyle error Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent 5340c3d commit d40a066

25 files changed

Lines changed: 3436 additions & 21 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/failures/DlqObject.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import java.time.ZoneId;
1616
import java.time.format.DateTimeFormatter;
1717
import java.util.Objects;
18+
import java.util.List;
19+
import java.util.ArrayList;
1820

1921
import static com.google.common.base.Preconditions.checkArgument;
2022
import static com.google.common.base.Preconditions.checkNotNull;
@@ -43,8 +45,11 @@ public class DlqObject {
4345
@JsonIgnore
4446
private final EventHandle eventHandle;
4547

48+
@JsonIgnore
49+
private final List<EventHandle> eventHandles;
50+
4651
private DlqObject(final String pluginId, final String pluginName, final String pipelineName,
47-
final String timestamp, final Object failedData, final EventHandle eventHandle) {
52+
final String timestamp, final Object failedData, final List<EventHandle> eventHandles) {
4853

4954
checkNotNull(pluginId, "pluginId cannot be null");
5055
checkArgument(!pluginId.isEmpty(), "pluginId cannot be an empty string");
@@ -58,7 +63,8 @@ private DlqObject(final String pluginId, final String pluginName, final String p
5863
this.pluginName = pluginName;
5964
this.pipelineName = pipelineName;
6065
this.failedData = failedData;
61-
this.eventHandle = eventHandle;
66+
this.eventHandles = eventHandles;
67+
this.eventHandle = null;
6268

6369
this.timestamp = StringUtils.isEmpty(timestamp) ? FORMATTER.format(Instant.now()) : timestamp;
6470
}
@@ -83,12 +89,18 @@ public String getTimestamp() {
8389
return timestamp;
8490
}
8591

86-
public EventHandle getEventHandle() {
87-
return eventHandle;
92+
public List<EventHandle> getEventHandles() {
93+
return eventHandles;
8894
}
8995

9096
public void releaseEventHandle(boolean result) {
91-
if (eventHandle != null) {
97+
if (eventHandles != null && eventHandles.size() == 1) {
98+
eventHandles.get(0).release(result);
99+
}
100+
}
101+
102+
public void releaseEventHandles(boolean result) {
103+
for (final EventHandle eventHandle: eventHandles) {
92104
eventHandle.release(result);
93105
}
94106
}
@@ -102,7 +114,7 @@ public boolean equals(final Object o) {
102114
&& Objects.equals(pluginId, that.pluginId)
103115
&& Objects.equals(pluginName, that.pluginName)
104116
&& Objects.equals(pipelineName, that.pipelineName)
105-
&& Objects.equals(eventHandle, that.eventHandle)
117+
&& Objects.equals(eventHandles, that.eventHandles)
106118
&& Objects.equals(timestamp, that.getTimestamp());
107119
}
108120

@@ -122,9 +134,9 @@ public String toString() {
122134
'}';
123135
}
124136

125-
public static DlqObject createDlqObject(PluginSetting pluginSetting, EventHandle eventHandle, Object failedData) {
137+
public static DlqObject createDlqObject(PluginSetting pluginSetting, List<EventHandle> eventHandles, Object failedData) {
126138
return DlqObject.builder()
127-
.withEventHandle(eventHandle)
139+
.withEventHandles(eventHandles)
128140
.withFailedData(failedData)
129141
.withPluginName(pluginSetting.getName())
130142
.withPipelineName(pluginSetting.getPipelineName())
@@ -142,7 +154,7 @@ public static class Builder {
142154
private String pluginName;
143155
private String pipelineName;
144156
private Object failedData;
145-
private EventHandle eventHandle;
157+
private List<EventHandle> eventHandles;
146158

147159
private String timestamp;
148160

@@ -171,8 +183,14 @@ public Builder withTimestamp(final String timestamp) {
171183
return this;
172184
}
173185

186+
public Builder withEventHandles(final List<EventHandle> eventHandles) {
187+
this.eventHandles = eventHandles;
188+
return this;
189+
}
190+
174191
public Builder withEventHandle(final EventHandle eventHandle) {
175-
this.eventHandle = eventHandle;
192+
this.eventHandles = new ArrayList<>();
193+
this.eventHandles.add(eventHandle);
176194
return this;
177195
}
178196

@@ -182,7 +200,7 @@ public Builder withTimestamp(final Instant instant) {
182200
}
183201

184202
public DlqObject build() {
185-
return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData, this.eventHandle);
203+
return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData, this.eventHandles);
186204
}
187205

188206
}

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/failures/DlqObjectTest.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import java.util.List;
2425
import static java.util.UUID.randomUUID;
2526
import static org.hamcrest.CoreMatchers.allOf;
2627
import static org.hamcrest.CoreMatchers.containsString;
@@ -70,6 +71,22 @@ public void test_build_with_timestamp() {
7071
assertThat(testObject, is(notNullValue()));
7172
}
7273

74+
@Test
75+
public void test_build_with_timestamp_with_event_handles() {
76+
77+
final DlqObject testObject = DlqObject.builder()
78+
.withPluginId(pluginId)
79+
.withPluginName(pluginName)
80+
.withPipelineName(pipelineName)
81+
.withFailedData(failedData)
82+
.withEventHandles(List.of(eventHandle))
83+
.withTimestamp(randomUUID().toString())
84+
.build();
85+
86+
assertThat(testObject, is(notNullValue()));
87+
}
88+
89+
7390
@Test
7491
public void test_build_without_timestamp() {
7592

@@ -133,9 +150,9 @@ public void test_createDlqObject() {
133150
when(pluginSetting.getPipelineName()).thenReturn(testPipelineName);
134151
eventHandle = mock(EventHandle.class);
135152
Map<String, Object> data = new HashMap<>();
136-
DlqObject dlqObject = DlqObject.createDlqObject(pluginSetting, eventHandle, data);
153+
DlqObject dlqObject = DlqObject.createDlqObject(pluginSetting, List.of(eventHandle), data);
137154
assertThat(dlqObject, is(notNullValue()));
138-
assertThat(dlqObject.getEventHandle(), is(eventHandle));
155+
assertThat(dlqObject.getEventHandles(), is(List.of(eventHandle)));
139156
assertThat(dlqObject.getFailedData(), is(data));
140157
assertThat(dlqObject.getPluginName(), is(testName));
141158
assertThat(dlqObject.getPipelineName(), is(testPipelineName));
@@ -191,13 +208,23 @@ public void test_get_failedData() {
191208
@Test
192209
public void test_get_release_eventHandle() {
193210
doAnswer(a -> { return null; }).when(eventHandle).release(any(Boolean.class));
194-
final Object actualEventHandle = testObject.getEventHandle();
195-
assertThat(actualEventHandle, is(notNullValue()));
196-
assertThat(actualEventHandle, is(eventHandle));
211+
final List<EventHandle> actualEventHandles = testObject.getEventHandles();
212+
assertThat(actualEventHandles, is(notNullValue()));
213+
assertThat(actualEventHandles, is(List.of(eventHandle)));
197214
testObject.releaseEventHandle(true);
198215
verify(eventHandle).release(any(Boolean.class));
199216
}
200217

218+
@Test
219+
public void test_get_release_eventHandles() {
220+
doAnswer(a -> { return null; }).when(eventHandle).release(any(Boolean.class));
221+
final List<EventHandle> actualEventHandles = testObject.getEventHandles();
222+
assertThat(actualEventHandles, is(notNullValue()));
223+
assertThat(actualEventHandles, is(List.of(eventHandle)));
224+
testObject.releaseEventHandles(true);
225+
verify(eventHandle).release(any(Boolean.class));
226+
}
227+
201228
@Test
202229
public void test_get_timestamp() {
203230
final String string = testObject.getTimestamp();

data-prepper-plugins/aws-plugin-api/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ dependencies {
33
implementation 'software.amazon.awssdk:auth'
44
implementation 'software.amazon.awssdk:apache-client'
55
implementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1'
6+
implementation 'com.fasterxml.jackson.core:jackson-annotations'
7+
testImplementation libs.commons.lang3
68
testImplementation 'org.hibernate.validator:hibernate-validator:8.0.2.Final'
79
}
810

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.aws.api;
7+
8+
import com.fasterxml.jackson.annotation.JsonProperty;
9+
import jakarta.validation.constraints.Size;
10+
import software.amazon.awssdk.regions.Region;
11+
12+
import java.util.Map;
13+
14+
/**
15+
* AwsConfig is based on the S3-Sink AwsAuthenticationOptions
16+
* where the configuration allows the sink to fetch Aws credentials
17+
* and resources.
18+
*/
19+
public class AwsConfig {
20+
@JsonProperty("region")
21+
@Size(min = 1, message = "Region cannot be empty string")
22+
private String awsRegion;
23+
24+
@JsonProperty("sts_role_arn")
25+
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
26+
private String awsStsRoleArn;
27+
28+
@JsonProperty("sts_header_overrides")
29+
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
30+
private Map<String, String> awsStsHeaderOverrides;
31+
32+
@JsonProperty("sts_external_id")
33+
@Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
34+
private String awsStsExternalId;
35+
36+
public Region getAwsRegion() {
37+
return awsRegion != null ? Region.of(awsRegion) : null;
38+
}
39+
40+
public String getAwsStsRoleArn() {
41+
return awsStsRoleArn;
42+
}
43+
44+
public String getAwsStsExternalId() {
45+
return awsStsExternalId;
46+
}
47+
48+
public Map<String, String> getAwsStsHeaderOverrides() {
49+
return awsStsHeaderOverrides;
50+
}
51+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.aws.api;
7+
8+
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.Test;
10+
11+
import java.lang.reflect.Field;
12+
13+
import static org.hamcrest.CoreMatchers.equalTo;
14+
import static org.hamcrest.MatcherAssert.assertThat;
15+
import org.apache.commons.lang3.RandomStringUtils;
16+
import software.amazon.awssdk.regions.Region;
17+
18+
import java.util.Map;
19+
20+
public class AwsConfigTest {
21+
22+
private AwsConfig awsConfig;
23+
24+
@BeforeEach
25+
void setUp() {
26+
awsConfig = new AwsConfig();
27+
}
28+
29+
@Test
30+
void TestConfigOptions_notNull() throws NoSuchFieldException, IllegalAccessException {
31+
32+
final String testStsRoleArn = RandomStringUtils.randomAlphabetic(10);
33+
reflectivelySetField(awsConfig, "awsStsRoleArn", testStsRoleArn);
34+
assertThat(awsConfig.getAwsStsRoleArn(), equalTo(testStsRoleArn));
35+
final String testStsExternalId = RandomStringUtils.randomAlphabetic(10);
36+
reflectivelySetField(awsConfig, "awsStsExternalId", testStsExternalId);
37+
assertThat(awsConfig.getAwsStsExternalId(), equalTo(testStsExternalId));
38+
39+
final Map<String, String> testStsHeaderOverrides = Map.of(RandomStringUtils.randomAlphabetic(5), RandomStringUtils.randomAlphabetic(10));
40+
reflectivelySetField(awsConfig, "awsStsHeaderOverrides", testStsHeaderOverrides);
41+
assertThat(awsConfig.getAwsStsHeaderOverrides(), equalTo(testStsHeaderOverrides));
42+
43+
final String testRegion = RandomStringUtils.randomAlphabetic(8);
44+
reflectivelySetField(awsConfig, "awsRegion", testRegion);
45+
assertThat(awsConfig.getAwsRegion(), equalTo(Region.of(testRegion)));
46+
}
47+
48+
private void reflectivelySetField(final AwsConfig awsConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
49+
final Field field = AwsConfig.class.getDeclaredField(fieldName);
50+
try {
51+
field.setAccessible(true);
52+
field.set(awsConfig, value);
53+
} finally {
54+
field.setAccessible(false);
55+
}
56+
}
57+
}
58+

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/utils/CloudWatchLogsSinkUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class CloudWatchLogsSinkUtils {
1919
public static DlqObject createDlqObject(final int status, final EventHandle eventHandle, final String message, final String failureMessage, final DlqPushHandler dlqPushHandler) {
2020
if (dlqPushHandler != null) {
2121
CloudWatchLogsSinkDlqData cloudWatchLogsSinkDlqData = CloudWatchLogsSinkDlqData.createDlqData(status, message, failureMessage);
22-
return DlqObject.createDlqObject(dlqPushHandler.getPluginSetting(), eventHandle, cloudWatchLogsSinkDlqData);
22+
return DlqObject.createDlqObject(dlqPushHandler.getPluginSetting(), List.of(eventHandle), cloudWatchLogsSinkDlqData);
2323
} else {
2424
eventHandle.release(false);
2525
}
@@ -35,7 +35,7 @@ public static void handleDlqObjects(List<DlqObject> dlqObjects, final DlqPushHan
3535
result = dlqPushHandler.perform(dlqObjects);
3636
}
3737
for (final DlqObject dlqObject : dlqObjects) {
38-
dlqObject.getEventHandle().release(result);
38+
dlqObject.releaseEventHandles(result);
3939
}
4040
}
4141
}

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/plugins/accumulator/InMemoryBuffer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.plugins.accumulator;
77

88
import org.apache.commons.lang3.time.StopWatch;
9+
910
import java.io.ByteArrayOutputStream;
1011
import java.io.IOException;
1112
import java.io.OutputStream;
@@ -16,12 +17,12 @@
1617
*/
1718
public class InMemoryBuffer implements Buffer {
1819

19-
private static final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
20+
private final ByteArrayOutputStream byteArrayOutputStream;
2021
private int eventCount;
2122
private final StopWatch watch;
2223

2324
InMemoryBuffer() {
24-
byteArrayOutputStream.reset();
25+
byteArrayOutputStream = new ByteArrayOutputStream();
2526
eventCount = 0;
2627
watch = new StopWatch();
2728
watch.start();
@@ -59,4 +60,4 @@ public OutputStream getOutputStream() {
5960
public void setEventCount(int eventCount) {
6061
this.eventCount = eventCount;
6162
}
62-
}
63+
}

0 commit comments

Comments
 (0)