Skip to content

Commit 332c4ce

Browse files
sb2k16sbose2k21
andauthored
Drop Kinesis records with invalid UTF-8 bytes (#6746)
Wrap codec.parse() in a try-catch in KinesisRecordConverter to handle parse failures (e.g. invalid UTF-8 surrogates) gracefully. Failed records are logged and skipped instead of crashing the pipeline. Adds a recordParseErrors metric counter via PluginMetrics. Signed-off-by: Souvik Bose <souvbose@amazon.com> Co-authored-by: Souvik Bose <souvbose@amazon.com>
1 parent 8f4589f commit 332c4ce

4 files changed

Lines changed: 53 additions & 5 deletions

File tree

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
package org.opensearch.dataprepper.plugins.kinesis.source.converter;
1212

13+
import io.micrometer.core.instrument.Counter;
14+
import org.opensearch.dataprepper.metrics.PluginMetrics;
1315
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
1416
import org.opensearch.dataprepper.model.codec.InputCodec;
1517
import org.opensearch.dataprepper.model.event.Event;
@@ -18,6 +20,9 @@
1820
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord;
1921
import software.amazon.kinesis.retrieval.KinesisClientRecord;
2022

23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
2126
import java.io.ByteArrayInputStream;
2227
import java.io.IOException;
2328
import java.time.Instant;
@@ -27,10 +32,14 @@
2732

2833
public class KinesisRecordConverter {
2934

35+
private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordConverter.class);
36+
static final String RECORD_PARSE_ERRORS = "recordParseErrors";
3037
private final InputCodec codec;
38+
private final Counter recordParseErrors;
3139

32-
public KinesisRecordConverter(final InputCodec codec) {
40+
public KinesisRecordConverter(final InputCodec codec, final PluginMetrics pluginMetrics) {
3341
this.codec = codec;
42+
this.recordParseErrors = pluginMetrics.counter(RECORD_PARSE_ERRORS);
3443
}
3544

3645
public List<KinesisInputOutputRecord> convert(final DecompressionEngine decompressionEngine,
@@ -65,6 +74,12 @@ private void processRecord(final DecompressionEngine decompressionEngine,
6574
record.data().get(arr);
6675
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr);
6776

68-
codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer);
77+
try {
78+
codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer);
79+
} catch (final Exception e) {
80+
recordParseErrors.increment();
81+
LOG.error("Failed to parse Kinesis record. sequenceNumber={}, partitionKey={}",
82+
record.sequenceNumber(), record.partitionKey(), e);
83+
}
6984
}
7085
}

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisShardRecordProcessorFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public KinesisShardRecordProcessorFactory(Buffer<Record<Event>> buffer,
4040
this.buffer = buffer;
4141
this.acknowledgementSetManager = acknowledgementSetManager;
4242
this.pluginMetrics = pluginMetrics;
43-
this.kinesisRecordConverter = new KinesisRecordConverter(codec);
43+
this.kinesisRecordConverter = new KinesisRecordConverter(codec, pluginMetrics);
4444
}
4545

4646
@Override

data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
package org.opensearch.dataprepper.plugins.kinesis.source;
1212

13+
import io.micrometer.core.instrument.Counter;
1314
import org.junit.jupiter.api.BeforeEach;
1415
import org.junit.jupiter.api.Test;
1516
import org.mockito.Mock;
@@ -155,6 +156,8 @@ void setup() {
155156
kinesisLeaseConfig = mock(KinesisLeaseConfig.class);
156157
workerIdentifierGenerator = mock(WorkerIdentifierGenerator.class);
157158
kinesisLeaseCoordinationTableConfig = mock(KinesisLeaseCoordinationTableConfig.class);
159+
pluginMetrics = mock(PluginMetrics.class);
160+
when(pluginMetrics.counter(any(String.class))).thenReturn(mock(Counter.class));
158161
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig);
159162
when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn("kinesis-lease-table");
160163
when(kinesisLeaseCoordinationTableConfig.getRegion()).thenReturn("us-east-1");

