Skip to content

Commit ae58d34

Browse files
committed
Removed unnecessary changes
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent f732245 commit ae58d34

10 files changed

Lines changed: 29 additions & 1552 deletions

File tree

data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/sink/DefaultSinkMetrics.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class DefaultSinkMetrics implements SinkMetrics {
1414
public static final String SINK_REQUESTS_SUCCEEDED = "SinkRequestsSucceeded";
1515
public static final String SINK_EVENTS_SUCCEEDED = "SinkEventsSucceeded";
1616
public static final String SINK_EVENTS_FAILED = "SinkEventsFailed";
17-
public static final String SINK_EVENTS_FAILED = "SinkEventsDropped";
17+
public static final String SINK_EVENTS_DROPPED = "SinkEventsDropped";
1818
public static final String SINK_REQUESTS_FAILED = "SinkRequestsFailed";
1919
public static final String SINK_REQUEST_LATENCY = "SinkRequestLatency";
2020
public static final String SINK_RETRIES = "SinkRetries";
@@ -33,7 +33,7 @@ public DefaultSinkMetrics(final PluginMetrics pluginMetrics, final String sinkPr
3333
this.sinkEventsSucceeded = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sSucceeded");
3434
this.sinkRequestsFailed = pluginMetrics.counter(sinkPrefix + SINK_REQUESTS_FAILED);
3535
this.sinkEventsFailed = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sFailed");
36-
this.sinkEventsFailed = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sDropped");
36+
this.sinkEventsDropped = pluginMetrics.counter(sinkPrefix + "Sink"+eventName+"sDropped");
3737
this.sinkRetries = pluginMetrics.counter(sinkPrefix + SINK_RETRIES);
3838
this.sinkRequestLatency = pluginMetrics.summary(sinkPrefix + SINK_REQUEST_LATENCY);
3939
this.sinkRequestSize = pluginMetrics.summary(sinkPrefix + SINK_REQUEST_SIZE);

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkBatch.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package org.opensearch.dataprepper.plugins.sink.sqs;
77

88
import org.opensearch.dataprepper.model.event.Event;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
911
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
1012
import org.opensearch.dataprepper.model.codec.OutputCodec;
1113
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
@@ -30,7 +32,10 @@
3032
import java.util.UUID;
3133
import java.util.function.BiConsumer;
3234

35+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
36+
3337
public class SqsSinkBatch {
38+
private static final Logger LOG = LoggerFactory.getLogger(SqsSinkBatch.class);
3439
public static final int MAX_MESSAGES_PER_BATCH = 10;
3540
public static final int MAX_BATCH_SIZE_BYTES = 256*1024;
3641
private static final String SQS_FIFO_SUFFIX = ".fifo";
@@ -227,6 +232,7 @@ public boolean flushOnce() {
227232
sinkMetrics.recordRequestSize((double)requestSize);
228233

229234
} catch (SqsException e) {
235+
LOG.error(NOISY, "Failed to send messages to SQS: {}", e.getMessage());
230236
sinkMetrics.incrementRequestsFailedCounter(1);
231237
sinkMetrics.incrementEventsFailedCounter(entries.size());
232238
if (!isRetryableException(e)) {

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class SqsSinkConfig {
3131
private PluginModel codec;
3232

3333
@JsonProperty("threshold")
34+
@Valid
3435
private SqsThresholdConfig thresholdConfig = new SqsThresholdConfig();
3536

3637
@JsonProperty("max_retries")

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkService.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@
3030
import java.util.Iterator;
3131
import java.util.List;
3232
import java.util.Map;
33-
import org.opensearch.dataprepper.common.sink.RetrySinkOutputStrategy;
33+
import java.util.concurrent.locks.ReentrantLock;
3434

3535
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
3636

37-
public class SqsSinkService extends RetrySinkOutputStrategy {
37+
public class SqsSinkService extends SqsSinkExecutor {
3838
private static final Logger LOG = LoggerFactory.getLogger(SqsSinkService.class);
3939
public static final int MAX_BYTES_IN_BATCH = 256*1024;
4040
public static final int MAX_EVENT_SIZE = 256*1024;
@@ -48,6 +48,7 @@ public class SqsSinkService extends RetrySinkOutputStrategy {
4848
private final boolean isDynamicDeDupId;
4949
private final boolean isDynamicQueueUrl;
5050
private final ExpressionEvaluator expressionEvaluator;
51+
private final ReentrantLock reentrantLock;
5152
private final SqsThresholdConfig thresholdConfig;
5253
private final SqsSinkConfig sqsSinkConfig;
5354
private final SinkContext sinkContext;
@@ -74,6 +75,7 @@ public SqsSinkService(final SqsSinkConfig sqsSinkConfig,
7475
this.thresholdConfig = sqsSinkConfig.getThresholdConfig();
7576
this.codec = codec;
7677
this.sqsSinkConfig = sqsSinkConfig;
78+
reentrantLock = new ReentrantLock();
7779
this.sinkMetrics = new SqsSinkMetrics(pluginMetrics);
7880

7981
queueUrl = sqsSinkConfig.getQueueUrl();
@@ -122,7 +124,7 @@ public void pushDLQList() {
122124
}
123125

124126
@Override
125-
public void addFailedObjectsToDlqList(Object object) {
127+
public void pushFailedObjectsToDlq(Object object) {
126128
List<SqsSinkBatch> failedBatches = (List<SqsSinkBatch>) object;
127129
for (SqsSinkBatch failedBatch: failedBatches) {
128130
for (Map.Entry<String, SqsSinkBatchEntry> entry: failedBatch.getEntries().entrySet()) {
@@ -138,7 +140,7 @@ public long getEstimatedSize(final Event event) throws Exception {
138140
}
139141

140142
@Override
141-
public boolean willExceedMaxRequestSizeBytes(final Event event, final long estimatedSize) throws Exception {
143+
public boolean willExceedMaxBatchSize(final Event event, final long estimatedSize) throws Exception {
142144
String qUrl = getQueueUrl(event, false);
143145
if (qUrl == null)
144146
return false;
@@ -306,6 +308,16 @@ public void addEventToDLQList(final Event event, Throwable ex) {
306308
addMessageToDLQ(event.toJsonString(), eventHandles, ex.getMessage());
307309
}
308310

311+
@Override
312+
public void lock() {
313+
reentrantLock.lock();
314+
}
315+
316+
@Override
317+
public void unlock() {
318+
reentrantLock.unlock();
319+
}
320+
309321
void output(Collection<Record<Event>> records) {
310322
execute(records);
311323
}

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsThresholdConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import jakarta.validation.constraints.Max;
1111
import org.hibernate.validator.constraints.time.DurationMax;
1212
import org.hibernate.validator.constraints.time.DurationMin;
13+
//import org.opensearch.dataprepper.model.constraints.ByteCountMax;
14+
//import org.opensearch.dataprepper.model.constraints.ByteCountMin;
1315
import org.opensearch.dataprepper.model.types.ByteCount;
1416

1517
import java.time.Duration;
@@ -25,6 +27,8 @@ public class SqsThresholdConfig {
2527
private int maxEventsPerMessage = DEFAULT_MESSAGES_PER_EVENT;
2628

2729
@JsonProperty("max_message_size")
30+
//@ByteCountMin("1b")
31+
//@ByteCountMax("1mb")
2832
private ByteCount maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
2933

3034
@JsonProperty("flush_interval")

wasabi-toolbag/.metadata/bootstrap-time.txt

Lines changed: 0 additions & 1 deletion
This file was deleted.

wasabi-toolbag/content/workspace-summary.md

Lines changed: 0 additions & 248 deletions
This file was deleted.

0 commit comments

Comments
 (0)