From de9a285700b7ad4279e030f71e8e38b3426d8247 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 21 Apr 2026 15:29:02 -0500 Subject: [PATCH] Add forward_to support along with failure_pipeline support for kafka sink Signed-off-by: Taylor Gray --- .../kafka/producer/KafkaCustomProducer.java | 66 ++++++-- .../producer/KafkaCustomProducerFactory.java | 24 ++- .../plugins/kafka/sink/KafkaSink.java | 7 +- .../producer/KafkaCustomProducerTest.java | 150 ++++++++++++++++++ .../plugins/kafka/sink/KafkaSinkTest.java | 2 +- 5 files changed, 235 insertions(+), 14 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java index 9fdec08382..581bd2bfe9 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducer.java @@ -24,7 +24,10 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties; @@ -40,6 +43,7 @@ import java.io.OutputStream; import java.util.Collection; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.Future; @@ -79,6 +83,12 @@ public class KafkaCustomProducer { private final CompressionOption compressionConfig; + private final SinkContext sinkContext; + + private final SinkForwardRecordsContext sinkForwardRecordsContext; + + private final HeadlessPipeline failurePipeline; + public KafkaCustomProducer(final KafkaProducer producer, final KafkaProducerConfig kafkaProducerConfig, final DLQSink dlqSink, @@ -86,7 +96,10 @@ public KafkaCustomProducer(final KafkaProducer producer, final String tagTargetKey, final KafkaTopicProducerMetrics topicMetrics, final SchemaService schemaService, - final CompressionOption compressionConfig + final CompressionOption compressionConfig, + final SinkContext sinkContext, + final SinkForwardRecordsContext sinkForwardRecordsContext, + final HeadlessPipeline failurePipeline ) { this.producer = producer; this.kafkaProducerConfig = kafkaProducerConfig; @@ -100,6 +113,9 @@ public KafkaCustomProducer(final KafkaProducer producer, this.topicMetrics = topicMetrics; this.topicMetrics.register(this.producer); this.compressionConfig = (compressionConfig == null) ? CompressionOption.NONE: compressionConfig; + this.sinkContext = sinkContext; + this.sinkForwardRecordsContext = sinkForwardRecordsContext; + this.failurePipeline = failurePipeline; } public KafkaCustomProducer(final KafkaProducer producer, @@ -110,7 +126,7 @@ public KafkaCustomProducer(final KafkaProducer producer, final KafkaTopicProducerMetrics topicMetrics, final SchemaService schemaService ) { - this(producer, kafkaProducerConfig, dlqSink, expressionEvaluator, tagTargetKey, topicMetrics, schemaService, null); + this(producer, kafkaProducerConfig, dlqSink, expressionEvaluator, tagTargetKey, topicMetrics, schemaService, null, null, null, null); } KafkaTopicProducerMetrics getTopicMetrics() { @@ -162,7 +178,14 @@ public void produceRecords(final Record record) throws Exception { } catch (Exception e) { LOG.error("Error occurred while publishing record {}", e.getMessage()); topicMetrics.getNumberOfRecordSendErrors().increment(); - if (dlqSink != null) { + if (failurePipeline != null) { + record.getData().updateFailureMetadata() + .withPluginName("kafka") + .withPipelineName(kafkaProducerConfig.getTopic() != null ? topicName : null) + .withErrorMessage(e.getMessage()) + .with("topic", topicName); + failurePipeline.sendEvents(List.of(record)); + } else if (dlqSink != null) { JsonNode dataNode = record.getData().getJsonNode(); dlqSink.perform(dataNode, e); } else { @@ -182,7 +205,7 @@ private void publishJsonMessageAsBytes(Record record, String key) throws compressedOutputStream.write(bytes); compressedOutputStream.close(); - send(topicName, key, byteArrayOutputStream.toByteArray()); + send(topicName, key, byteArrayOutputStream.toByteArray(), record); } private Event getEvent(final Record record) { @@ -197,7 +220,7 @@ private Event getEvent(final Record record) { private void publishPlaintextMessage(final Record record, final String key) throws Exception { - send(topicName, key, record.getData().toJsonString()); + send(topicName, key, record.getData().toJsonString(), record); } private void publishAvroMessage(final Record record, final String key) throws Exception { @@ -206,20 +229,24 @@ private void publishAvroMessage(final Record record, final String key) th throw new RuntimeException("Schema definition is mandatory in case of type avro"); } final GenericRecord genericRecord = getGenericRecord(record.getData(), avroSchema); - send(topicName, key, genericRecord); + send(topicName, key, genericRecord, record); } - Future send(final String topicName, String key, final Object record) throws Exception { + Future send(final String topicName, String key, final Object record, final Record originalRecord) throws Exception { ProducerRecord producerRecord = Objects.isNull(key) ? new ProducerRecord(topicName, record) : new ProducerRecord(topicName, key, record); - return producer.send(producerRecord, callBack(record)); + return producer.send(producerRecord, callBack(originalRecord)); + } + + Future send(final String topicName, String key, final Object record) throws Exception { + return send(topicName, key, record, null); } private void publishJsonMessage(final Record record, final String key) throws IOException, ProcessingException, Exception { JsonNode dataNode = record.getData().getJsonNode(); - send(topicName, key, dataNode); + send(topicName, key, dataNode, record); } public boolean validateSchema(final String jsonData, final String schemaJson) throws IOException, ProcessingException { @@ -232,13 +259,30 @@ public boolean validateSchema(final String jsonData, final String schemaJson) th return report != null ? report.isSuccess() : false; } - private Callback callBack(final Object dataForDlq) { + private Callback callBack(final Record originalRecord) { return (metadata, exception) -> { if (null != exception) { LOG.error("Error occurred while publishing {}", exception.getMessage()); topicMetrics.getNumberOfRecordProcessingErrors().increment(); + if (failurePipeline != null && originalRecord != null) { + originalRecord.getData().updateFailureMetadata() + .withPluginName("kafka") + .withPipelineName(kafkaProducerConfig.getTopic() != null ? topicName : null) + .withErrorMessage(exception.getMessage()) + .with("topic", topicName); + failurePipeline.sendEvents(List.of(originalRecord)); + } else if (dlqSink != null && originalRecord != null) { + dlqSink.perform(originalRecord.getData().getJsonNode(), exception); + } else { + releaseEventHandles(false); + } } else { - releaseEventHandles(true); + if (sinkContext != null && sinkForwardRecordsContext != null && sinkContext.getForwardToPipelines().size() > 0) { + sinkForwardRecordsContext.addRecord(originalRecord); + sinkContext.forwardRecords(sinkForwardRecordsContext, null, null); + } else { + releaseEventHandles(true); + } } }; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java index b1fba3521e..cfa8648f4a 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerFactory.java @@ -11,7 +11,9 @@ import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig; import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfigAdapter; @@ -61,11 +63,30 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProduce return createProducer(kafkaProducerConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, topicNameInMetrics, CompressionOption.NONE); } + public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig, + final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics, + final DLQSink dlqSink, + final boolean topicNameInMetrics, + final SinkForwardRecordsContext sinkForwardRecordsContext, + final HeadlessPipeline failurePipeline) { + return createProducerInternal(kafkaProducerConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, topicNameInMetrics, CompressionOption.NONE, sinkForwardRecordsContext, failurePipeline); + } + public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig, final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics, final DLQSink dlqSink, final boolean topicNameInMetrics, final CompressionOption manualCompressionConfig) { + return createProducerInternal(kafkaProducerConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, topicNameInMetrics, manualCompressionConfig, null, null); + } + + private KafkaCustomProducer createProducerInternal(final KafkaProducerConfig kafkaProducerConfig, + final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics, + final DLQSink dlqSink, + final boolean topicNameInMetrics, + final CompressionOption manualCompressionConfig, + final SinkForwardRecordsContext sinkForwardRecordsContext, + final HeadlessPipeline failurePipeline) { AwsContext awsContext = new AwsContext(kafkaProducerConfig, awsCredentialsSupplier); KeyFactory keyFactory = new KeyFactory(awsContext); // If either or both of Producer's max_request_size or @@ -95,7 +116,8 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProduce final SchemaService schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaProducerConfig.getSchemaConfig()).build(); return new KafkaCustomProducer(producer, kafkaProducerConfig, dlqSink, - expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics, schemaService, manualCompressionConfig); + expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics, schemaService, manualCompressionConfig, + sinkContext, sinkForwardRecordsContext, failurePipeline); } private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig, final Integer maxRequestSize) { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java index 97c93d22de..65b1006cf6 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.sink.AbstractSink; import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext; import org.opensearch.dataprepper.plugins.kafka.common.serialization.CommonSerializationFactory; import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; @@ -73,6 +74,8 @@ public class KafkaSink extends AbstractSink> { private final SinkContext sinkContext; + private final SinkForwardRecordsContext sinkForwardRecordsContext; + @DataPrepperPluginConstructor public KafkaSink(final PluginSetting pluginSetting, final KafkaSinkConfig kafkaSinkConfig, final PluginFactory pluginFactory, @@ -86,6 +89,7 @@ public KafkaSink(final PluginSetting pluginSetting, final KafkaSinkConfig kafkaS this.expressionEvaluator = expressionEvaluator; reentrantLock = new ReentrantLock(); this.sinkContext = sinkContext; + this.sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); SerializationFactory serializationFactory = new CommonSerializationFactory(); topicServiceFactory = new TopicServiceFactory(); @@ -167,7 +171,8 @@ private void checkTopicCreationCriteriaAndCreateTopic() { private KafkaCustomProducer createProducer() { final DLQSink dlqSink = new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting); - return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, true); + return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, expressionEvaluator, sinkContext, pluginMetrics, dlqSink, true, + sinkForwardRecordsContext, getFailurePipeline()); } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java index ec12e65f97..424333bd50 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java @@ -26,8 +26,12 @@ import org.mockito.quality.Strictness; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFailureMetadata; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.pipeline.HeadlessPipeline; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.model.sink.SinkForwardRecordsContext; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; @@ -38,6 +42,9 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -205,6 +212,60 @@ public void producePlainTextRecords_callbackException() throws Exception { verify(numberOfRecordProcessingError).increment(); } + @Test + public void producePlainTextRecords_callbackException_sendsToFailurePipeline() throws Exception { + when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext"); + KafkaProducer kafkaProducer = mock(KafkaProducer.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); + when(kafkaTopicProducerMetrics.getNumberOfRecordProcessingErrors()).thenReturn(numberOfRecordProcessingError); + + EventFailureMetadata failureMetadata = mock(EventFailureMetadata.class); + when(failureMetadata.withPluginName(any())).thenReturn(failureMetadata); + when(failureMetadata.withPipelineName(any())).thenReturn(failureMetadata); + when(failureMetadata.withErrorMessage(any())).thenReturn(failureMetadata); + when(failureMetadata.with(any(), any())).thenReturn(failureMetadata); + + event = (JacksonEvent) JacksonEvent.fromMessage(UUID.randomUUID().toString()); + JacksonEvent spyEvent = spy(event); + when(spyEvent.updateFailureMetadata()).thenReturn(failureMetadata); + record = new Record<>(spyEvent); + + producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), + null, kafkaTopicProducerMetrics, schemaService, null, + null, null, failurePipeline); + sinkProducer = spy(producer); + sinkProducer.produceRecords(record); + + final ArgumentCaptor callbackArgumentCaptor = ArgumentCaptor.forClass(Callback.class); + verify(kafkaProducer).send(any(ProducerRecord.class), callbackArgumentCaptor.capture()); + callbackArgumentCaptor.getValue().onCompletion(null, new RuntimeException("kafka error")); + + verify(numberOfRecordProcessingError).increment(); + verify(failurePipeline).sendEvents(any()); + verify(failureMetadata).withPluginName("kafka"); + verify(failureMetadata).with("topic", "test-topic"); + } + + @Test + public void producePlainTextRecords_callbackException_fallsBackToDlq() throws Exception { + when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext"); + KafkaProducer kafkaProducer = mock(KafkaProducer.class); + when(kafkaTopicProducerMetrics.getNumberOfRecordProcessingErrors()).thenReturn(numberOfRecordProcessingError); + + producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), + null, kafkaTopicProducerMetrics, schemaService, null, + null, null, null); + sinkProducer = spy(producer); + sinkProducer.produceRecords(record); + + final ArgumentCaptor callbackArgumentCaptor = ArgumentCaptor.forClass(Callback.class); + verify(kafkaProducer).send(any(ProducerRecord.class), callbackArgumentCaptor.capture()); + callbackArgumentCaptor.getValue().onCompletion(null, new RuntimeException("kafka error")); + + verify(numberOfRecordProcessingError).increment(); + verify(dlqSink).perform(any(), any()); + } + @Test public void produceJsonRecordsTest() throws Exception { when(kafkaSinkConfig.getSerdeFormat()).thenReturn("JSON"); @@ -277,5 +338,94 @@ public void validateSchema() throws IOException, ProcessingException { String jsonSchema2 = "{\"type\": \"object\",\"properties\": {\"Year\": {\"type\": \"string\"},\"Age\": {\"type\": \"string\"},\"Ethnic\": {\"type\":\"string\",\"default\": null}}}"; assertTrue(producer.validateSchema(jsonSchema, jsonSchema2)); } + + @Test + public void producePlainTextRecords_callbackSuccess_forwardsToForwardPipeline() throws Exception { + when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext"); + KafkaProducer kafkaProducer = mock(KafkaProducer.class); + SinkContext sinkContext = mock(SinkContext.class); + HeadlessPipeline forwardPipeline = mock(HeadlessPipeline.class); + Map forwardPipelines = new HashMap<>(); + forwardPipelines.put("forward-pipeline", forwardPipeline); + when(sinkContext.getForwardToPipelines()).thenReturn(forwardPipelines); + SinkForwardRecordsContext sinkForwardRecordsContext = new SinkForwardRecordsContext(sinkContext); + + producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), + null, kafkaTopicProducerMetrics, schemaService, null, + sinkContext, sinkForwardRecordsContext, null); + sinkProducer = spy(producer); + sinkProducer.produceRecords(record); + + final ArgumentCaptor callbackArgumentCaptor = ArgumentCaptor.forClass(Callback.class); + verify(kafkaProducer).send(any(ProducerRecord.class), callbackArgumentCaptor.capture()); + callbackArgumentCaptor.getValue().onCompletion(mock(org.apache.kafka.clients.producer.RecordMetadata.class), null); + + verify(sinkContext).forwardRecords(eq(sinkForwardRecordsContext), eq(null), eq(null)); + } + + @Test + public void producePlainTextRecords_callbackSuccess_releasesHandlesWhenNoForwardPipelines() throws Exception { + when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext"); + KafkaProducer kafkaProducer = mock(KafkaProducer.class); + SinkContext sinkContext = mock(SinkContext.class); + when(sinkContext.getForwardToPipelines()).thenReturn(Collections.emptyMap()); + + producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), + null, kafkaTopicProducerMetrics, schemaService, null, + sinkContext, null, null); + sinkProducer = spy(producer); + sinkProducer.produceRecords(record); + + final ArgumentCaptor callbackArgumentCaptor = ArgumentCaptor.forClass(Callback.class); + verify(kafkaProducer).send(any(ProducerRecord.class), callbackArgumentCaptor.capture()); + callbackArgumentCaptor.getValue().onCompletion(mock(org.apache.kafka.clients.producer.RecordMetadata.class), null); + + verify(sinkContext, org.mockito.Mockito.never()).forwardRecords(any(), any(), any()); + } + + @Test + public void produceRecords_sendError_sendsToFailurePipeline() throws Exception { + when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext"); + KafkaProducer kafkaProducer = mock(KafkaProducer.class); + HeadlessPipeline failurePipeline = mock(HeadlessPipeline.class); + when(kafkaTopicProducerMetrics.getNumberOfRecordSendErrors()).thenReturn(numberOfRecordSendErrors); + when(kafkaProducer.send(any(ProducerRecord.class), any(Callback.class))).thenThrow(new KafkaException("test error")); + + EventFailureMetadata failureMetadata = mock(EventFailureMetadata.class); + when(failureMetadata.withPluginName(any())).thenReturn(failureMetadata); + when(failureMetadata.withPipelineName(any())).thenReturn(failureMetadata); + when(failureMetadata.withErrorMessage(any())).thenReturn(failureMetadata); + when(failureMetadata.with(any(), any())).thenReturn(failureMetadata); + + event = (JacksonEvent) JacksonEvent.fromMessage(UUID.randomUUID().toString()); + JacksonEvent spyEvent = spy(event); + when(spyEvent.updateFailureMetadata()).thenReturn(failureMetadata); + record = new Record<>(spyEvent); + + producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), + null, kafkaTopicProducerMetrics, schemaService, null, + null, null, failurePipeline); + producer.produceRecords(record); + + verify(failurePipeline).sendEvents(any()); + verify(failureMetadata).withPluginName("kafka"); + verify(failureMetadata).with("topic", "test-topic"); + verifyNoInteractions(dlqSink); + } + + @Test + public void produceRecords_sendError_fallsBackToDlqWhenNoFailurePipeline() throws Exception { + when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext"); + KafkaProducer kafkaProducer = mock(KafkaProducer.class); + when(kafkaTopicProducerMetrics.getNumberOfRecordSendErrors()).thenReturn(numberOfRecordSendErrors); + when(kafkaProducer.send(any(ProducerRecord.class), any(Callback.class))).thenThrow(new KafkaException("test error")); + + producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class), + null, kafkaTopicProducerMetrics, schemaService, null, + null, null, null); + producer.produceRecords(record); + + verify(dlqSink).perform(any(), any()); + } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java index 6d57c7803a..bc0d9dd284 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSinkTest.java @@ -134,7 +134,7 @@ public void after() { private KafkaSink createObjectUnderTest() { final KafkaSink objectUnderTest; try(final MockedConstruction ignored = mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> { - when(mock.createProducer(any(), any(), any(), any(), any(), anyBoolean())).thenReturn(kafkaCustomProducer); + when(mock.createProducer(any(), any(), any(), any(), any(), anyBoolean(), any(), any())).thenReturn(kafkaCustomProducer); })) { objectUnderTest = new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactoryMock, pluginMetrics, mock(ExpressionEvaluator.class), sinkContext, awsCredentialsSupplier); }