Skip to content

Commit cb5347a

Browse files
committed
Adding support for pipeline DLQ to SQS sink
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent fd8c66e commit cb5347a

7 files changed

Lines changed: 70 additions & 22 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/EventFailureMetadata.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,15 @@
1111

1212
public interface EventFailureMetadata {
1313
EventFailureMetadata with(String key, Object value);
14+
15+
EventFailureMetadata withPluginId(String value);
16+
17+
EventFailureMetadata withPluginName(String value);
18+
19+
EventFailureMetadata withPipelineName(String value);
20+
21+
EventFailureMetadata withErrorMessage(Object value);
22+
23+
1424
}
1525

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,32 @@
6464
public class JacksonEvent implements Event {
6565
class DefaultEventFailureMetadata implements EventFailureMetadata {
6666
static final String FAILURE_METADATA = "_failure_metadata";
67+
static final String PLUGIN_ID = "pluginId";
68+
static final String PLUGIN_NAME = "pluginName";
69+
static final String PIPELINE_NAME = "pipelineName";
70+
static final String ERROR_MESSAGE = "errorMessage";
71+
6772

6873
public DefaultEventFailureMetadata with(String key, Object value) {
6974
put(FAILURE_METADATA+"/"+key, value);
7075
return this;
7176
}
77+
78+
public DefaultEventFailureMetadata withPluginId(String value) {
79+
return with(PLUGIN_ID, value);
80+
}
81+
82+
public DefaultEventFailureMetadata withPluginName(String value) {
83+
return with(PLUGIN_NAME, value);
84+
}
85+
86+
public DefaultEventFailureMetadata withPipelineName(String value) {
87+
return with(PIPELINE_NAME, value);
88+
}
89+
90+
public DefaultEventFailureMetadata withErrorMessage(Object value) {
91+
return with(ERROR_MESSAGE, value);
92+
}
7293
}
7394

7495
private static final Logger LOG = LoggerFactory.getLogger(JacksonEvent.class);

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,15 +202,28 @@ void testUpdateFailureMetadata() {
202202
@Test
203203
public void testDefaultEventFailureMetadata() {
204204
String eventType = UUID.randomUUID().toString();
205-
205+
String pluginId = UUID.randomUUID().toString();
206+
String pluginName = UUID.randomUUID().toString();
207+
String pipelineName = UUID.randomUUID().toString();
208+
String errorMessage = UUID.randomUUID().toString();
206209
Event event = JacksonEvent.builder()
207210
.withEventType(eventType)
208211
.build();
209212

210213
EventFailureMetadata eventFailureMetadata = event.updateFailureMetadata();
211214
eventFailureMetadata.with("key1", "value1").with("key2", 2);
212-
assertThat(event.get(JacksonEvent.DefaultEventFailureMetadata.FAILURE_METADATA+"/key1", String.class), equalTo("value1"));
213-
assertThat(event.get(JacksonEvent.DefaultEventFailureMetadata.FAILURE_METADATA+"/key2", Integer.class), equalTo(2));
215+
eventFailureMetadata.withPluginId(pluginId);
216+
eventFailureMetadata.withPluginName(pluginName);
217+
eventFailureMetadata.withPipelineName(pipelineName);
218+
eventFailureMetadata.withErrorMessage(errorMessage);
219+
String base = JacksonEvent.DefaultEventFailureMetadata.FAILURE_METADATA + "/";
220+
assertThat(event.get(base + "key1", String.class), equalTo("value1"));
221+
assertThat(event.get(base + "key2", Integer.class), equalTo(2));
222+
assertThat(event.get(base + JacksonEvent.DefaultEventFailureMetadata.PLUGIN_ID, String.class), equalTo(pluginId));
223+
assertThat(event.get(base + JacksonEvent.DefaultEventFailureMetadata.PLUGIN_NAME, String.class), equalTo(pluginName));
224+
assertThat(event.get(base + JacksonEvent.DefaultEventFailureMetadata.PIPELINE_NAME, String.class), equalTo(pipelineName));
225+
assertThat(event.get(base + JacksonEvent.DefaultEventFailureMetadata.ERROR_MESSAGE, String.class), equalTo(errorMessage));
226+
214227
}
215228

216229
@Test

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -641,10 +641,10 @@ private void logFailureForDlqObjects(final List<DlqObject> dlqObjects, final Thr
641641

642642
if (event != null) {
643643
event.updateFailureMetadata()
644-
.with("pluginId", dlqObject.getPluginId())
645-
.with("pluginName", dlqObject.getPluginName())
646-
.with("pipelineName", dlqObject.getPipelineName())
647-
.with("message", ((FailedDlqData) dlqObject.getFailedData()).getMessage())
644+
.withPluginId(dlqObject.getPluginId())
645+
.withPluginName(dlqObject.getPluginName())
646+
.withPipelineName(dlqObject.getPipelineName())
647+
.withErrorMessage(((FailedDlqData) dlqObject.getFailedData()).getMessage())
648648
.with("status", ((FailedDlqData) dlqObject.getFailedData()).getStatus())
649649
.with("index", ((FailedDlqData) dlqObject.getFailedData()).getIndex())
650650
.with("indexId", ((FailedDlqData) dlqObject.getFailedData()).getIndexId());

data-prepper-plugins/prometheus-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/prometheus/service/PrometheusSinkService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,11 @@ public void addFailedEventsToDlq(final List<Event> failedEvents, final Throwable
9393
}
9494
event.updateFailureMetadata()
9595
.with("statusCode", statusCode)
96-
.with("pluginName", PLUGIN_NAME)
97-
.with("pipelineName", pipelineDescription.getPipelineName());
96+
.withPluginName(PLUGIN_NAME)
97+
.withPipelineName(pipelineDescription.getPipelineName());
9898
if (ex != null) {
9999
event.updateFailureMetadata()
100-
.with("message", ex.getMessage());
100+
.withErrorMessage(ex.getMessage());
101101
}
102102
dlqRecords.add(new Record<>(event));
103103
}

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,11 +316,11 @@ private void addMessageToDLQ(final String message, final List<EventHandle> event
316316
for (Event event: events) {
317317
if (event != null) {
318318
event.updateFailureMetadata()
319-
.with("pluginId", pluginSetting.getName())
320-
.with("pluginName", pluginSetting.getName())
321-
.with("pipelineName", pipelineDescription.getPipelineName())
322-
.with("sqsSinkQueueUrl", queueUrl)
323-
.with("message", errorMessage);
319+
.withPluginId(pluginSetting.getName())
320+
.withPluginName(pluginSetting.getName())
321+
.withPipelineName(pipelineDescription.getPipelineName())
322+
.withErrorMessage(errorMessage)
323+
.with("sqsSinkQueueUrl", queueUrl);
324324
dlqPipelineRecords.add(new Record<>(event));
325325
}
326326
}

data-prepper-plugins/sqs-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkServiceTest.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ class SqsSinkServiceTest {
128128
private OutputCodecContext outputCodecContext;
129129
private String queueUrl;
130130

131+
private String pluginName;
132+
private String pipelineName;
133+
131134
private SqsSinkService createObjectUnderTest() {
132135
return new SqsSinkService(sqsSinkConfig, sqsClient, expressionEvaluator, outputCodec, sinkContext, dlqPushHandler,
133136
pluginMetrics, pluginSetting, pipelineDescription);
@@ -151,14 +154,16 @@ void setup() {
151154
when (thresholdConfig.getMaxMessageSizeBytes()).thenReturn(256*1024L);
152155
when (thresholdConfig.getMaxEventsPerMessage()).thenReturn(1);
153156
when (sqsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
154-
when (pipelineDescription.getPipelineName()).thenReturn("pipeline");
157+
pipelineName = UUID.randomUUID().toString();
158+
when (pipelineDescription.getPipelineName()).thenReturn(pipelineName);
155159
lenient().when (sqsSinkConfig.getMaxRetries()).thenReturn(3);
156160
lenient().when(flushResponse.hasFailed()).thenReturn(false);
157161
when(sqsClient.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(flushResponse);
158162
when(expressionEvaluator.isValidFormatExpression(anyString())).thenReturn(true);
159163
when(dlqPushHandler.perform(any(List.class))).thenReturn(true);
160-
lenient().when(pluginSetting.getName()).thenReturn("name");
161-
lenient().when(pluginSetting.getPipelineName()).thenReturn("pipeline");
164+
pluginName = UUID.randomUUID().toString();
165+
lenient().when(pluginSetting.getName()).thenReturn(pluginName);
166+
lenient().when(pluginSetting.getPipelineName()).thenReturn(pipelineName);
162167
when(dlqPushHandler.getPluginSetting()).thenReturn(pluginSetting);
163168
lenient().doNothing().when(summary).record(any(Double.class));
164169
lenient().doNothing().when(timer).record(any(Long.class), any(TimeUnit.class));
@@ -535,7 +540,6 @@ void TestWithOneBatch_RetryFlushes() throws Exception {
535540
@Test
536541
void TestLargeRecordToPipelineDLQ() {
537542
dlqPipeline = mock(HeadlessPipeline.class);
538-
539543
List<Record<Event>> capturedRecords = new ArrayList<>();
540544
doAnswer(invocation -> {
541545
List<Record<Event>> arg = invocation.getArgument(0);
@@ -551,9 +555,9 @@ void TestLargeRecordToPipelineDLQ() {
551555
assertThat(capturedRecords.size(), equalTo(1));
552556

553557
Event event = capturedRecords.get(0).getData();
554-
assertThat(event.get("_failure_metadata/pluginId", String.class), equalTo("name"));
555-
assertThat(event.get("_failure_metadata/pluginName", String.class), equalTo("name"));
556-
assertThat(event.get("_failure_metadata/pipelineName", String.class), equalTo("pipeline"));
558+
assertThat(event.get("_failure_metadata/pluginId", String.class), equalTo(pluginName));
559+
assertThat(event.get("_failure_metadata/pluginName", String.class), equalTo(pluginName));
560+
assertThat(event.get("_failure_metadata/pipelineName", String.class), equalTo(pipelineName));
557561
assertThat(event.get("_failure_metadata/sqsSinkQueueUrl", String.class), equalTo(queueUrl));
558562
verify(eventHandle, never()).release(anyBoolean());
559563
}

0 commit comments

Comments
 (0)