22
33from typing import Any
44
5+ from google .protobuf .internal .decoder import _DecodeVarint # type: ignore[attr-defined]
56from google .protobuf .json_format import MessageToDict
67
78from aws_lambda_powertools .utilities .kafka .deserializer .base import DeserializerBase
@@ -43,6 +44,12 @@ def deserialize(self, data: bytes | str) -> dict:
4344 When the data cannot be deserialized according to the message class,
4445 typically due to data format incompatibility or incorrect message class.
4546
47+ Notes
48+ -----
49+ This deserializer handles both standard Protocol Buffer format and the Confluent
50+ Schema Registry format which includes message index information. It will first try
51+ standard deserialization and fall back to message index handling if needed.
52+
4653 Example
4754 --------
4855 >>> # Assuming proper protobuf setup
@@ -54,11 +61,56 @@ def deserialize(self, data: bytes | str) -> dict:
5461 ... except KafkaConsumerDeserializationError as e:
5562 ... print(f"Failed to deserialize: {e}")
5663 """
64+ value = self ._decode_input (data )
5765 try :
58- value = self ._decode_input (data )
5966 message = self .message_class ()
6067 message .ParseFromString (value )
6168 return MessageToDict (message , preserving_proto_field_name = True )
69+ except Exception :
70+ return self ._deserialize_with_message_index (value , self .message_class ())
71+
72+ def _deserialize_with_message_index (self , data : bytes , parser : Any ) -> dict :
73+ """
74+ Deserialize protobuf message with Confluent message index handling.
75+
76+ Parameters
77+ ----------
78+ data : bytes
79+ data
80+ parser : google.protobuf.message.Message
81+ Protobuf message instance to parse the data into
82+
83+ Returns
84+ -------
85+ dict
86+ Dictionary representation of the parsed protobuf message with original field names
87+
88+ Raises
89+ ------
90+ KafkaConsumerDeserializationError
91+ If deserialization fails
92+
93+ Notes
94+ -----
95+ This method handles the special case of Confluent Schema Registry's message index
96+ format, where the message is prefixed with either a single 0 (for the first schema)
97+ or a list of schema indexes. The actual protobuf message follows these indexes.
98+ """
99+
100+ buffer = memoryview (data )
101+ pos = 0
102+
103+ try :
104+ first_value , new_pos = _DecodeVarint (buffer , pos )
105+ pos = new_pos
106+
107+ if first_value != 0 :
108+ for _ in range (first_value ):
109+ _ , new_pos = _DecodeVarint (buffer , pos )
110+ pos = new_pos
111+
112+ parser .ParseFromString (data [pos :])
113+ return MessageToDict (parser , preserving_proto_field_name = True )
62114 except Exception as e :
63115 raise KafkaConsumerDeserializationError (
64116 f"Error trying to deserialize protobuf data - { type (e ).__name__ } : { str (e )} " ,
0 commit comments