-
Notifications
You must be signed in to change notification settings - Fork 480
Expand file tree
/
Copy pathusing_java_naming_convention.py
More file actions
45 lines (36 loc) · 1.36 KB
/
Copy pathusing_java_naming_convention.py
File metadata and controls
45 lines (36 loc) · 1.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from datetime import datetime
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext
# Module-level Powertools logger used by lambda_handler.
logger = Logger()
# Avro schema for the OrderEvent records written by the Java producer.
# Field names keep the producer's camelCase spelling; normalize_field_name
# adds snake_case aliases after decoding.
# NOTE(review): "logicalType" is attached to the orderDate *field* rather than
# nested inside its "type". Avro decoders presumably ignore it in that position
# and yield a plain long (epoch milliseconds), which is what
# normalize_field_name relies on when it divides by 1000 — confirm before
# moving the attribute.
avro_schema = """
{
"namespace": "com.example.orders",
"type": "record",
"name": "OrderEvent",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "totalAmount", "type": "double"},
{"name": "orderDate", "type": "long", "logicalType": "timestamp-millis"}
]
}
"""
# Configure schema with field name normalization for Python style
def normalize_field_name(data: dict) -> dict:
    """Add snake_case aliases for the camelCase fields of an OrderEvent.

    The dict is updated in place and also returned. The original camelCase
    keys are left untouched, so both spellings remain available.

    Args:
        data: Decoded OrderEvent with ``orderId``, ``customerId``,
            ``totalAmount`` and ``orderDate`` keys, where ``orderDate`` is a
            number of milliseconds since the Unix epoch.

    Returns:
        The same ``data`` object with ``order_id``, ``customer_id``,
        ``total_amount`` and ``order_date`` added. ``order_date`` is a
        timezone-aware ``datetime`` in UTC.

    Raises:
        KeyError: If any of the four camelCase keys is missing.
    """
    # Imported here because the module header only brings in ``datetime``.
    from datetime import timezone

    data["order_id"] = data["orderId"]
    data["customer_id"] = data["customerId"]
    data["total_amount"] = data["totalAmount"]
    # Epoch milliseconds denote an absolute instant; pin the result to UTC so
    # it does not silently shift with the host's local time zone.
    data["order_date"] = datetime.fromtimestamp(
        data["orderDate"] / 1000, tz=timezone.utc
    )
    return data
# Kafka consumer schema settings: record values are declared as AVRO and
# described by avro_schema, and normalize_field_name is registered as the
# value output serializer (presumably applied to each decoded value before the
# handler reads record.value — confirm against the Powertools Kafka docs).
schema_config = SchemaConfig(
value_schema_type="AVRO",
value_schema=avro_schema,
value_output_serializer=normalize_field_name,
)
@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    """Log the order id of every Kafka record in the incoming batch.

    Each record's ``value`` is indexed by ``order_id``, the snake_case key
    that normalize_field_name adds, so it is presumably the output of the
    serializer configured in ``schema_config``.
    """
    for kafka_record in event.records:
        # Indexed like a mapping with snake_case keys, not a custom class.
        order = kafka_record.value
        logger.info(f"Processing order {order['order_id']}")