Drop Kinesis records with invalid UTF-8 bytes#6746
Merged
sb2k16 merged 1 commit intoApr 10, 2026
Conversation
dlvenable
requested changes
Apr 9, 2026
| record.data().get(arr); | ||
| ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arr); | ||
|
|
||
| if (!isValidUtf8(arr)) { |
Member
There was a problem hiding this comment.
Is this the right solution? There is no reason that we cannot have codecs that parse non UTF-8 bytes. Should the codec itself do this check?
I believe that our Avro codec wouldn't work here. Also we could support Protobuf or other formats in the future.
Member
There was a problem hiding this comment.
Can we follow this approach?
2d1013b to
4d95df5
Compare
dlvenable
previously approved these changes
Apr 10, 2026
divbok
approved these changes
Apr 10, 2026
graytaylor0
reviewed
Apr 10, 2026
| try { | ||
| codec.parse(decompressionEngine.createInputStream(byteArrayInputStream), eventConsumer); | ||
| } catch (final Exception e) { | ||
| LOG.error("Failed to parse Kinesis record. sequenceNumber={}, partitionKey={}", |
Member
There was a problem hiding this comment.
Should we have a metric for this?
graytaylor0
previously approved these changes
Apr 10, 2026
4d95df5 to
1ebf039
Compare
✅ License Header Check PassedAll newly added files have proper license headers. Great work! 🎉 |
1ebf039 to
95c1354
Compare
Wrap codec.parse() in a try-catch in KinesisRecordConverter to handle parse failures (e.g. invalid UTF-8 surrogates) gracefully. Failed records are logged and skipped instead of crashing the pipeline. Adds a recordParseErrors metric counter via PluginMetrics. Signed-off-by: Souvik Bose <souvbose@amazon.com>
95c1354 to
e3a76ad
Compare
dlvenable
approved these changes
Apr 10, 2026
graytaylor0
approved these changes
Apr 10, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Validate UTF-8 encoding of Kinesis record bytes before passing to the codec. Records containing malformed UTF-8 (e.g. unpaired surrogates like 0xDBC8) are dropped with a warning log instead of crashing the pipeline with a Jackson JsonParseException.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.