Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion data-prepper-plugins/sqs-sink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = 0.90
minimum = 0.99
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.aws.api.AwsConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -65,6 +66,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -125,6 +127,8 @@ public class SqsSinkIT {
private Counter requestsFailedCounter;
@Mock
private Counter dlqSuccessCounter;
@Mock
private DistributionSummary summary;

private JsonOutputCodec jsonCodec;
private String bucket;
Expand Down Expand Up @@ -165,6 +169,8 @@ void setUp() {
requestsSuccessCounter = mock(Counter.class);
requestsFailedCounter = mock(Counter.class);
dlqSuccessCounter = mock(Counter.class);
summary = mock(DistributionSummary.class);
doNothing().when(summary).record(any(Double.class));
lenient().doAnswer((a)-> {
int v = (int)(double)(a.getArgument(0));
eventsSuccessCount.addAndGet(v);
Expand Down Expand Up @@ -213,6 +219,7 @@ void setUp() {
}
return null;
}).when(pluginMetrics).counter(anyString());
when(pluginMetrics.summary(anyString())).thenReturn(summary);
messages = new ArrayList<>();
pluginFactory = mock(PluginFactory.class);
jsonCodec = new JsonOutputCodec(new JsonOutputCodecConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,9 @@ public SqsSink(final PluginSetting pluginSetting,
codecPluginSettings = new PluginSetting("ndjson", Map.of());
}

final OutputCodec outputCodec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
AwsConfig awsConfig = sqsSinkConfig.getAwsConfig();
if (awsConfig == null && awsCredentialsSupplier == null) {
throw new RuntimeException("Missing awsConfig and awsCredentialsSupplier");
}
final AwsCredentialsProvider awsCredentialsProvider = awsConfig != null ? awsCredentialsSupplier.getProvider(convertToCredentialOptions(awsConfig)) : awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder().build());
Region region = awsConfig != null ? awsConfig.getAwsRegion() : awsCredentialsSupplier.getDefaultRegion().get();
final AwsCredentialsProvider awsCredentialsProvider = (awsConfig != null) ? awsCredentialsSupplier.getProvider(convertToCredentialOptions(awsConfig)) : awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder().build());
Region region = (awsConfig != null) ? awsConfig.getAwsRegion() : awsCredentialsSupplier.getDefaultRegion().get();
final SqsClient sqsClient = SqsClientFactory.createSqsClient(region, awsCredentialsProvider);

DlqPushHandler dlqPushHandler = null;
Expand All @@ -89,6 +85,7 @@ public SqsSink(final PluginSetting pluginSetting,
String role = stsClient.getCallerIdentity().arn();
dlqPushHandler = new DlqPushHandler(pluginFactory, pluginSetting, pluginMetrics, sqsSinkConfig.getDlq(), region.toString(), role, "sqsSink");
}
final OutputCodec outputCodec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
sqsSinkService = new SqsSinkService(sqsSinkConfig, sqsClient, expressionEvaluator, outputCodec, sinkContext, dlqPushHandler, pluginMetrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@


import java.time.Instant;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -43,19 +46,22 @@ public class SqsSinkBatch {
private final SqsClient sqsClient;
private final BufferFactory bufferFactory;
private final SqsSinkMetrics sinkMetrics;
private String currentId;
private SqsSinkBatchEntry currentBatchEntry;
private final BiConsumer<SqsSinkBatchEntry, String> addToDLQList;

public SqsSinkBatch(final BufferFactory bufferFactory,
final SqsClient sqsClient,
final SqsSinkMetrics sinkMetrics,
final String queueUrl,
final OutputCodec codec,
final OutputCodecContext codecContext,
final long maxMessageSize,
final int maxEvents) {
this.maxMessageSize = maxMessageSize;
final SqsThresholdConfig thresholdConfig,
final BiConsumer<SqsSinkBatchEntry, String> addToDLQList) {
this.bufferFactory = bufferFactory;
this.maxEvents = maxEvents;
this.maxMessageSize = thresholdConfig.getMaxMessageSizeBytes();
this.maxEvents = thresholdConfig.getMaxEventsPerMessage();
this.addToDLQList = addToDLQList;
this.codec = codec;
this.sinkMetrics = sinkMetrics;
this.codecContext = codecContext;
Expand All @@ -66,14 +72,15 @@ public SqsSinkBatch(final BufferFactory bufferFactory,
fifoQueue = queueUrl.endsWith(SQS_FIFO_SUFFIX);
entries = new HashMap<>();
currentBatchEntry = null;
currentId = null;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please explain what is the purpose of this

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used in deletion of messages.

}

public String getQueueUrl() {
return queueUrl;
}

private boolean isFull() {
return entries.size() == MAX_MESSAGES_PER_BATCH && (currentBatchEntry.getEventCount() == maxEvents || currentBatchEntry.getSize() == maxMessageSize);
return entries.size() == MAX_MESSAGES_PER_BATCH && currentBatchEntry.getEventCount() == maxEvents;
}

public boolean willExceedLimits(long estimatedSize) {
Expand All @@ -99,7 +106,12 @@ public boolean addEntry(final Event event, String groupId, String deDupId, final
currentBatchEntry.addEvent(event);
return isFull();
} else {
currentBatchEntry.complete();
try {
currentBatchEntry.complete();
} catch (IOException ex) {
addToDLQList.accept(currentBatchEntry, ex.getMessage());
entries.remove(currentId);
}
}
}
if (entries.size() == MAX_MESSAGES_PER_BATCH) {
Expand All @@ -115,6 +127,7 @@ public boolean addEntry(final Event event, String groupId, String deDupId, final

currentBatchEntry.addEvent(event);
final String id = UUID.randomUUID().toString();
currentId = id;
entries.put(id, currentBatchEntry);
return isFull();
}
Expand All @@ -126,7 +139,16 @@ public long getLastFlushedTime() {
public long getCurrentBatchSize() {
long sum = 0;
for (Map.Entry<String, SqsSinkBatchEntry> entry : entries.entrySet()) {
sum += entry.getValue().getSize();
SqsSinkBatchEntry batchEntry = entry.getValue();
sum += batchEntry.getSize();
if (fifoQueue) {
if (batchEntry.getGroupId() != null) {
sum += batchEntry.getGroupId().length();
}
if (batchEntry.getDedupId() != null) {
sum += batchEntry.getDedupId().length();
}
}
}
return sum;
}
Expand All @@ -135,9 +157,16 @@ public int getEventCount() {
return entries.values().stream().mapToInt(SqsSinkBatchEntry::getEventCount).sum();
}

public void setFlushReady() throws Exception {
for (Map.Entry<String, SqsSinkBatchEntry> entry: entries.entrySet()) {
entry.getValue().complete();
public void setFlushReady() {
Iterator<Map.Entry<String, SqsSinkBatchEntry>> iterator = entries.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, SqsSinkBatchEntry> entry = iterator.next();
try {
entry.getValue().complete();
} catch (IOException ex) {
addToDLQList.accept(entry.getValue(), ex.getMessage());
iterator.remove();
}
}
flushReady = true;
}
Expand All @@ -162,16 +191,31 @@ private boolean isRetryableException(SqsException e) {
return (e instanceof RequestThrottledException);
}

public boolean flushOnce(final BiConsumer<SqsSinkBatchEntry, String> addToDLQList) {
if (!isReady()) {
private long getEntrySize(SqsSinkBatchEntry entry) {
long result = entry.getBody().getBytes(StandardCharsets.UTF_8).length;
if (fifoQueue) {
if (entry.getGroupId() != null) {
result += entry.getGroupId().getBytes(StandardCharsets.UTF_8).length;
}
if (entry.getDedupId() != null) {
result += entry.getDedupId().getBytes(StandardCharsets.UTF_8).length;
}
}
return result;
}

public boolean flushOnce() {
if (!isReady() || entries.size() == 0) {
return true;
}
SendMessageBatchResponse flushResponse;
List<SendMessageBatchRequestEntry> requestEntries = new ArrayList<>();
long requestSize = 0;
for (Map.Entry<String, SqsSinkBatchEntry> groupEntry: entries.entrySet()) {
final String id = groupEntry.getKey();
final SqsSinkBatchEntry entry = groupEntry.getValue();
requestEntries.add(getRequestEntry(id, entry));
requestSize += getEntrySize(entry);
}
SendMessageBatchRequest batchRequest =
SendMessageBatchRequest.builder()
Expand All @@ -180,6 +224,8 @@ public boolean flushOnce(final BiConsumer<SqsSinkBatchEntry, String> addToDLQLis
.build();
try {
flushResponse = sqsClient.sendMessageBatch(batchRequest);
sinkMetrics.recordRequestSize((double)requestSize);

} catch (SqsException e) {
sinkMetrics.incrementRequestsFailedCounter(1);
sinkMetrics.incrementEventsFailedCounter(entries.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.plugins.accumulator.Buffer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -65,12 +66,12 @@ public long getSize() {
return buffer.getSize();
}

public void complete() throws Exception {
public void complete() throws IOException {
if (completed) {
return;
}
writer.complete();
completed = true;
writer.complete();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
import java.time.Duration;
import java.util.Collection;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

public abstract class SqsSinkExecutor {
private static final Logger LOG = LoggerFactory.getLogger(SqsSinkExecutor.class);
private static final long INITIAL_DELAY_MS = 10;
private static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis();


public void execute(Collection<Record<Event>> records) {
if (records.isEmpty()) {
lock();
Expand All @@ -33,6 +30,7 @@ public void execute(Collection<Record<Event>> records) {
} finally {
unlock();
}
pushDLQList();
return;
}
lock();
Expand Down Expand Up @@ -66,23 +64,25 @@ public void flushBuffer() {
Object failedStatus = null;
int maxRetries = getMaxRetries();
final Backoff backoff = Backoff.exponential(INITIAL_DELAY_MS, MAXIMUM_DELAY_MS).withMaxAttempts(maxRetries);
long startTime = System.nanoTime();
while (retryCount <= maxRetries) {
failedStatus = doFlushOnce(failedStatus);
if (failedStatus != null) {
final long delayMillis = backoff.nextDelayMillis(retryCount);
if (delayMillis < 0) {
break;
}
try {
Thread.sleep(delayMillis);
} catch (final InterruptedException e){
LOG.error(NOISY, "Thread is interrupted while attempting to SQS with retry.", e);
}
if (failedStatus == null) {
break;
}
final long delayMillis = backoff.nextDelayMillis(retryCount);
if (delayMillis < 0) {
break;
}
try {
Thread.sleep(delayMillis);
} catch (final InterruptedException e){}
retryCount++;
}
if (failedStatus != null) {
pushFailedObjectsToDlq(failedStatus);
} else {
recordLatency((double)System.nanoTime() - startTime);
}
}

Expand All @@ -96,6 +96,7 @@ public void flushBuffer() {
public abstract boolean willExceedMaxBatchSize(final Event event, final long estimatedSize) throws Exception;
public abstract boolean exceedsMaxEventSizeThreshold(final long estimatedSize);
public abstract long getEstimatedSize(final Event event) throws Exception;
public abstract void recordLatency(double latencyMillis);

public abstract void lock();
public abstract void unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,30 @@
package org.opensearch.dataprepper.plugins.sink.sqs;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;

public class SqsSinkMetrics {
public static final String SQS_SINK_REQUESTS_SUCCEEDED = "sqsSinkRequestsSucceeded";
public static final String SQS_SINK_EVENTS_SUCCEEDED = "sqsSinkEventsSucceeded";
public static final String SQS_SINK_EVENTS_FAILED = "sqsSinkEventsFailed";
public static final String SQS_SINK_REQUESTS_FAILED = "sqsSinkRequestsFailed";
public static final String SQS_SINK_REQUEST_LATENCY = "sqsSinkRequestLatency";
public static final String SQS_SINK_REQUEST_SIZE = "sqsSinkRequestSize";
private final Counter sqsSinkRequestsSucceeded;
private final Counter sqsSinkEventsSucceeded;
private final Counter sqsSinkRequestsFailed;
private final Counter sqsSinkEventsFailed;
private final DistributionSummary sqsSinkRequestLatency;
private final DistributionSummary sqsSinkRequestSize;

public SqsSinkMetrics(final PluginMetrics pluginMetrics) {
this.sqsSinkRequestsSucceeded = pluginMetrics.counter(SQS_SINK_REQUESTS_SUCCEEDED);
this.sqsSinkEventsSucceeded = pluginMetrics.counter(SQS_SINK_EVENTS_SUCCEEDED);
this.sqsSinkRequestsFailed = pluginMetrics.counter(SQS_SINK_REQUESTS_FAILED);
this.sqsSinkEventsFailed = pluginMetrics.counter(SQS_SINK_EVENTS_FAILED);
this.sqsSinkRequestLatency = pluginMetrics.summary(SQS_SINK_REQUEST_LATENCY);
this.sqsSinkRequestSize = pluginMetrics.summary(SQS_SINK_REQUEST_SIZE);
}

public void incrementEventsSuccessCounter(int value) {
Expand All @@ -40,4 +47,12 @@ public void incrementEventsFailedCounter(int value) {
public void incrementRequestsFailedCounter(int value) {
sqsSinkRequestsFailed.increment(value);
}

public void recordRequestLatency(double value) {
sqsSinkRequestLatency.record(value);
}

public void recordRequestSize(double value) {
sqsSinkRequestSize.record(value);
}
}
Loading
Loading