-
Notifications
You must be signed in to change notification settings - Fork 481
Expand file tree
/
Copy pathaccess_event_metadata.py
More file actions
44 lines (35 loc) · 1.29 KB
/
Copy pathaccess_event_metadata.py
File metadata and controls
44 lines (35 loc) · 1.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer
from aws_lambda_powertools.utilities.typing import LambdaContext
# Module-level Powertools logger; lambda_handler writes every log line through it.
logger = Logger()
# Avro schema for the Kafka message value: a `com.example.User` record
# with a string `name` field and an int `age` field.
avro_schema = """
{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
"""
# Value-side schema settings: declares the value schema type as AVRO and
# supplies the schema text defined above. This object is passed to the
# @kafka_consumer decorator on lambda_handler.
schema_config = SchemaConfig(
value_schema_type="AVRO",
value_schema=avro_schema,
)
@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context: LambdaContext):
    """Log the metadata and decoded payload of every Kafka record in the batch.

    For each record in ``event.records`` the handler logs, in order: the
    topic, the partition and offset, the timestamp, the headers, the ``name``
    field of the deserialized value, and finally the original raw value.

    Args:
        event: Batch of consumer records delivered through ``@kafka_consumer``.
        context: Lambda context object; not used by this handler.

    Returns:
        ``{"statusCode": 200}`` once every record has been logged.
    """
    records = event.records
    for record in records:
        # Where the message came from: topic, then partition/offset, then time.
        logger.info(f"Processing message from topic '{record.topic}'")
        logger.info(f"Partition: {record.partition}, Offset: {record.offset}")
        logger.info(f"Produced at: {record.timestamp}")

        # Headers that accompanied the message.
        logger.info(f"Headers: {record.headers}")

        # Deserialized message content; only its "name" field is logged.
        value = record.value
        logger.info(f"Deserialized value: {value['name']}")

        # Original raw data, kept available for debugging.
        logger.info(f"Raw message: {record.original_value}")

    response = {"statusCode": 200}
    return response