Skip to content

Commit eaa46d7

Browse files
kafka_actions: fall back to string when schema registry magic byte is absent (DataDog#23951)
* kafka_actions: fall back to string when schema registry magic byte is absent

  When uses_schema_registry=True but the first byte of a message is not 0x00, the deserializer now decodes the message as a plain UTF-8 string instead of raising an error. This handles producers that mix framed and unframed messages on the same topic.

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_actions: add changelog entry for PR DataDog#23951

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_actions: simplify SR fallback — debug log, no raise, cleaner condition

  - Replace warning with debug log
  - Single condition: len >= 5 && magic byte == 0x00 → SR path, else → string fallback
  - No ValueError raised for any non-SR message

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_actions: remove msg_hex from debug log

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_actions: fix lint errors (E501, I001)

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* kafka_actions: fix remaining I001 import sort errors using root pyproject.toml

  Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent e9ffa01 commit eaa46d7

3 files changed

Lines changed: 55 additions & 24 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fall back to string deserialization when schema registry magic byte is absent.

kafka_actions/datadog_checks/kafka_actions/message_deserializer.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -243,20 +243,21 @@ def _deserialize_bytes_maybe_schema_registry(
243243
) -> tuple[str | None, int | None]:
244244
"""Deserialize message, handling Schema Registry format if present."""
245245
if uses_schema_registry:
246-
if len(message) < 5 or message[0] != SCHEMA_REGISTRY_MAGIC_BYTE:
247-
msg_hex = message[:5].hex() if len(message) >= 5 else message.hex()
248-
raise ValueError(
249-
f"Expected schema registry format (magic byte 0x00 + 4-byte schema ID), "
250-
f"but message is too short or has wrong magic byte: {msg_hex}"
251-
)
252-
schema_id = int.from_bytes(message[1:5], 'big')
253-
message = message[5:] # Skip the magic byte and schema ID bytes
246+
if len(message) >= 5 and message[0] == SCHEMA_REGISTRY_MAGIC_BYTE:
247+
schema_id = int.from_bytes(message[1:5], 'big')
248+
message = message[5:] # Skip the magic byte and schema ID bytes
254249

255-
actual_format = message_format
256-
if self.schema_registry is not None:
257-
schema, actual_format = self._fetch_and_build_schema(schema_id, message_format)
250+
actual_format = message_format
251+
if self.schema_registry is not None:
252+
schema, actual_format = self._fetch_and_build_schema(schema_id, message_format)
258253

259-
return self._deserialize_bytes(message, actual_format, schema, uses_schema_registry=True), schema_id
254+
return self._deserialize_bytes(message, actual_format, schema, uses_schema_registry=True), schema_id
255+
else:
256+
self.log.debug(
257+
"Expected schema registry format (magic byte 0x00 + 4-byte schema ID), "
258+
"but message is too short or has wrong magic byte, falling back to string",
259+
)
260+
return self._deserialize_bytes(message, 'string', None, uses_schema_registry=False), None
260261
else:
261262
# Fallback behavior: try without schema registry format first, then with it
262263
try:

kafka_actions/tests/test_message_deserializer.py

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,11 @@ def test_avro_explicit_schema_registry_configuration(self):
270270
assert result[1] is None, "Should have no schema ID"
271271
assert 'The Go Programming Language' in result[0]
272272

273-
# Test 2: uses_schema_registry=True with plain Avro message - should fail (missing magic byte)
273+
# Test 2: uses_schema_registry=True with plain Avro message - falls back to string (non-UTF-8 bytes → error)
274274
result = deserializer.deserialize_message(avro_message_no_sr, 'avro', avro_schema, True)
275-
assert result[0].startswith("<deserialization error:"), "Should fail when uses_schema_registry=True"
275+
assert result[0].startswith("<deserialization error:"), (
276+
"Non-UTF-8 avro bytes produce error after string fallback"
277+
)
276278
assert result[1] is None
277279

278280
# Test 3: uses_schema_registry=True with Schema Registry format - should succeed
@@ -281,17 +283,18 @@ def test_avro_explicit_schema_registry_configuration(self):
281283
assert result[1] == 350, "Should extract schema ID 350"
282284
assert 'The Go Programming Language' in result[0]
283285

284-
# Test 4: Wrong magic byte - should fail
286+
# Test 4: Wrong magic byte - falls back to string (non-UTF-8 bytes → error)
285287
wrong_magic_byte = (
286288
b'\x01\x00\x00\x01\x5e\xd0\xf5\xe4\xd6\xa3\xb9\x046The Go Programming Language\x18Alan Donovan'
287289
)
288290
result = deserializer.deserialize_message(wrong_magic_byte, 'avro', avro_schema, True)
289-
assert result[0].startswith("<deserialization error:"), "Should fail with wrong magic byte"
291+
assert result[0].startswith("<deserialization error:"), "Non-UTF-8 bytes produce error after string fallback"
290292

291-
# Test 5: Message too short (less than 5 bytes) - should fail
293+
# Test 5: Magic byte 0x00 present but message too short (< 5 bytes) - falls back to string
292294
too_short = b'\x00\x00\x01'
293295
result = deserializer.deserialize_message(too_short, 'avro', avro_schema, True)
294-
assert result[0].startswith("<deserialization error:"), "Should fail when message too short"
296+
assert result[0] is not None, "Too-short message falls back to string, not an error"
297+
assert result[1] is None
295298

296299
# Test 6: Test through DeserializedMessage wrapper
297300
kafka_msg = MockKafkaMessage(key=key_bytes, value=avro_message_no_sr)
@@ -406,10 +409,10 @@ def test_protobuf_explicit_schema_registry_configuration(self):
406409
assert result[1] is None, "Should have no schema ID"
407410
assert 'The Go Programming Language' in result[0]
408411

409-
# Test 2: uses_schema_registry=True with plain Protobuf message - should fail
412+
# Test 2: uses_schema_registry=True with plain Protobuf message - falls back to string (non-UTF-8 → error)
410413
result = deserializer.deserialize_message(protobuf_message_no_sr, 'protobuf', protobuf_schema, True)
411414
assert result[0].startswith("<deserialization error:"), (
412-
"Protobuf should fail when uses_schema_registry=True but no SR format"
415+
"Non-UTF-8 protobuf bytes produce error after string fallback"
413416
)
414417
assert result[1] is None
415418

@@ -419,15 +422,16 @@ def test_protobuf_explicit_schema_registry_configuration(self):
419422
assert result[1] == 350, "Should extract schema ID 350"
420423
assert 'The Go Programming Language' in result[0]
421424

422-
# Test 4: Wrong magic byte - should fail
425+
# Test 4: Wrong magic byte - falls back to string (non-UTF-8 bytes → error)
423426
wrong_magic_byte = b'\x01\x00\x00\x01\x5e' + protobuf_message_no_sr
424427
result = deserializer.deserialize_message(wrong_magic_byte, 'protobuf', protobuf_schema, True)
425-
assert result[0].startswith("<deserialization error:"), "Should fail with wrong magic byte"
428+
assert result[0].startswith("<deserialization error:"), "Non-UTF-8 bytes produce error after string fallback"
426429

427-
# Test 5: Message too short (less than 5 bytes) - should fail
430+
# Test 5: Magic byte 0x00 present but message too short (< 5 bytes) - falls back to string
428431
too_short = b'\x00\x00\x01'
429432
result = deserializer.deserialize_message(too_short, 'protobuf', protobuf_schema, True)
430-
assert result[0].startswith("<deserialization error:"), "Should fail when message too short"
433+
assert result[0] is not None, "Too-short message falls back to string, not an error"
434+
assert result[1] is None
431435

432436
# Test 6: Test through DeserializedMessage wrapper
433437
kafka_msg = MockKafkaMessage(key=key_bytes, value=protobuf_message_no_sr)
@@ -655,6 +659,31 @@ def test_value_skip_bytes_via_deserialized_message(self):
655659
assert msg.value == {'price': 99.95}
656660
assert msg.key == 'order-42'
657661

662+
def test_schema_registry_wrong_magic_byte_falls_back_to_string(self):
663+
"""When uses_schema_registry=True but first byte is not 0x00, fall back to string decoding."""
664+
log = MagicMock()
665+
deserializer = MessageDeserializer(log)
666+
667+
# A plain UTF-8 string message — no schema registry framing.
668+
plain_text = b'hello-world'
669+
result, schema_id = deserializer.deserialize_message(plain_text, 'avro', uses_schema_registry=True)
670+
671+
assert result == json.dumps('hello-world')
672+
assert schema_id is None
673+
log.debug.assert_called()
674+
675+
def test_schema_registry_wrong_magic_byte_non_utf8_returns_error(self):
676+
"""When fallback-to-string is triggered but bytes are not valid UTF-8, return error string."""
677+
log = MagicMock()
678+
deserializer = MessageDeserializer(log)
679+
680+
# Bytes that are not valid UTF-8 and don't start with 0x00.
681+
invalid_utf8 = b'\xff\xfe\xfd'
682+
result, schema_id = deserializer.deserialize_message(invalid_utf8, 'json', uses_schema_registry=True)
683+
684+
assert result.startswith("<deserialization error:")
685+
assert schema_id is None
686+
658687
def test_skip_bytes_runs_before_schema_registry_detection(self):
659688
"""skip_bytes is applied before the Confluent SR magic-byte check.
660689

0 commit comments

Comments
 (0)