From e3a76ada52acc00d7d9aaff16b1306cc66bdcfe3 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Fri, 10 Apr 2026 09:32:35 -0700 Subject: [PATCH] Drop Kinesis records with invalid UTF-8 bytes Wrap codec.parse() in a try-catch in KinesisRecordConverter to handle parse failures (e.g. invalid UTF-8 surrogates) gracefully. Failed records are logged and skipped instead of crashing the pipeline. Adds a recordParseErrors metric counter via PluginMetrics. Signed-off-by: Souvik Bose --- .../converter/KinesisRecordConverter.java | 19 +++++++++-- .../KinesisShardRecordProcessorFactory.java | 2 +- .../kinesis/source/KinesisServiceTest.java | 3 ++ .../converter/KinesisRecordConverterTest.java | 34 +++++++++++++++++-- 4 files changed, 53 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java index 3514670097..503c1a9a4a 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java @@ -10,6 +10,8 @@ package org.opensearch.dataprepper.plugins.kinesis.source.converter; +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.event.Event; @@ -18,6 +20,9 @@ import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.time.Instant; @@ -27,10 +32,14 @@ public class KinesisRecordConverter { + private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordConverter.class); + static final String RECORD_PARSE_ERRORS = "recordParseErrors"; private final InputCodec codec; + private final Counter recordParseErrors; - public KinesisRecordConverter(final InputCodec codec) { + public KinesisRecordConverter(final InputCodec codec, final PluginMetrics pluginMetrics) { this.codec = codec; + this.recordParseErrors = pluginMetrics.counter(RECORD_PARSE_ERRORS); } public List convert(final DecompressionEngine decompressionEngine, @@ -65,6 +74,12 @@ private void processRecord(final DecompressionEngine decompressionEngine, record.data().get(arr); ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr); - codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer); + try { + codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer); + } catch (final Exception e) { + recordParseErrors.increment(); + LOG.error("Failed to parse Kinesis record. sequenceNumber={}, partitionKey={}", + record.sequenceNumber(), record.partitionKey(), e); + } } } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java index ff9943a41d..bdc15dd8d1 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java @@ -40,7 +40,7 @@ public KinesisShardRecordProcessorFactory(Buffer> buffer, this.buffer = buffer; this.acknowledgementSetManager = acknowledgementSetManager; this.pluginMetrics = pluginMetrics; - this.kinesisRecordConverter = new KinesisRecordConverter(codec); + this.kinesisRecordConverter = new KinesisRecordConverter(codec, pluginMetrics); } @Override diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index 896faa4155..4b69af305d 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -10,6 +10,7 @@ package org.opensearch.dataprepper.plugins.kinesis.source; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -155,6 +156,8 @@ void setup() { kinesisLeaseConfig = mock(KinesisLeaseConfig.class); workerIdentifierGenerator = mock(WorkerIdentifierGenerator.class); kinesisLeaseCoordinationTableConfig = mock(KinesisLeaseCoordinationTableConfig.class); + pluginMetrics = mock(PluginMetrics.class); + when(pluginMetrics.counter(any(String.class))).thenReturn(mock(Counter.class)); when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig); when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn("kinesis-lease-table"); when(kinesisLeaseCoordinationTableConfig.getRegion()).thenReturn("us-east-1"); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java index 0f9081455e..72e3f54895 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java @@ -11,8 +11,10 @@ package org.opensearch.dataprepper.plugins.kinesis.source.converter; import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.event.TestEventFactory; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.codec.DecompressionEngine; import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.plugins.codec.CompressionOption; @@ -39,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -50,7 +53,9 @@ public class KinesisRecordConverterTest { @Test void testRecordConverter() throws IOException { InputCodec codec = mock(InputCodec.class); - KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec); + PluginMetrics pluginMetrics = mock(PluginMetrics.class); + when(pluginMetrics.counter(KinesisRecordConverter.RECORD_PARSE_ERRORS)).thenReturn(mock(Counter.class)); + KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec, pluginMetrics); DecompressionEngine decompressionEngine = CompressionOption.NONE.getDecompressionEngine(); doNothing().when(codec).parse(any(InputStream.class), any(Consumer.class)); @@ -81,7 +86,8 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException { } KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter( - new NdjsonInputCodec(new NdjsonInputConfig(), TestEventFactory.getTestEventFactory())); + new NdjsonInputCodec(new NdjsonInputConfig(), TestEventFactory.getTestEventFactory()), + PluginMetrics.fromNames("test", "pipeline")); final String partitionKey = UUID.randomUUID().toString(); final String sequenceNumber = UUID.randomUUID().toString(); @@ -108,6 +114,30 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException { }); } + @Test + void convert_doesNotThrowWhenCodecParseThrowsException() throws IOException { + InputCodec codec = mock(InputCodec.class); + PluginMetrics pluginMetrics = mock(PluginMetrics.class); + Counter recordParseErrors = mock(Counter.class); + when(pluginMetrics.counter(KinesisRecordConverter.RECORD_PARSE_ERRORS)).thenReturn(recordParseErrors); + KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec, pluginMetrics); + DecompressionEngine decompressionEngine = CompressionOption.NONE.getDecompressionEngine(); + doThrow(new IOException("Invalid UTF-8")).when(codec).parse(any(InputStream.class), any(Consumer.class)); + + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap("bad data".getBytes())) + .sequenceNumber("seq-1") + .partitionKey("key-1") + .build(); + + List results = kinesisRecordConverter.convert( + decompressionEngine, List.of(kinesisClientRecord), streamId); + + assertEquals(0, results.size()); + verify(codec, times(1)).parse(any(InputStream.class), any(Consumer.class)); + verify(recordParseErrors, times(1)).increment(); + } + private static Map generateJson() { final Map jsonObject = new LinkedHashMap<>(); for (int i = 0; i < 1; i++) {