Skip to content

Commit b42daa1

Browse files
committed
Adding support for pipeline DLQ to SQS sink
1 parent 8e09a2a commit b42daa1

7 files changed

Lines changed: 159 additions & 14 deletions

File tree

data-prepper-plugins/sqs-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkIT.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.plugins.sink.sqs;
77

8+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
89
import org.opensearch.dataprepper.model.configuration.PluginSetting;
910
import org.opensearch.dataprepper.model.configuration.PluginModel;
1011
import org.opensearch.dataprepper.model.plugin.PluginFactory;
@@ -117,6 +118,9 @@ public class SqsSinkIT {
117118
@Mock
118119
private PluginModel codec;
119120

121+
@Mock
122+
private PipelineDescription pipelineDescription;
123+
120124
@Mock
121125
private Counter eventsSuccessCounter;
122126
@Mock
@@ -256,7 +260,7 @@ void setUp() {
256260
when(sqsSinkConfig.getCodec()).thenReturn(codec);
257261
when(sqsSinkConfig.getAwsConfig()).thenReturn(awsConfig);
258262
when(sqsSinkConfig.getDlq()).thenReturn(null);
259-
263+
when(pipelineDescription.getPipelineName()).thenReturn("test-pipeline");
260264
thresholdConfig = mock(SqsThresholdConfig.class);
261265
lenient().when(sqsSinkConfig.getMaxRetries()).thenReturn(3);
262266
when(thresholdConfig.getMaxEventsPerMessage()).thenReturn(1);
@@ -323,7 +327,7 @@ private void deleteObjectsWithPrefix(String bucketName, String prefix) {
323327
}
324328

325329
private SqsSink createObjectUnderTest() {
326-
return new SqsSink(pluginSetting, pluginMetrics, pluginFactory, sqsSinkConfig, sinkContext, expressionEvaluator, awsCredentialsSupplier);
330+
return new SqsSink(pluginSetting, pluginMetrics, pluginFactory, sqsSinkConfig, sinkContext, expressionEvaluator, awsCredentialsSupplier, pipelineDescription);
327331
}
328332

329333
private List<Message> getMessages(final String queueUrl) {

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSink.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.plugins.sink.sqs;
77

8+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
89
import org.opensearch.dataprepper.model.configuration.PluginSetting;
910
import org.opensearch.dataprepper.metrics.PluginMetrics;
1011
import org.opensearch.dataprepper.model.configuration.PluginModel;
@@ -54,7 +55,8 @@ public SqsSink(final PluginSetting pluginSetting,
5455
final SqsSinkConfig sqsSinkConfig,
5556
final SinkContext sinkContext,
5657
final ExpressionEvaluator expressionEvaluator,
57-
final AwsCredentialsSupplier awsCredentialsSupplier) {
58+
final AwsCredentialsSupplier awsCredentialsSupplier,
59+
final PipelineDescription pipelineDescription) {
5860
super(pluginSetting);
5961
this.sqsSinkConfig = sqsSinkConfig;
6062
sinkInitialized = false;
@@ -86,7 +88,7 @@ public SqsSink(final PluginSetting pluginSetting,
8688
dlqPushHandler = new DlqPushHandler(pluginFactory, pluginSetting, pluginMetrics, sqsSinkConfig.getDlq(), region.toString(), role, "sqsSink");
8789
}
8890
final OutputCodec outputCodec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
89-
sqsSinkService = new SqsSinkService(sqsSinkConfig, sqsClient, expressionEvaluator, outputCodec, sinkContext, dlqPushHandler, pluginMetrics);
91+
sqsSinkService = new SqsSinkService(sqsSinkConfig, sqsClient, expressionEvaluator, outputCodec, sinkContext, dlqPushHandler, pluginMetrics, pluginSetting, pipelineDescription);
9092
}
9193

9294
private static AwsCredentialsOptions convertToCredentialOptions(final AwsConfig awsConfig) {
@@ -106,6 +108,7 @@ public boolean isReady() {
106108
@Override
107109
public void doInitialize() {
108110
sinkInitialized = true;
111+
sqsSinkService.setDlqPipeline(getFailurePipeline());
109112
}
110113

111114
/**

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkBatchEntry.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
public class SqsSinkBatchEntry {
1919
private final List<EventHandle> eventHandles;
20+
private final List<Event> events;
2021
private final String groupId;
2122
private final String deDupId;
2223
private final Buffer buffer;
@@ -29,6 +30,7 @@ public class SqsSinkBatchEntry {
2930

3031
public SqsSinkBatchEntry(final Buffer buffer, final String groupId, final String deDupId, final OutputCodec codec, final OutputCodecContext codecContext) {
3132
this.eventHandles = new ArrayList<>();
33+
this.events = new ArrayList<>();
3234
this.buffer = buffer;
3335
completed = false;
3436
this.groupId = groupId;
@@ -58,6 +60,7 @@ public void addEvent(final Event event) throws Exception {
5860
writer = codec.createWriter(buffer.getOutputStream(), null, codecContext);
5961
}
6062
writer.writeEvent(event);
63+
events.add(event);
6164
eventHandles.add(event.getEventHandle());
6265
eventCount++;
6366
}
@@ -91,5 +94,9 @@ public int getEventCount() {
9194
return eventCount;
9295
}
9396

97+
public List<Event> getEvents() {
98+
return events;
99+
}
100+
94101
}
95102

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkService.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import org.opensearch.dataprepper.metrics.PluginMetrics;
1313
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
1414
import org.opensearch.dataprepper.model.codec.OutputCodec;
15+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
16+
import org.opensearch.dataprepper.model.configuration.PluginSetting;
17+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1518
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
1619
import org.opensearch.dataprepper.plugins.accumulator.BufferFactory;
1720
import org.opensearch.dataprepper.plugins.accumulator.InMemoryBufferFactory;
@@ -62,16 +65,24 @@ public class SqsSinkService extends SqsSinkExecutor {
6265
private final SqsSinkMetrics sinkMetrics;
6366
private final DlqPushHandler dlqPushHandler;
6467
private final List<DlqObject> dlqObjects;
68+
private HeadlessPipeline dlqPipeline;
69+
private final List<Record<Event>> dlqPipelineRecords;
70+
private final PluginSetting pluginSetting;
71+
private final PipelineDescription pipelineDescription;
6572

6673
public SqsSinkService(final SqsSinkConfig sqsSinkConfig,
6774
final SqsClient sqsClient,
6875
final ExpressionEvaluator expressionEvaluator,
6976
final OutputCodec codec,
7077
final SinkContext sinkContext,
7178
final DlqPushHandler dlqPushHandler,
72-
final PluginMetrics pluginMetrics) {
79+
final PluginMetrics pluginMetrics,
80+
final PluginSetting pluginSetting,
81+
final PipelineDescription pipelineDescription
82+
) {
7383
batchUrlMap = new HashMap<>();
7484
dlqObjects = new ArrayList<>();
85+
dlqPipelineRecords = new ArrayList<>();
7586
inMemoryBufferFactory =new InMemoryBufferFactory();
7687
this.sqsClient = sqsClient;
7788
this.dlqPushHandler = dlqPushHandler;
@@ -82,7 +93,8 @@ public SqsSinkService(final SqsSinkConfig sqsSinkConfig,
8293
this.sqsSinkConfig = sqsSinkConfig;
8394
reentrantLock = new ReentrantLock();
8495
this.sinkMetrics = new SqsSinkMetrics(pluginMetrics);
85-
96+
this.pluginSetting = pluginSetting;
97+
this.pipelineDescription = pipelineDescription;
8698
queueUrl = sqsSinkConfig.getQueueUrl();
8799
isDynamicQueueUrl = queueUrl.contains("${");
88100
if (isDynamicQueueUrl) {
@@ -109,13 +121,23 @@ public SqsSinkService(final SqsSinkConfig sqsSinkConfig,
109121

110122
}
111123

124+
public void setDlqPipeline(HeadlessPipeline pipeline) {
125+
this.dlqPipeline = pipeline;
126+
}
127+
112128
@Override
113129
public boolean exceedsMaxEventSizeThreshold(final long estimatedSize) {
114130
return estimatedSize > MAX_EVENT_SIZE;
115131
}
116132

117133
@Override
118134
public void pushDLQList() {
135+
if (dlqPipeline != null && !dlqPipelineRecords.isEmpty()) {
136+
dlqPipeline.sendEvents(dlqPipelineRecords);
137+
dlqPipelineRecords.clear();
138+
return;
139+
}
140+
119141
// If DLQ push handler is null, dlqObjects list
120142
// would be empty
121143
if (dlqObjects.size() == 0) {
@@ -286,10 +308,24 @@ public boolean exceedsFlushTimeInterval() {
286308
}
287309

288310
private void addBatchEntryToDLQ(final SqsSinkBatchEntry batchEntry, final String errorMessage) {
289-
addMessageToDLQ(batchEntry.getBody(), batchEntry.getEventHandles(), errorMessage);
311+
addMessageToDLQ(batchEntry.getBody(), batchEntry.getEventHandles(), batchEntry.getEvents(), errorMessage);
290312
}
291313

292-
private void addMessageToDLQ(final String message, final List<EventHandle> eventHandles, final String errorMessage) {
314+
private void addMessageToDLQ(final String message, final List<EventHandle> eventHandles, List<Event> events, final String errorMessage) {
315+
if (dlqPipeline != null) {
316+
for (Event event: events) {
317+
if (event != null) {
318+
event.updateFailureMetadata()
319+
.with("pluginId", pluginSetting.getName())
320+
.with("pluginName", pluginSetting.getName())
321+
.with("pipelineName", pipelineDescription.getPipelineName())
322+
.with("sqsSinkQueueUrl", queueUrl)
323+
.with("message", errorMessage);
324+
dlqPipelineRecords.add(new Record<>(event));
325+
}
326+
}
327+
return;
328+
}
293329
if (dlqPushHandler != null) {
294330
SqsSinkDlqData sqsSinkDlqData = SqsSinkDlqData.createDlqData(message, errorMessage);
295331
DlqObject dlqObject = DlqObject.createDlqObject(dlqPushHandler.getPluginSetting(), eventHandles, sqsSinkDlqData);
@@ -309,8 +345,9 @@ public void recordLatency(long amount, TimeUnit timeUnit) {
309345
@Override
310346
public void addEventToDLQList(final Event event, Throwable ex) {
311347
List<EventHandle> eventHandles = new ArrayList<>();
348+
List<Event> events = List.of(event);
312349
eventHandles.add(event.getEventHandle());
313-
addMessageToDLQ(event.toJsonString(), eventHandles, ex.getMessage());
350+
addMessageToDLQ(event.toJsonString(), eventHandles, events, ex.getMessage());
314351
}
315352

316353
@Override

data-prepper-plugins/sqs-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkBatchEntryTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ void TestBasic() {
6767
assertThat(sqsSinkBatchEntry.getGroupId(), equalTo(groupId));
6868
assertThat(sqsSinkBatchEntry.getDedupId(), equalTo(deDupId));
6969
assertTrue(sqsSinkBatchEntry.getEventHandles().isEmpty());
70+
assertTrue(sqsSinkBatchEntry.getEvents().isEmpty());
7071
}
7172

7273
@Test
@@ -83,6 +84,7 @@ void TestAddingOneEvent() throws Exception {
8384
assertThat(sqsSinkBatchEntry.getGroupId(), equalTo(groupId));
8485
assertThat(sqsSinkBatchEntry.getDedupId(), equalTo(deDupId));
8586
assertThat(sqsSinkBatchEntry.getEventHandles().size(), equalTo(1));
87+
assertThat(sqsSinkBatchEntry.getEvents().size(), equalTo(1));
8688
}
8789

8890

@@ -123,6 +125,7 @@ void TestAddingMultipleEvents(int numRecords) throws Exception {
123125
assertThat(sqsSinkBatchEntry.getDedupId(), equalTo(deDupId));
124126
assertThat(sqsSinkBatchEntry.getSize(), equalTo(expectedSize));
125127
assertThat(sqsSinkBatchEntry.getEventHandles().size(), equalTo(numRecords));
128+
assertThat(sqsSinkBatchEntry.getEvents().size(), equalTo(numRecords));
126129
}
127130

128131
@Test

data-prepper-plugins/sqs-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkServiceTest.java

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.junit.jupiter.api.extension.ExtendWith;
1414
import org.mockito.junit.jupiter.MockitoExtension;
1515
import org.opensearch.dataprepper.model.codec.OutputCodec;
16+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
17+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1618
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
1719
import org.opensearch.dataprepper.model.plugin.PluginFactory;
1820
import org.opensearch.dataprepper.metrics.PluginMetrics;
@@ -28,7 +30,9 @@
2830
import org.junit.jupiter.params.ParameterizedTest;
2931
import org.junit.jupiter.params.provider.ValueSource;
3032
import org.mockito.Mock;
33+
import static org.mockito.Mockito.doAnswer;
3134
import static org.mockito.Mockito.mock;
35+
import static org.mockito.Mockito.never;
3236
import static org.mockito.Mockito.when;
3337
import static org.mockito.Mockito.lenient;
3438
import static org.mockito.Mockito.verify;
@@ -37,6 +41,7 @@
3741
import static org.junit.jupiter.api.Assertions.assertThrows;
3842
import static org.hamcrest.CoreMatchers.equalTo;
3943
import static org.hamcrest.CoreMatchers.not;
44+
import static org.mockito.ArgumentMatchers.anyBoolean;
4045
import static org.mockito.ArgumentMatchers.anyString;
4146
import static org.mockito.ArgumentMatchers.any;
4247
import static org.mockito.ArgumentMatchers.eq;
@@ -92,6 +97,12 @@ class SqsSinkServiceTest {
9297
@Mock(strictness = Mock.Strictness.LENIENT)
9398
private SinkContext sinkContext;
9499

100+
private HeadlessPipeline dlqPipeline;
101+
@Mock(strictness = Mock.Strictness.LENIENT)
102+
private PipelineDescription pipelineDescription;
103+
@Mock(strictness = Mock.Strictness.LENIENT)
104+
private PluginSetting pluginSetting;
105+
95106
@Mock
96107
private Counter eventsSuccessCounter;
97108
@Mock
@@ -118,7 +129,8 @@ class SqsSinkServiceTest {
118129
private String queueUrl;
119130

120131
private SqsSinkService createObjectUnderTest() {
121-
return new SqsSinkService(sqsSinkConfig, sqsClient, expressionEvaluator, outputCodec, sinkContext, dlqPushHandler, pluginMetrics);
132+
return new SqsSinkService(sqsSinkConfig, sqsClient, expressionEvaluator, outputCodec, sinkContext, dlqPushHandler,
133+
pluginMetrics, pluginSetting, pipelineDescription);
122134
}
123135

124136
@BeforeEach
@@ -139,12 +151,12 @@ void setup() {
139151
when (thresholdConfig.getMaxMessageSizeBytes()).thenReturn(256*1024L);
140152
when (thresholdConfig.getMaxEventsPerMessage()).thenReturn(1);
141153
when (sqsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
154+
when (pipelineDescription.getPipelineName()).thenReturn("pipeline");
142155
lenient().when (sqsSinkConfig.getMaxRetries()).thenReturn(3);
143156
lenient().when(flushResponse.hasFailed()).thenReturn(false);
144157
when(sqsClient.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(flushResponse);
145158
when(expressionEvaluator.isValidFormatExpression(anyString())).thenReturn(true);
146159
when(dlqPushHandler.perform(any(List.class))).thenReturn(true);
147-
PluginSetting pluginSetting = mock(PluginSetting.class);
148160
lenient().when(pluginSetting.getName()).thenReturn("name");
149161
lenient().when(pluginSetting.getPipelineName()).thenReturn("pipeline");
150162
when(dlqPushHandler.getPluginSetting()).thenReturn(pluginSetting);
@@ -520,6 +532,83 @@ void TestWithOneBatch_RetryFlushes() throws Exception {
520532
verify(eventHandle, times(numRecords)).release(true);
521533
}
522534

535+
@Test
536+
void TestLargeRecordToPipelineDLQ() {
537+
dlqPipeline = mock(HeadlessPipeline.class);
538+
539+
List<Record<Event>> capturedRecords = new ArrayList<>();
540+
doAnswer(invocation -> {
541+
List<Record<Event>> arg = invocation.getArgument(0);
542+
capturedRecords.addAll(arg);
543+
return null;
544+
}).when(dlqPipeline).sendEvents(any());
545+
SqsSinkService sqsSinkService = createObjectUnderTest();
546+
sqsSinkService.setDlqPipeline(dlqPipeline);
547+
Record<Event> record = getLargeRecord(300 * 1024);
548+
sqsSinkService.execute(List.of(record));
549+
550+
verify(dlqPipeline, times(1)).sendEvents(any());
551+
assertThat(capturedRecords.size(), equalTo(1));
552+
553+
Event event = capturedRecords.get(0).getData();
554+
assertThat(event.get("_failure_metadata/pluginId", String.class), equalTo("name"));
555+
assertThat(event.get("_failure_metadata/pluginName", String.class), equalTo("name"));
556+
assertThat(event.get("_failure_metadata/pipelineName", String.class), equalTo("pipeline"));
557+
assertThat(event.get("_failure_metadata/sqsSinkQueueUrl", String.class), equalTo(queueUrl));
558+
verify(eventHandle, never()).release(anyBoolean());
559+
}
560+
561+
@Test
562+
void TestSendingToPipelineDLQAfterMultipleRetries() {
563+
dlqPipeline = mock(HeadlessPipeline.class);
564+
List<Record<Event>> capturedRecords = new ArrayList<>();
565+
doAnswer(invocation -> {
566+
List<Record<Event>> arg = invocation.getArgument(0);
567+
capturedRecords.addAll(arg);
568+
return null;
569+
}).when(dlqPipeline).sendEvents(any());
570+
571+
final int numRecords = 10;
572+
RequestThrottledException requestThrottledException = mock(RequestThrottledException.class);
573+
when(sqsClient.sendMessageBatch(any(SendMessageBatchRequest.class))).thenThrow(requestThrottledException);
574+
575+
SqsSinkService sqsSinkService = createObjectUnderTest();
576+
sqsSinkService.setDlqPipeline(dlqPipeline);
577+
List<Record<Event>> records = getRecordList(numRecords);
578+
sqsSinkService.execute(records);
579+
580+
assertThat(capturedRecords.size(), equalTo(numRecords));
581+
assertThat(sqsSinkService.getBatchUrlMap().size(), equalTo(0));
582+
assertThat(eventsSuccessCount.get(), equalTo(0));
583+
assertThat(requestsSuccessCount.get(), equalTo(0));
584+
verify(eventHandle, never()).release(anyBoolean());
585+
}
586+
587+
@Test
588+
void TestSendingToPipelineDLQAfterNonRetryableException() {
589+
dlqPipeline = mock(HeadlessPipeline.class);
590+
List<Record<Event>> capturedRecords = new ArrayList<>();
591+
doAnswer(invocation -> {
592+
List<Record<Event>> arg = invocation.getArgument(0);
593+
capturedRecords.addAll(arg);
594+
return null;
595+
}).when(dlqPipeline).sendEvents(any());
596+
597+
final int numRecords = 10;
598+
UnsupportedOperationException unsupportedOperationException = mock(UnsupportedOperationException.class);
599+
when(unsupportedOperationException.getMessage()).thenReturn("Unsupported operation");
600+
when(sqsClient.sendMessageBatch(any(SendMessageBatchRequest.class))).thenThrow(unsupportedOperationException);
601+
602+
SqsSinkService sqsSinkService = createObjectUnderTest();
603+
sqsSinkService.setDlqPipeline(dlqPipeline);
604+
605+
List<Record<Event>> records = getRecordList(numRecords);
606+
sqsSinkService.execute(records);
607+
608+
assertThat(capturedRecords.size(), equalTo(numRecords));
609+
verify(eventHandle, never()).release(anyBoolean());
610+
}
611+
523612
private List<Record<Event>> getLargeRecordList(int numberOfRecords) {
524613
final List<Record<Event>> recordList = new ArrayList<>();
525614
for (int i = 0; i < numberOfRecords; i++) {

0 commit comments

Comments
 (0)