Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.List;
import java.util.ArrayList;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -43,8 +45,11 @@ public class DlqObject {
@JsonIgnore
private final EventHandle eventHandle;

@JsonIgnore
private final List<EventHandle> eventHandles;

private DlqObject(final String pluginId, final String pluginName, final String pipelineName,
final String timestamp, final Object failedData, final EventHandle eventHandle) {
final String timestamp, final Object failedData, final List<EventHandle> eventHandles) {

checkNotNull(pluginId, "pluginId cannot be null");
checkArgument(!pluginId.isEmpty(), "pluginId cannot be an empty string");
Expand All @@ -58,7 +63,8 @@ private DlqObject(final String pluginId, final String pluginName, final String p
this.pluginName = pluginName;
this.pipelineName = pipelineName;
this.failedData = failedData;
this.eventHandle = eventHandle;
this.eventHandles = eventHandles;
this.eventHandle = null;

this.timestamp = StringUtils.isEmpty(timestamp) ? FORMATTER.format(Instant.now()) : timestamp;
}
Expand All @@ -83,12 +89,18 @@ public String getTimestamp() {
return timestamp;
}

public EventHandle getEventHandle() {
return eventHandle;
public List<EventHandle> getEventHandles() {
return eventHandles;
}

public void releaseEventHandle(boolean result) {
if (eventHandle != null) {
if (eventHandles != null && eventHandles.size() == 1) {
eventHandles.get(0).release(result);
}
}

public void releaseEventHandles(boolean result) {
for (final EventHandle eventHandle: eventHandles) {
eventHandle.release(result);
}
}
Expand All @@ -102,7 +114,7 @@ public boolean equals(final Object o) {
&& Objects.equals(pluginId, that.pluginId)
&& Objects.equals(pluginName, that.pluginName)
&& Objects.equals(pipelineName, that.pipelineName)
&& Objects.equals(eventHandle, that.eventHandle)
&& Objects.equals(eventHandles, that.eventHandles)
&& Objects.equals(timestamp, that.getTimestamp());
}

Expand All @@ -122,9 +134,9 @@ public String toString() {
'}';
}

public static DlqObject createDlqObject(PluginSetting pluginSetting, EventHandle eventHandle, Object failedData) {
public static DlqObject createDlqObject(PluginSetting pluginSetting, List<EventHandle> eventHandles, Object failedData) {
return DlqObject.builder()
.withEventHandle(eventHandle)
.withEventHandles(eventHandles)
.withFailedData(failedData)
.withPluginName(pluginSetting.getName())
.withPipelineName(pluginSetting.getPipelineName())
Expand All @@ -142,7 +154,7 @@ public static class Builder {
private String pluginName;
private String pipelineName;
private Object failedData;
private EventHandle eventHandle;
private List<EventHandle> eventHandles;

private String timestamp;

Expand Down Expand Up @@ -171,8 +183,14 @@ public Builder withTimestamp(final String timestamp) {
return this;
}

public Builder withEventHandles(final List<EventHandle> eventHandles) {
this.eventHandles = eventHandles;
return this;
}

public Builder withEventHandle(final EventHandle eventHandle) {
this.eventHandle = eventHandle;
this.eventHandles = new ArrayList<>();
this.eventHandles.add(eventHandle);
return this;
}

Expand All @@ -182,7 +200,7 @@ public Builder withTimestamp(final Instant instant) {
}

public DlqObject build() {
return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData, this.eventHandle);
return new DlqObject(this.pluginId, this.pluginName, this.pipelineName, this.timestamp, this.failedData, this.eventHandles);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.List;
import static java.util.UUID.randomUUID;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
Expand Down Expand Up @@ -70,6 +71,22 @@ public void test_build_with_timestamp() {
assertThat(testObject, is(notNullValue()));
}

@Test
public void test_build_with_timestamp_with_event_handles() {

final DlqObject testObject = DlqObject.builder()
.withPluginId(pluginId)
.withPluginName(pluginName)
.withPipelineName(pipelineName)
.withFailedData(failedData)
.withEventHandles(List.of(eventHandle))
.withTimestamp(randomUUID().toString())
.build();

assertThat(testObject, is(notNullValue()));
}


@Test
public void test_build_without_timestamp() {

Expand Down Expand Up @@ -133,9 +150,9 @@ public void test_createDlqObject() {
when(pluginSetting.getPipelineName()).thenReturn(testPipelineName);
eventHandle = mock(EventHandle.class);
Map<String, Object> data = new HashMap<>();
DlqObject dlqObject = DlqObject.createDlqObject(pluginSetting, eventHandle, data);
DlqObject dlqObject = DlqObject.createDlqObject(pluginSetting, List.of(eventHandle), data);
assertThat(dlqObject, is(notNullValue()));
assertThat(dlqObject.getEventHandle(), is(eventHandle));
assertThat(dlqObject.getEventHandles(), is(List.of(eventHandle)));
assertThat(dlqObject.getFailedData(), is(data));
assertThat(dlqObject.getPluginName(), is(testName));
assertThat(dlqObject.getPipelineName(), is(testPipelineName));
Expand Down Expand Up @@ -191,13 +208,23 @@ public void test_get_failedData() {
@Test
public void test_get_release_eventHandle() {
doAnswer(a -> { return null; }).when(eventHandle).release(any(Boolean.class));
final Object actualEventHandle = testObject.getEventHandle();
assertThat(actualEventHandle, is(notNullValue()));
assertThat(actualEventHandle, is(eventHandle));
final List<EventHandle> actualEventHandles = testObject.getEventHandles();
assertThat(actualEventHandles, is(notNullValue()));
assertThat(actualEventHandles, is(List.of(eventHandle)));
testObject.releaseEventHandle(true);
verify(eventHandle).release(any(Boolean.class));
}

@Test
public void test_get_release_eventHandles() {
doAnswer(a -> { return null; }).when(eventHandle).release(any(Boolean.class));
final List<EventHandle> actualEventHandles = testObject.getEventHandles();
assertThat(actualEventHandles, is(notNullValue()));
assertThat(actualEventHandles, is(List.of(eventHandle)));
testObject.releaseEventHandles(true);
verify(eventHandle).release(any(Boolean.class));
}

@Test
public void test_get_timestamp() {
final String string = testObject.getTimestamp();
Expand Down
2 changes: 2 additions & 0 deletions data-prepper-plugins/aws-plugin-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ dependencies {
implementation 'software.amazon.awssdk:auth'
implementation 'software.amazon.awssdk:apache-client'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
testImplementation libs.commons.lang3
testImplementation 'org.hibernate.validator:hibernate-validator:8.0.2.Final'
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.aws.api;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.regions.Region;

import java.util.Map;

/**
* AwsConfig is based on the S3-Sink AwsAuthenticationOptions
* where the configuration allows the sink to fetch Aws credentials
* and resources.
*/
public class AwsConfig {
@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
private String awsStsRoleArn;

@JsonProperty("sts_header_overrides")
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

@JsonProperty("sts_external_id")
@Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters")
private String awsStsExternalId;

public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public String getAwsStsRoleArn() {
return awsStsRoleArn;
}

public String getAwsStsExternalId() {
return awsStsExternalId;
}

public Map<String, String> getAwsStsHeaderOverrides() {
return awsStsHeaderOverrides;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.aws.api;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Field;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.regions.Region;

import java.util.Map;

public class AwsConfigTest {

private AwsConfig awsConfig;

@BeforeEach
void setUp() {
awsConfig = new AwsConfig();
}

@Test
void TestConfigOptions_notNull() throws NoSuchFieldException, IllegalAccessException {

final String testStsRoleArn = RandomStringUtils.randomAlphabetic(10);
reflectivelySetField(awsConfig, "awsStsRoleArn", testStsRoleArn);
assertThat(awsConfig.getAwsStsRoleArn(), equalTo(testStsRoleArn));
final String testStsExternalId = RandomStringUtils.randomAlphabetic(10);
reflectivelySetField(awsConfig, "awsStsExternalId", testStsExternalId);
assertThat(awsConfig.getAwsStsExternalId(), equalTo(testStsExternalId));

final Map<String, String> testStsHeaderOverrides = Map.of(RandomStringUtils.randomAlphabetic(5), RandomStringUtils.randomAlphabetic(10));
reflectivelySetField(awsConfig, "awsStsHeaderOverrides", testStsHeaderOverrides);
assertThat(awsConfig.getAwsStsHeaderOverrides(), equalTo(testStsHeaderOverrides));

final String testRegion = RandomStringUtils.randomAlphabetic(8);
reflectivelySetField(awsConfig, "awsRegion", testRegion);
assertThat(awsConfig.getAwsRegion(), equalTo(Region.of(testRegion)));
}

private void reflectivelySetField(final AwsConfig awsConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
final Field field = AwsConfig.class.getDeclaredField(fieldName);
try {
field.setAccessible(true);
field.set(awsConfig, value);
} finally {
field.setAccessible(false);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class CloudWatchLogsSinkUtils {
public static DlqObject createDlqObject(final int status, final EventHandle eventHandle, final String message, final String failureMessage, final DlqPushHandler dlqPushHandler) {
if (dlqPushHandler != null) {
CloudWatchLogsSinkDlqData cloudWatchLogsSinkDlqData = CloudWatchLogsSinkDlqData.createDlqData(status, message, failureMessage);
return DlqObject.createDlqObject(dlqPushHandler.getPluginSetting(), eventHandle, cloudWatchLogsSinkDlqData);
return DlqObject.createDlqObject(dlqPushHandler.getPluginSetting(), List.of(eventHandle), cloudWatchLogsSinkDlqData);
} else {
eventHandle.release(false);
}
Expand All @@ -35,7 +35,7 @@ public static void handleDlqObjects(List<DlqObject> dlqObjects, final DlqPushHan
result = dlqPushHandler.perform(dlqObjects);
}
for (final DlqObject dlqObject : dlqObjects) {
dlqObject.getEventHandle().release(result);
dlqObject.releaseEventHandles(result);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.accumulator;

import org.apache.commons.lang3.time.StopWatch;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -16,12 +17,12 @@
*/
public class InMemoryBuffer implements Buffer {

private static final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
private final ByteArrayOutputStream byteArrayOutputStream;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to make this change for all sinks? This may have a performance impact.

private int eventCount;
private final StopWatch watch;

InMemoryBuffer() {
byteArrayOutputStream.reset();
byteArrayOutputStream = new ByteArrayOutputStream();
eventCount = 0;
watch = new StopWatch();
watch.start();
Expand Down Expand Up @@ -59,4 +60,4 @@ public OutputStream getOutputStream() {
public void setEventCount(int eventCount) {
this.eventCount = eventCount;
}
}
}
Loading
Loading