Skip to content

Commit 2f8c836

Browse files
authored
Adding support for pipeline DLQ to SQS sink (#6593)
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent db75470 commit 2f8c836

12 files changed

Lines changed: 219 additions & 26 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventFailureMetadata.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,15 @@
1111

1212
public interface EventFailureMetadata {
1313
EventFailureMetadata with(String key, Object value);
14+
15+
EventFailureMetadata withPluginId(String value);
16+
17+
EventFailureMetadata withPluginName(String value);
18+
19+
EventFailureMetadata withPipelineName(String value);
20+
21+
EventFailureMetadata withErrorMessage(Object value);
22+
23+
1424
}
1525

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,32 @@
6464
public class JacksonEvent implements Event {
6565
class DefaultEventFailureMetadata implements EventFailureMetadata {
6666
static final String FAILURE_METADATA = "_failure_metadata";
67+
static final String PLUGIN_ID = "pluginId";
68+
static final String PLUGIN_NAME = "pluginName";
69+
static final String PIPELINE_NAME = "pipelineName";
70+
static final String ERROR_MESSAGE = "errorMessage";
71+
6772

6873
public DefaultEventFailureMetadata with(String key, Object value) {
6974
put(FAILURE_METADATA+"/"+key, value);
7075
return this;
7176
}
77+
78+
public DefaultEventFailureMetadata withPluginId(String value) {
79+
return with(PLUGIN_ID, value);
80+
}
81+
82+
public DefaultEventFailureMetadata withPluginName(String value) {
83+
return with(PLUGIN_NAME, value);
84+
}
85+
86+
public DefaultEventFailureMetadata withPipelineName(String value) {
87+
return with(PIPELINE_NAME, value);
88+
}
89+
90+
public DefaultEventFailureMetadata withErrorMessage(Object value) {
91+
return with(ERROR_MESSAGE, value);
92+
}
7293
}
7394

7495
private static final Logger LOG = LoggerFactory.getLogger(JacksonEvent.class);

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,15 +202,28 @@ void testUpdateFailureMetadata() {
202202
@Test
203203
public void testDefaultEventFailureMetadata() {
204204
String eventType = UUID.randomUUID().toString();
205-
205+
String pluginId = UUID.randomUUID().toString();
206+
String pluginName = UUID.randomUUID().toString();
207+
String pipelineName = UUID.randomUUID().toString();
208+
String errorMessage = UUID.randomUUID().toString();
206209
Event event = JacksonEvent.builder()
207210
.withEventType(eventType)
208211
.build();
209212

210213
EventFailureMetadata eventFailureMetadata = event.updateFailureMetadata();
211214
eventFailureMetadata.with("key1", "value1").with("key2", 2);
212-
assertThat(event.get(JacksonEvent.DefaultEventFailureMetadata.FAILURE_METADATA+"/key1", String.class), equalTo("value1"));
213-
assertThat(event.get(JacksonEvent.DefaultEventFailureMetadata.FAILURE_METADATA+"/key2", Integer.class), equalTo(2));
215+
eventFailureMetadata.withPluginId(pluginId);
216+
eventFailureMetadata.withPluginName(pluginName);
217+
eventFailureMetadata.withPipelineName(pipelineName);
218+
eventFailureMetadata.withErrorMessage(errorMessage);
219+
String base = JacksonEvent.DefaultEventFailureMetadata.FAILURE_METADATA + "/";
220+
assertThat(event.get(base + "key1", String.class), equalTo("value1"));
221+
assertThat(event.get(base + "key2", Integer.class), equalTo(2));
222+
assertThat(event.get(base + JacksonEvent.DefaultEventFailureMetadata.PLUGIN_ID, String.class), equalTo(pluginId));
223+
assertThat(event.get(base + JacksonEvent.DefaultEventFailureMetadata.PLUGIN_NAME, String.class), equalTo(pluginName));
224+
assertThat(event.get(base + JacksonEvent.DefaultEventFailureMetadata.PIPELINE_NAME, String.class), equalTo(pipelineName));
225+
assertThat(event.get(base + JacksonEvent.DefaultEventFailureMetadata.ERROR_MESSAGE, String.class), equalTo(errorMessage));
226+
214227
}
215228

216229
@Test

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -649,10 +649,10 @@ private void logFailureForDlqObjects(final List<DlqObject> dlqObjects, final Thr
649649

650650
if (event != null) {
651651
event.updateFailureMetadata()
652-
.with("pluginId", dlqObject.getPluginId())
653-
.with("pluginName", dlqObject.getPluginName())
654-
.with("pipelineName", dlqObject.getPipelineName())
655-
.with("message", ((FailedDlqData) dlqObject.getFailedData()).getMessage())
652+
.withPluginId(dlqObject.getPluginId())
653+
.withPluginName(dlqObject.getPluginName())
654+
.withPipelineName(dlqObject.getPipelineName())
655+
.withErrorMessage(((FailedDlqData) dlqObject.getFailedData()).getMessage())
656656
.with("status", ((FailedDlqData) dlqObject.getFailedData()).getStatus())
657657
.with("index", ((FailedDlqData) dlqObject.getFailedData()).getIndex())
658658
.with("indexId", ((FailedDlqData) dlqObject.getFailedData()).getIndexId());

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,11 @@ public void addFailedEventsToDlq(final List<Event> failedEvents, final Throwable
9393
}
9494
event.updateFailureMetadata()
9595
.with("statusCode", statusCode)
96-
.with("pluginName", PLUGIN_NAME)
97-
.with("pipelineName", pipelineDescription.getPipelineName());
96+
.withPluginName(PLUGIN_NAME)
97+
.withPipelineName(pipelineDescription.getPipelineName());
9898
if (ex != null) {
9999
event.updateFailureMetadata()
100-
.with("message", ex.getMessage());
100+
.withErrorMessage(ex.getMessage());
101101
}
102102
dlqRecords.add(new Record<>(event));
103103
}

data-prepper-plugins/sqs-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkIT.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.plugins.sink.sqs;
77

8+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
89
import org.opensearch.dataprepper.model.configuration.PluginSetting;
910
import org.opensearch.dataprepper.model.configuration.PluginModel;
1011
import org.opensearch.dataprepper.model.plugin.PluginFactory;
@@ -117,6 +118,9 @@ public class SqsSinkIT {
117118
@Mock
118119
private PluginModel codec;
119120

121+
@Mock
122+
private PipelineDescription pipelineDescription;
123+
120124
@Mock
121125
private Counter eventsSuccessCounter;
122126
@Mock
@@ -256,7 +260,7 @@ void setUp() {
256260
when(sqsSinkConfig.getCodec()).thenReturn(codec);
257261
when(sqsSinkConfig.getAwsConfig()).thenReturn(awsConfig);
258262
when(sqsSinkConfig.getDlq()).thenReturn(null);
259-
263+
when(pipelineDescription.getPipelineName()).thenReturn("test-pipeline");
260264
thresholdConfig = mock(SqsThresholdConfig.class);
261265
lenient().when(sqsSinkConfig.getMaxRetries()).thenReturn(3);
262266
when(thresholdConfig.getMaxEventsPerMessage()).thenReturn(1);
@@ -323,7 +327,7 @@ private void deleteObjectsWithPrefix(String bucketName, String prefix) {
323327
}
324328

325329
private SqsSink createObjectUnderTest() {
326-
return new SqsSink(pluginSetting, pluginMetrics, pluginFactory, sqsSinkConfig, sinkContext, expressionEvaluator, awsCredentialsSupplier);
330+
return new SqsSink(pluginSetting, pluginMetrics, pluginFactory, sqsSinkConfig, sinkContext, expressionEvaluator, awsCredentialsSupplier, pipelineDescription);
327331
}
328332

329333
private List<Message> getMessages(final String queueUrl) {

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSink.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.plugins.sink.sqs;
77

8+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
89
import org.opensearch.dataprepper.model.configuration.PluginSetting;
910
import org.opensearch.dataprepper.metrics.PluginMetrics;
1011
import org.opensearch.dataprepper.model.configuration.PluginModel;
@@ -51,7 +52,8 @@ public SqsSink(final PluginSetting pluginSetting,
5152
final SqsSinkConfig sqsSinkConfig,
5253
final SinkContext sinkContext,
5354
final ExpressionEvaluator expressionEvaluator,
54-
final AwsCredentialsSupplier awsCredentialsSupplier) {
55+
final AwsCredentialsSupplier awsCredentialsSupplier,
56+
final PipelineDescription pipelineDescription) {
5557
super(pluginSetting);
5658
this.sqsSinkConfig = sqsSinkConfig;
5759
sinkInitialized = false;
@@ -79,7 +81,7 @@ public SqsSink(final PluginSetting pluginSetting,
7981
dlqPushHandler = new DlqPushHandler(pluginFactory, pluginSetting, pluginMetrics, sqsSinkConfig.getDlq(), region.toString(), role, "sqsSink");
8082
}
8183
final OutputCodec outputCodec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
82-
sqsSinkService = new SqsSinkService(sqsSinkConfig, sqsClient, expressionEvaluator, outputCodec, sinkContext, dlqPushHandler, pluginMetrics);
84+
sqsSinkService = new SqsSinkService(sqsSinkConfig, sqsClient, expressionEvaluator, outputCodec, sinkContext, dlqPushHandler, pluginMetrics, pluginSetting, pipelineDescription);
8385
}
8486

8587
private static AwsCredentialsOptions convertToCredentialOptions(final AwsConfig awsConfig) {
@@ -99,6 +101,7 @@ public boolean isReady() {
99101
@Override
100102
public void doInitialize() {
101103
sinkInitialized = true;
104+
sqsSinkService.setDlqPipeline(getFailurePipeline());
102105
}
103106

104107
/**

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkBatchEntry.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
public class SqsSinkBatchEntry {
1919
private final List<EventHandle> eventHandles;
20+
private final List<Event> events;
2021
private final String groupId;
2122
private final String deDupId;
2223
private final Buffer buffer;
@@ -29,6 +30,7 @@ public class SqsSinkBatchEntry {
2930

3031
public SqsSinkBatchEntry(final Buffer buffer, final String groupId, final String deDupId, final OutputCodec codec, final OutputCodecContext codecContext) {
3132
this.eventHandles = new ArrayList<>();
33+
this.events = new ArrayList<>();
3234
this.buffer = buffer;
3335
completed = false;
3436
this.groupId = groupId;
@@ -58,6 +60,7 @@ public void addEvent(final Event event) throws Exception {
5860
writer = codec.createWriter(buffer.getOutputStream(), null, codecContext);
5961
}
6062
writer.writeEvent(event);
63+
events.add(event);
6164
eventHandles.add(event.getEventHandle());
6265
eventCount++;
6366
}
@@ -91,5 +94,9 @@ public int getEventCount() {
9194
return eventCount;
9295
}
9396

97+
public List<Event> getEvents() {
98+
return events;
99+
}
100+
94101
}
95102

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkService.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import org.opensearch.dataprepper.metrics.PluginMetrics;
1313
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
1414
import org.opensearch.dataprepper.model.codec.OutputCodec;
15+
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
16+
import org.opensearch.dataprepper.model.configuration.PluginSetting;
17+
import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline;
1518
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
1619
import org.opensearch.dataprepper.plugins.accumulator.BufferFactory;
1720
import org.opensearch.dataprepper.plugins.accumulator.InMemoryBufferFactory;
@@ -62,16 +65,24 @@ public class SqsSinkService extends SqsSinkExecutor {
6265
private final SqsSinkMetrics sinkMetrics;
6366
private final DlqPushHandler dlqPushHandler;
6467
private final List<DlqObject> dlqObjects;
68+
private HeadlessPipeline dlqPipeline;
69+
private final List<Record<Event>> dlqPipelineRecords;
70+
private final PluginSetting pluginSetting;
71+
private final PipelineDescription pipelineDescription;
6572

6673
public SqsSinkService(final SqsSinkConfig sqsSinkConfig,
6774
final SqsClient sqsClient,
6875
final ExpressionEvaluator expressionEvaluator,
6976
final OutputCodec codec,
7077
final SinkContext sinkContext,
7178
final DlqPushHandler dlqPushHandler,
72-
final PluginMetrics pluginMetrics) {
79+
final PluginMetrics pluginMetrics,
80+
final PluginSetting pluginSetting,
81+
final PipelineDescription pipelineDescription
82+
) {
7383
batchUrlMap = new HashMap<>();
7484
dlqObjects = new ArrayList<>();
85+
dlqPipelineRecords = new ArrayList<>();
7586
inMemoryBufferFactory =new InMemoryBufferFactory();
7687
this.sqsClient = sqsClient;
7788
this.dlqPushHandler = dlqPushHandler;
@@ -82,7 +93,8 @@ public SqsSinkService(final SqsSinkConfig sqsSinkConfig,
8293
this.sqsSinkConfig = sqsSinkConfig;
8394
reentrantLock = new ReentrantLock();
8495
this.sinkMetrics = new SqsSinkMetrics(pluginMetrics);
85-
96+
this.pluginSetting = pluginSetting;
97+
this.pipelineDescription = pipelineDescription;
8698
queueUrl = sqsSinkConfig.getQueueUrl();
8799
isDynamicQueueUrl = queueUrl.contains("${");
88100
if (isDynamicQueueUrl) {
@@ -109,13 +121,23 @@ public SqsSinkService(final SqsSinkConfig sqsSinkConfig,
109121

110122
}
111123

124+
public void setDlqPipeline(HeadlessPipeline pipeline) {
125+
this.dlqPipeline = pipeline;
126+
}
127+
112128
@Override
113129
public boolean exceedsMaxEventSizeThreshold(final long estimatedSize) {
114130
return estimatedSize > MAX_EVENT_SIZE;
115131
}
116132

117133
@Override
118134
public void pushDLQList() {
135+
if (dlqPipeline != null && !dlqPipelineRecords.isEmpty()) {
136+
dlqPipeline.sendEvents(dlqPipelineRecords);
137+
dlqPipelineRecords.clear();
138+
return;
139+
}
140+
119141
// If DLQ push handler is null, dlqObjects list
120142
// would be empty
121143
if (dlqObjects.size() == 0) {
@@ -286,10 +308,24 @@ public boolean exceedsFlushTimeInterval() {
286308
}
287309

288310
private void addBatchEntryToDLQ(final SqsSinkBatchEntry batchEntry, final String errorMessage) {
289-
addMessageToDLQ(batchEntry.getBody(), batchEntry.getEventHandles(), errorMessage);
311+
addMessageToDLQ(batchEntry.getBody(), batchEntry.getEventHandles(), batchEntry.getEvents(), errorMessage);
290312
}
291313

292-
private void addMessageToDLQ(final String message, final List<EventHandle> eventHandles, final String errorMessage) {
314+
private void addMessageToDLQ(final String message, final List<EventHandle> eventHandles, List<Event> events, final String errorMessage) {
315+
if (dlqPipeline != null) {
316+
for (Event event: events) {
317+
if (event != null) {
318+
event.updateFailureMetadata()
319+
.withPluginId(pluginSetting.getName())
320+
.withPluginName(pluginSetting.getName())
321+
.withPipelineName(pipelineDescription.getPipelineName())
322+
.withErrorMessage(errorMessage)
323+
.with("sqsSinkQueueUrl", queueUrl);
324+
dlqPipelineRecords.add(new Record<>(event));
325+
}
326+
}
327+
return;
328+
}
293329
if (dlqPushHandler != null) {
294330
SqsSinkDlqData sqsSinkDlqData = SqsSinkDlqData.createDlqData(message, errorMessage);
295331
DlqObject dlqObject = DlqObject.createDlqObject(dlqPushHandler.getPluginSetting(), eventHandles, sqsSinkDlqData);
@@ -309,8 +345,9 @@ public void recordLatency(long amount, TimeUnit timeUnit) {
309345
@Override
310346
public void addEventToDLQList(final Event event, Throwable ex) {
311347
List<EventHandle> eventHandles = new ArrayList<>();
348+
List<Event> events = List.of(event);
312349
eventHandles.add(event.getEventHandle());
313-
addMessageToDLQ(event.toJsonString(), eventHandles, ex.getMessage());
350+
addMessageToDLQ(event.toJsonString(), eventHandles, events, ex.getMessage());
314351
}
315352

316353
@Override

data-prepper-plugins/sqs-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkBatchEntryTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ void TestBasic() {
6767
assertThat(sqsSinkBatchEntry.getGroupId(), equalTo(groupId));
6868
assertThat(sqsSinkBatchEntry.getDedupId(), equalTo(deDupId));
6969
assertTrue(sqsSinkBatchEntry.getEventHandles().isEmpty());
70+
assertTrue(sqsSinkBatchEntry.getEvents().isEmpty());
7071
}
7172

7273
@Test
@@ -83,6 +84,7 @@ void TestAddingOneEvent() throws Exception {
8384
assertThat(sqsSinkBatchEntry.getGroupId(), equalTo(groupId));
8485
assertThat(sqsSinkBatchEntry.getDedupId(), equalTo(deDupId));
8586
assertThat(sqsSinkBatchEntry.getEventHandles().size(), equalTo(1));
87+
assertThat(sqsSinkBatchEntry.getEvents().size(), equalTo(1));
8688
}
8789

8890

@@ -123,6 +125,7 @@ void TestAddingMultipleEvents(int numRecords) throws Exception {
123125
assertThat(sqsSinkBatchEntry.getDedupId(), equalTo(deDupId));
124126
assertThat(sqsSinkBatchEntry.getSize(), equalTo(expectedSize));
125127
assertThat(sqsSinkBatchEntry.getEventHandles().size(), equalTo(numRecords));
128+
assertThat(sqsSinkBatchEntry.getEvents().size(), equalTo(numRecords));
126129
}
127130

128131
@Test

0 commit comments

Comments
 (0)