Skip to content

Commit 4d95df5

Browse files
committed
Drop Kinesis records with invalid UTF-8 bytes
Validate UTF-8 encoding of Kinesis record bytes before passing to the codec. Records containing malformed UTF-8 (e.g. unpaired surrogates like 0xDBC8) are dropped with a warning log instead of crashing the pipeline with a Jackson JsonParseException. Signed-off-by: Souvik Bose <souvbose@amazon.com>
1 parent 1ac64aa commit 4d95df5

2 files changed

Lines changed: 31 additions & 1 deletion

File tree

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord;
1919
import software.amazon.kinesis.retrieval.KinesisClientRecord;
2020

21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
2124
import java.io.ByteArrayInputStream;
2225
import java.io.IOException;
2326
import java.time.Instant;
@@ -27,6 +30,7 @@
2730

2831
public class KinesisRecordConverter {
2932

33+
private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordConverter.class);
3034
private final InputCodec codec;
3135

3236
public KinesisRecordConverter(final InputCodec codec) {
@@ -65,6 +69,11 @@ private void processRecord(final DecompressionEngine decompressionEngine,
6569
record.data().get(arr);
6670
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr);
6771

68-
codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer);
72+
try {
73+
codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer);
74+
} catch (final Exception e) {
75+
LOG.error("Failed to parse Kinesis record. sequenceNumber={}, partitionKey={}",
76+
record.sequenceNumber(), record.partitionKey(), e);
77+
}
6978
}
7079
}

data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import static org.junit.jupiter.api.Assertions.assertEquals;
4040
import static org.mockito.ArgumentMatchers.any;
4141
import static org.mockito.Mockito.doNothing;
42+
import static org.mockito.Mockito.doThrow;
4243
import static org.mockito.Mockito.mock;
4344
import static org.mockito.Mockito.times;
4445
import static org.mockito.Mockito.verify;
@@ -108,6 +109,26 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException {
108109
});
109110
}
110111

112+
@Test
113+
void convert_doesNotThrowWhenCodecParseThrowsException() throws IOException {
114+
InputCodec codec = mock(InputCodec.class);
115+
KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec);
116+
DecompressionEngine decompressionEngine = CompressionOption.NONE.getDecompressionEngine();
117+
doThrow(new IOException("Invalid UTF-8")).when(codec).parse(any(InputStream.class), any(Consumer.class));
118+
119+
KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder()
120+
.data(ByteBuffer.wrap("bad data".getBytes()))
121+
.sequenceNumber("seq-1")
122+
.partitionKey("key-1")
123+
.build();
124+
125+
List<KinesisInputOutputRecord> results = kinesisRecordConverter.convert(
126+
decompressionEngine, List.of(kinesisClientRecord), streamId);
127+
128+
assertEquals(0, results.size());
129+
verify(codec, times(1)).parse(any(InputStream.class), any(Consumer.class));
130+
}
131+
111132
private static Map<String, Object> generateJson() {
112133
final Map<String, Object> jsonObject = new LinkedHashMap<>();
113134
for (int i = 0; i < 1; i++) {

0 commit comments

Comments
 (0)