Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package org.opensearch.dataprepper.plugins.kinesis.source.converter;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.event.Event;
Expand All @@ -18,6 +20,9 @@
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.time.Instant;
Expand All @@ -27,10 +32,14 @@

public class KinesisRecordConverter {

private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordConverter.class);
static final String RECORD_PARSE_ERRORS = "recordParseErrors";
private final InputCodec codec;
private final Counter recordParseErrors;

public KinesisRecordConverter(final InputCodec codec) {
public KinesisRecordConverter(final InputCodec codec, final PluginMetrics pluginMetrics) {
this.codec = codec;
this.recordParseErrors = pluginMetrics.counter(RECORD_PARSE_ERRORS);
}

public List<KinesisInputOutputRecord> convert(final DecompressionEngine decompressionEngine,
Expand Down Expand Up @@ -65,6 +74,12 @@ private void processRecord(final DecompressionEngine decompressionEngine,
record.data().get(arr);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr);

codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer);
try {
codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer);
} catch (final Exception e) {
recordParseErrors.increment();
LOG.error("Failed to parse Kinesis record. sequenceNumber={}, partitionKey={}",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a metric for this?

record.sequenceNumber(), record.partitionKey(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public KinesisShardRecordProcessorFactory(Buffer<Record<Event>> buffer,
this.buffer = buffer;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pluginMetrics = pluginMetrics;
this.kinesisRecordConverter = new KinesisRecordConverter(codec);
this.kinesisRecordConverter = new KinesisRecordConverter(codec, pluginMetrics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

package org.opensearch.dataprepper.plugins.kinesis.source;

import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
Expand Down Expand Up @@ -155,6 +156,8 @@ void setup() {
kinesisLeaseConfig = mock(KinesisLeaseConfig.class);
workerIdentifierGenerator = mock(WorkerIdentifierGenerator.class);
kinesisLeaseCoordinationTableConfig = mock(KinesisLeaseCoordinationTableConfig.class);
pluginMetrics = mock(PluginMetrics.class);
when(pluginMetrics.counter(any(String.class))).thenReturn(mock(Counter.class));
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig);
when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn("kinesis-lease-table");
when(kinesisLeaseCoordinationTableConfig.getRegion()).thenReturn("us-east-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
package org.opensearch.dataprepper.plugins.kinesis.source.converter;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.codec.DecompressionEngine;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
Expand All @@ -39,6 +41,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -50,7 +53,9 @@ public class KinesisRecordConverterTest {
@Test
void testRecordConverter() throws IOException {
InputCodec codec = mock(InputCodec.class);
KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec);
PluginMetrics pluginMetrics = mock(PluginMetrics.class);
when(pluginMetrics.counter(KinesisRecordConverter.RECORD_PARSE_ERRORS)).thenReturn(mock(Counter.class));
KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec, pluginMetrics);
DecompressionEngine decompressionEngine = CompressionOption.NONE.getDecompressionEngine();
doNothing().when(codec).parse(any(InputStream.class), any(Consumer.class));

Expand Down Expand Up @@ -81,7 +86,8 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException {
}

KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(
new NdjsonInputCodec(new NdjsonInputConfig(), TestEventFactory.getTestEventFactory()));
new NdjsonInputCodec(new NdjsonInputConfig(), TestEventFactory.getTestEventFactory()),
PluginMetrics.fromNames("test", "pipeline"));

final String partitionKey = UUID.randomUUID().toString();
final String sequenceNumber = UUID.randomUUID().toString();
Expand All @@ -108,6 +114,30 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException {
});
}

@Test
void convert_doesNotThrowWhenCodecParseThrowsException() throws IOException {
InputCodec codec = mock(InputCodec.class);
PluginMetrics pluginMetrics = mock(PluginMetrics.class);
Counter recordParseErrors = mock(Counter.class);
when(pluginMetrics.counter(KinesisRecordConverter.RECORD_PARSE_ERRORS)).thenReturn(recordParseErrors);
KinesisRecordConverter kinesisRecordConverter = new KinesisRecordConverter(codec, pluginMetrics);
DecompressionEngine decompressionEngine = CompressionOption.NONE.getDecompressionEngine();
doThrow(new IOException("Invalid UTF-8")).when(codec).parse(any(InputStream.class), any(Consumer.class));

KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder()
.data(ByteBuffer.wrap("bad data".getBytes()))
.sequenceNumber("seq-1")
.partitionKey("key-1")
.build();

List<KinesisInputOutputRecord> results = kinesisRecordConverter.convert(
decompressionEngine, List.of(kinesisClientRecord), streamId);

assertEquals(0, results.size());
verify(codec, times(1)).parse(any(InputStream.class), any(Consumer.class));
verify(recordParseErrors, times(1)).increment();
}

private static Map<String, Object> generateJson() {
final Map<String, Object> jsonObject = new LinkedHashMap<>();
for (int i = 0; i < 1; i++) {
Expand Down
Loading