data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
package org.opensearch.dataprepper.plugins.kinesis.source.converter;
1212

1313
import com.fasterxml.jackson.databind.ObjectMapper;
14+
import io.micrometer.core.instrument.Counter;
1415
import org.junit.jupiter.api.Test;
1516
import org.opensearch.dataprepper.event.TestEventFactory;
17+
import org.opensearch.dataprepper.metrics.PluginMetrics;
1618
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
1719
import org.opensearch.dataprepper.model.codec.InputCodec;
1820
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
@@ -39,6 +41,7 @@
3941
import static org.junit.jupiter.api.Assertions.assertEquals;
4042
import static org.mockito.ArgumentMatchers.any;
4143
import static org.mockito.Mockito.doNothing;
44+
import static org.mockito.Mockito.doThrow;
4245
import static org.mockito.Mockito.mock;
4346
import static org.mockito.Mockito.times;
4447
import static org.mockito.Mockito.verify;
@@ -50,7 +53,9 @@ public class KinesisRecordConverterTest {
5053
@Test
5154
void testRecordConverter() throws IOException {
5255
InputCodec codec = mock(InputCodec.class);
53-
KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec);
56+
PluginMetrics pluginMetrics = mock(PluginMetrics.class);
57+
when(pluginMetrics.counter(KinesisRecordConverter.RECORD_PARSE_ERRORS)).thenReturn(mock(Counter.class));
58+
KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec, pluginMetrics);
5459
DecompressionEngine decompressionEngine = CompressionOption.NONE.getDecompressionEngine();
5560
doNothing().when(codec).parse(any(InputStream.class), any(Consumer.class));
5661

@@ -81,7 +86,8 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException {
8186
}
8287

8388
KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(
84-
new NdjsonInputCodec(new NdjsonInputConfig(), TestEventFactory.getTestEventFactory()));
89+
new NdjsonInputCodec(new NdjsonInputConfig(), TestEventFactory.getTestEventFactory()),
90+
PluginMetrics.fromNames("test", "pipeline"));
8591

8692
final String partitionKey = UUID.randomUUID().toString();
8793
final String sequenceNumber = UUID.randomUUID().toString();
@@ -108,6 +114,30 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException {
108114
});
109115
}
110116

117+
@Test
118+
void convert_doesNotThrowWhenCodecParseThrowsException() throws IOException {
119+
InputCodec codec = mock(InputCodec.class);
120+
PluginMetrics pluginMetrics = mock(PluginMetrics.class);
121+
Counter recordParseErrors = mock(Counter.class);
122+
when(pluginMetrics.counter(KinesisRecordConverter.RECORD_PARSE_ERRORS)).thenReturn(recordParseErrors);
123+
KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec, pluginMetrics);
124+
DecompressionEngine decompressionEngine = CompressionOption.NONE.getDecompressionEngine();
125+
doThrow(new IOException("Invalid UTF-8")).when(codec).parse(any(InputStream.class), any(Consumer.class));
126+
127+
KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder()
128+
.data(ByteBuffer.wrap("bad data".getBytes()))
129+
.sequenceNumber("seq-1")
130+
.partitionKey("key-1")
131+
.build();
132+
133+
List<KinesisInputOutputRecord> results = kinesisRecordConverter.convert(
134+
decompressionEngine, List.of(kinesisClientRecord), streamId);
135+
136+
assertEquals(0, results.size());
137+
verify(codec, times(1)).parse(any(InputStream.class), any(Consumer.class));
138+
verify(recordParseErrors, times(1)).increment();
139+
}
140+
111141
private static Map<String, Object> generateJson() {
112142
final Map<String, Object> jsonObject = new LinkedHashMap<>();
113143
for (int i = 0; i < 1; i++) {

0 commit comments

Comments
 (0)