-
Notifications
You must be signed in to change notification settings - Fork 481
Expand file tree
/
Copy pathconsumer_records.py
More file actions
162 lines (125 loc) · 5.2 KB
/
Copy pathconsumer_records.py
File metadata and controls
162 lines (125 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from __future__ import annotations
import logging
from functools import cached_property
from typing import TYPE_CHECKING, Any
from aws_lambda_powertools.shared.functions import decode_header_bytes
from aws_lambda_powertools.utilities.data_classes.common import CaseInsensitiveDict
from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventBase, KafkaEventRecordBase
from aws_lambda_powertools.utilities.kafka.deserializer.deserializer import get_deserializer
from aws_lambda_powertools.utilities.kafka.serialization.serialization import serialize_to_output_type
if TYPE_CHECKING:
from collections.abc import Iterator
from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig
logger = logging.getLogger(__name__)
class ConsumerRecordRecords(KafkaEventRecordBase):
    """
    A Kafka Consumer Record

    Wraps a single raw record dictionary. ``key`` and ``value`` are
    deserialized on first access and cached on the instance; the untouched
    payloads remain available through the ``original_*`` properties.
    """

    def __init__(self, data: dict[str, Any], schema_config: SchemaConfig | None = None):
        """
        Parameters
        ----------
        data : dict[str, Any]
            The raw record dictionary.
        schema_config : SchemaConfig | None
            Optional schema settings used to pick the key/value deserializers
            and output serializers. When omitted, ``get_deserializer`` is
            called with no schema type.
        """
        super().__init__(data)
        self.schema_config = schema_config

    def _deserialize_field(
        self,
        raw: Any,
        *,
        schema_type: Any = None,
        schema_value: Any = None,
        output_serializer: Any = None,
        field_metadata: Any = None,
    ) -> Any:
        """Deserialize one raw field and optionally convert it to the output type.

        Shared by ``key`` and ``value`` so both follow exactly the same pipeline.
        """
        # Always use get_deserializer if None it will default to DEFAULT
        deserializer = get_deserializer(
            schema_type=schema_type,
            schema_value=schema_value,
            field_metadata=field_metadata,
        )
        deserialized = deserializer.deserialize(raw)

        # Apply output serializer if specified
        if output_serializer:
            return serialize_to_output_type(deserialized, output_serializer)
        return deserialized

    @cached_property
    def key(self) -> Any:
        """
        The deserialized record key.

        Returns
        -------
        Any
            ``None`` when the record has no key (missing or empty), otherwise
            the deserialized key, converted with the configured key output
            serializer when one is set.
        """
        raw_key = self.get("key")

        # Return None if key doesn't exist
        if not raw_key:
            return None

        logger.debug("Deserializing key field")

        config = self.schema_config
        if config and config.key_schema_type:
            return self._deserialize_field(
                raw_key,
                schema_type=config.key_schema_type,
                schema_value=config.key_schema,
                output_serializer=config.key_output_serializer,
                field_metadata=self.key_schema_metadata,
            )
        return self._deserialize_field(raw_key, field_metadata=self.key_schema_metadata)

    @cached_property
    def value(self) -> Any:
        """
        The deserialized record value.

        Returns
        -------
        Any
            ``None`` when the record value is ``None``, otherwise the
            deserialized value, converted with the configured value output
            serializer when one is set.

        Raises
        ------
        KeyError
            If the record has no ``"value"`` entry at all.
        """
        raw_value = self["value"]

        # A null value (e.g. a Kafka tombstone record) carries no payload to
        # deserialize; mirror the handling of an absent key instead of passing
        # None to the deserializer.
        if raw_value is None:
            return None

        logger.debug("Deserializing value field")

        config = self.schema_config
        if config and config.value_schema_type:
            return self._deserialize_field(
                raw_value,
                schema_type=config.value_schema_type,
                schema_value=config.value_schema,
                output_serializer=config.value_output_serializer,
                field_metadata=self.value_schema_metadata,
            )
        return self._deserialize_field(raw_value, field_metadata=self.value_schema_metadata)

    @property
    def original_value(self) -> str:
        """The original (base64 encoded) Kafka record value."""
        return self["value"]

    @property
    def original_key(self) -> str | None:
        """
        The original (base64 encoded) Kafka record key.

        This key is optional; if not provided,
        a round-robin algorithm will be used to determine
        the partition for the message.
        """
        return self.get("key")

    @property
    def original_headers(self) -> list[dict[str, list[int]]]:
        """The raw Kafka record headers."""
        return self["headers"]

    @cached_property
    def headers(self) -> dict[str, bytes]:
        """Decodes the headers as a single dictionary.

        All header chunks are merged into one case-insensitive mapping; when
        the same header name appears more than once, the last occurrence wins.
        """
        return CaseInsensitiveDict(
            (k, decode_header_bytes(v)) for chunk in self.original_headers for k, v in chunk.items()
        )
class ConsumerRecords(KafkaEventBase):
    """Self-managed or MSK Apache Kafka event trigger

    Exposes the records of the event as ``ConsumerRecordRecords`` objects,
    each one carrying the schema configuration given to this event.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    def __init__(self, data: dict[str, Any], schema_config: SchemaConfig | None = None):
        """
        Parameters
        ----------
        data : dict[str, Any]
            The raw event dictionary.
        schema_config : SchemaConfig | None
            Optional schema settings handed to every record that is produced.
        """
        super().__init__(data)
        # Created lazily by ``record`` on its first use.
        self._records: Iterator[ConsumerRecordRecords] | None = None
        self.schema_config = schema_config

    @property
    def records(self) -> Iterator[ConsumerRecordRecords]:
        """The Kafka records.

        Every access returns a fresh generator that walks each group under
        the event's ``"records"`` mapping in order.
        """
        for group in self["records"].values():
            yield from (
                ConsumerRecordRecords(data=raw_record, schema_config=self.schema_config)
                for raw_record in group
            )

    @property
    def record(self) -> ConsumerRecordRecords:
        """
        Returns the next Kafka record using an iterator.

        The iterator is created on first access and kept on the instance, so
        successive accesses advance through the records.

        Returns
        -------
        ConsumerRecordRecords
            The next Kafka record.

        Raises
        ------
        StopIteration
            If there are no more records available.
        """
        iterator = self._records
        if iterator is None:
            iterator = self._records = self.records
        return next(iterator)