11import base64
22import io
33from copy import deepcopy
4+ from dataclasses import dataclass
45
56import pytest
67from avro .io import BinaryEncoder , DatumWriter
@@ -86,57 +87,58 @@ def kafka_event_with_avro_data(avro_encoded_value, avro_encoded_key):
8687 }
8788
8889
89- def test_kafka_consumer_with_avro_and_dataclass (
90- kafka_event_with_avro_data ,
91- avro_value_schema ,
92- lambda_context ,
93- user_value_dataclass ,
94- ):
95- """Test Kafka consumer with Avro deserialization and dataclass output serialization."""
90+ @dataclass
91+ class UserValueDataClass :
92+ name : str
93+ age : int
94+
95+
96+ @dataclass
97+ class UserKeyClass :
98+ user_id : str
99+
100+
101+ def test_kafka_consumer_with_avro (kafka_event_with_avro_data , avro_value_schema , lambda_context ):
102+ """Test Kafka consumer with Avro deserialization without output serialization."""
96103
97104 # Create dict to capture results
98105 result_data = {}
99106
100- schema_config = SchemaConfig (
101- value_schema_type = "AVRO" ,
102- value_schema = avro_value_schema ,
103- value_output_serializer = user_value_dataclass ,
104- )
107+ schema_config = SchemaConfig (value_schema_type = "AVRO" , value_schema = avro_value_schema )
105108
106109 @kafka_consumer (schema_config = schema_config )
107110 def handler (event : ConsumerRecords , context ):
108111 # Capture the results to verify
109112 record = next (event .records )
110113 result_data ["value_type" ] = type (record .value ).__name__
111- result_data ["name" ] = record .value . name
112- result_data ["age" ] = record .value . age
114+ result_data ["name" ] = record .value [ " name" ]
115+ result_data ["age" ] = record .value [ " age" ]
113116 return {"processed" : True }
114117
115118 # Call the handler
116119 result = handler (kafka_event_with_avro_data , lambda_context )
117120
118121 # Verify the results
119122 assert result == {"processed" : True }
120- assert result_data ["value_type" ] == "UserValueDataClass "
123+ assert result_data ["value_type" ] == "dict "
121124 assert result_data ["name" ] == "John Doe"
122125 assert result_data ["age" ] == 30
123126
124127
125- def test_kafka_consumer_with_avro_and_custom_object (
128+ def test_kafka_consumer_with_avro_and_dataclass (
126129 kafka_event_with_avro_data ,
127130 avro_value_schema ,
128131 lambda_context ,
129- user_value_dict ,
130132):
131- """Test Kafka consumer with Avro deserialization and custom object serialization."""
133+ """Test Kafka consumer with Avro deserialization and dataclass output serialization."""
132134
133135 # Create dict to capture results
134136 result_data = {}
135137
136138 schema_config = SchemaConfig (
137139 value_schema_type = "AVRO" ,
138140 value_schema = avro_value_schema ,
139- value_output_serializer = user_value_dict ,
141+ value_output_serializer = UserValueDataClass ,
140142 )
141143
142144 @kafka_consumer (schema_config = schema_config )
@@ -153,34 +155,43 @@ def handler(event: ConsumerRecords, context):
153155
154156 # Verify the results
155157 assert result == {"processed" : True }
156- assert result_data ["value_type" ] == "UserValueDict "
158+ assert result_data ["value_type" ] == "UserValueDataClass "
157159 assert result_data ["name" ] == "John Doe"
158160 assert result_data ["age" ] == 30
159161
160162
161- def test_kafka_consumer_with_avro_raw (kafka_event_with_avro_data , avro_value_schema , lambda_context ):
162- """Test Kafka consumer with Avro deserialization without output serialization."""
163+ def test_kafka_consumer_with_avro_and_custom_object (
164+ kafka_event_with_avro_data ,
165+ avro_value_schema ,
166+ lambda_context ,
167+ user_value_dict ,
168+ ):
169+ """Test Kafka consumer with Avro deserialization and custom object serialization."""
163170
164171 # Create dict to capture results
165172 result_data = {}
166173
167- schema_config = SchemaConfig (value_schema_type = "AVRO" , value_schema = avro_value_schema )
174+ schema_config = SchemaConfig (
175+ value_schema_type = "AVRO" ,
176+ value_schema = avro_value_schema ,
177+ value_output_serializer = user_value_dict ,
178+ )
168179
169180 @kafka_consumer (schema_config = schema_config )
170181 def handler (event : ConsumerRecords , context ):
171182 # Capture the results to verify
172183 record = next (event .records )
173184 result_data ["value_type" ] = type (record .value ).__name__
174- result_data ["name" ] = record .value [ " name" ]
175- result_data ["age" ] = record .value [ " age" ]
185+ result_data ["name" ] = record .value . name
186+ result_data ["age" ] = record .value . age
176187 return {"processed" : True }
177188
178189 # Call the handler
179190 result = handler (kafka_event_with_avro_data , lambda_context )
180191
181192 # Verify the results
182193 assert result == {"processed" : True }
183- assert result_data ["value_type" ] == "dict "
194+ assert result_data ["value_type" ] == "UserValueDict "
184195 assert result_data ["name" ] == "John Doe"
185196 assert result_data ["age" ] == 30
186197
@@ -207,7 +218,7 @@ def lambda_handler(event: ConsumerRecords, context):
207218
208219 # The exact error message may vary depending on the Avro library's internals,
209220 # but should indicate a deserialization problem
210- assert "Error trying to deserializer avro data" in str (excinfo .value )
221+ assert "Error trying to deserialize avro data" in str (excinfo .value )
211222
212223
213224def test_kafka_consumer_with_invalid_avro_schema (kafka_event_with_avro_data , lambda_context ):
@@ -245,8 +256,6 @@ def test_kafka_consumer_with_key_deserialization(
245256 lambda_context ,
246257 avro_value_schema ,
247258 avro_key_schema ,
248- user_value_dataclass ,
249- user_key_dataclass ,
250259):
251260 """Test deserializing both key and value with different schemas and serializers."""
252261
@@ -257,10 +266,10 @@ def test_kafka_consumer_with_key_deserialization(
257266 schema_config = SchemaConfig (
258267 value_schema_type = "AVRO" ,
259268 value_schema = avro_value_schema ,
260- value_output_serializer = user_value_dataclass ,
269+ value_output_serializer = UserValueDataClass ,
261270 key_schema_type = "AVRO" ,
262271 key_schema = avro_key_schema ,
263- key_output_serializer = user_key_dataclass ,
272+ key_output_serializer = UserKeyClass ,
264273 )
265274
266275 @kafka_consumer (schema_config = schema_config )
@@ -283,48 +292,3 @@ def lambda_handler(event: ConsumerRecords, context):
283292 assert key_value_result ["value_type" ] == "UserValueDataClass"
284293 assert key_value_result ["value_name" ] == "John Doe"
285294 assert key_value_result ["value_age" ] == 30
286-
287-
288- def test_kafka_consumer_with_different_serializers_for_key_and_value (
289- kafka_event_with_avro_data ,
290- lambda_context ,
291- avro_value_schema ,
292- avro_key_schema ,
293- user_key_dataclass ,
294- user_value_dict ,
295- ):
296- """Test using different serializer types for key and value."""
297-
298- # Create dict to capture results
299- results = {}
300-
301- # Create schema config with different serializers
302- schema_config = SchemaConfig (
303- value_schema_type = "AVRO" ,
304- value_schema = avro_value_schema ,
305- value_output_serializer = user_value_dict ,
306- key_schema_type = "AVRO" ,
307- key_schema = avro_key_schema ,
308- key_output_serializer = user_key_dataclass ,
309- )
310-
311- @kafka_consumer (schema_config = schema_config )
312- def handler (event : ConsumerRecords , context ):
313- record = next (event .records )
314- results ["key_type" ] = type (record .key ).__name__
315- results ["key_id" ] = record .key .user_id
316- results ["value_type" ] = type (record .value ).__name__
317- results ["value_name" ] = record .value .name
318- results ["value_age" ] = record .value .age
319- return {"processed" : True }
320-
321- # Call the handler
322- result = handler (kafka_event_with_avro_data , lambda_context )
323-
324- # Verify the results
325- assert result == {"processed" : True }
326- assert results ["key_type" ] == "UserKeyClass"
327- assert results ["key_id" ] == "user-123"
328- assert results ["value_type" ] == "UserValueDict"
329- assert results ["value_name" ] == "John Doe"
330- assert results ["value_age" ] == 30
0 commit comments