-
Notifications
You must be signed in to change notification settings - Fork 481
Expand file tree
/
Copy pathworking_with_key_and_value.py
More file actions
49 lines (41 loc) · 1.2 KB
/
Copy pathworking_with_key_and_value.py
File metadata and controls
49 lines (41 loc) · 1.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext
# Module-level Powertools Logger, created once when the module is imported;
# lambda_handler uses it to emit one log line per record key and value.
logger = Logger()
# Define schemas for both components
# Schema for the record key: a "ProductKey" record with a single string
# field, product_id. Passed to SchemaConfig as key_schema with type "AVRO".
key_schema = """
{
"type": "record",
"name": "ProductKey",
"fields": [
{"name": "product_id", "type": "string"}
]
}
"""
# Schema for the record value: a "ProductInfo" record with three fields --
# name (string), price (double) and in_stock (boolean). Passed to
# SchemaConfig as value_schema with type "AVRO".
value_schema = """
{
"type": "record",
"name": "ProductInfo",
"fields": [
{"name": "name", "type": "string"},
{"name": "price", "type": "double"},
{"name": "in_stock", "type": "boolean"}
]
}
"""
# Key and value are configured independently: each side declares its own
# schema type ("AVRO" for both here) and its own schema definition.
schema_config = SchemaConfig(
    key_schema_type="AVRO",
    value_schema_type="AVRO",
    key_schema=key_schema,
    value_schema=value_schema,
)
@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    """Log the product id and product name of every record in the batch.

    The ``kafka_consumer`` decorator is given ``schema_config``, so each
    record's ``key`` and ``value`` are expected to arrive already
    deserialized and are indexed like mappings here.

    Args:
        event: Batch of consumer records; iterated via ``event.records``.
        context: Lambda context object; not used by this handler.

    Returns:
        ``{"statusCode": 200}`` once every record has been logged.
    """
    for message in event.records:
        # Read key and value up front (key first, then value) so both are
        # available before anything is logged for this record.
        product_key, product_info = message.key, message.value

        logger.info(f"Processing key: {product_key['product_id']}")
        logger.info(f"Processing value: {product_info['name']}")

    return {"statusCode": 200}