Skip to content

Drop Kinesis records with invalid UTF-8 bytes#6746

Merged
sb2k16 merged 1 commit into
opensearch-project:mainfrom
sb2k16:fix/drop-invalid-utf8-kinesis-records
Apr 10, 2026
Merged

Drop Kinesis records with invalid UTF-8 bytes#6746
sb2k16 merged 1 commit into
opensearch-project:mainfrom
sb2k16:fix/drop-invalid-utf8-kinesis-records

Conversation

@sb2k16

@sb2k16 sb2k16 commented Apr 9, 2026

Copy link
Copy Markdown
Member

Validate UTF-8 encoding of Kinesis record bytes before passing to the codec. Records containing malformed UTF-8 (e.g. unpaired surrogates like 0xDBC8) are dropped with a warning log instead of crashing the pipeline with a Jackson JsonParseException.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

record.data().get(arr);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr);

if (!isValidUtf8(arr)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the right solution? There is no reason that we cannot have codecs that parse non UTF-8 bytes. Should the codec itself do this check?

I believe that our Avro codec wouldn't work here. Also we could support Protobuf or other formats in the future.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we follow this approach?

try {
codec.parse(inputFile, fileCompressionOption.getDecompressionEngine(), record -> {
consumer.accept(record, dataSelection);
});
return inputFile.getLength();
} catch (final Exception e) {
s3ObjectPluginMetrics.getS3ObjectReadFailedCounter().increment();
throw new S3ReadFailedException(e);
}

@sb2k16 sb2k16 force-pushed the fix/drop-invalid-utf8-kinesis-records branch from 2d1013b to 4d95df5 Compare April 10, 2026 04:04
dlvenable
dlvenable previously approved these changes Apr 10, 2026

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @sb2k16 !

try {
codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer);
} catch (final Exception e) {
LOG.error("Failed to parse Kinesis record. sequenceNumber={}, partitionKey={}",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a metric for this?

graytaylor0
graytaylor0 previously approved these changes Apr 10, 2026
@sb2k16 sb2k16 dismissed stale reviews from graytaylor0 and dlvenable via 1ebf039 April 10, 2026 16:29
@sb2k16 sb2k16 force-pushed the fix/drop-invalid-utf8-kinesis-records branch from 4d95df5 to 1ebf039 Compare April 10, 2026 16:29
@github-actions

github-actions Bot commented Apr 10, 2026

Copy link
Copy Markdown

✅ License Header Check Passed

All newly added files have proper license headers. Great work! 🎉

@sb2k16 sb2k16 force-pushed the fix/drop-invalid-utf8-kinesis-records branch from 1ebf039 to 95c1354 Compare April 10, 2026 16:32
Wrap codec.parse() in a try-catch in KinesisRecordConverter to handle
parse failures (e.g. invalid UTF-8 surrogates) gracefully. Failed
records are logged and skipped instead of crashing the pipeline.
Adds a recordParseErrors metric counter via PluginMetrics.

Signed-off-by: Souvik Bose <souvbose@amazon.com>
@sb2k16 sb2k16 force-pushed the fix/drop-invalid-utf8-kinesis-records branch from 95c1354 to e3a76ad Compare April 10, 2026 17:19
@sb2k16 sb2k16 merged commit 332c4ce into opensearch-project:main Apr 10, 2026
71 of 72 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants