-
Notifications
You must be signed in to change notification settings - Fork 481
Expand file tree
/
Copy pathconsumer_records.py
More file actions
144 lines (110 loc) · 4.71 KB
/
Copy pathconsumer_records.py
File metadata and controls
144 lines (110 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from __future__ import annotations
from functools import cached_property
from typing import TYPE_CHECKING, Any
from aws_lambda_powertools.utilities.data_classes.common import CaseInsensitiveDict
from aws_lambda_powertools.utilities.data_classes.kafka_event import KafkaEventBase, KafkaEventRecordBase
from aws_lambda_powertools.utilities.kafka.deserializer.deserializer import get_deserializer
from aws_lambda_powertools.utilities.kafka.serialization.serialization import serialize_to_output_type
if TYPE_CHECKING:
from collections.abc import Iterator
from aws_lambda_powertools.utilities.kafka.schema_config import SchemaConfig
class ConsumerRecordRecords(KafkaEventRecordBase):
    """
    A Kafka Consumer Record

    Wraps a single raw record dictionary and exposes its key and value after
    running them through the deserializer selected by ``schema_config``.
    The untouched payloads remain available through the ``original_*`` properties.
    """

    def __init__(self, data: dict[str, Any], schema_config: SchemaConfig | None = None):
        """
        Parameters
        ----------
        data : dict[str, Any]
            The raw record dictionary.
        schema_config : SchemaConfig | None
            Optional schema settings for the key and the value. When omitted,
            ``get_deserializer`` is called with no schema type and picks its default.
        """
        super().__init__(data)
        self.schema_config = schema_config

    @staticmethod
    def _deserialize(payload: Any, schema_type: Any, schema_str: Any, output_serializer: Any) -> Any:
        """Deserialize ``payload`` and, when given, convert it with ``output_serializer``."""
        # get_deserializer is always called; with schema_type=None it falls back to its default
        deserializer = get_deserializer(schema_type, schema_str)
        deserialized = deserializer.deserialize(payload)

        # Apply output serializer if specified
        if output_serializer:
            return serialize_to_output_type(deserialized, output_serializer)
        return deserialized

    @cached_property
    def key(self) -> Any:
        """
        The deserialized record key, or ``None`` when the record has no key.

        The result is cached, so deserialization runs at most once per record.
        """
        key = self.get("key")

        # Return None if key doesn't exist (or is empty)
        if not key:
            return None

        config = self.schema_config
        # Schema settings (including the output serializer) only apply when a key schema type is set
        if config and config.key_schema_type:
            return self._deserialize(key, config.key_schema_type, config.key_schema, config.key_output_serializer)
        return self._deserialize(key, None, None, None)

    @cached_property
    def value(self) -> Any:
        """
        The deserialized record value, or ``None`` when the record carries a null value.

        The result is cached, so deserialization runs at most once per record.

        Raises
        ------
        KeyError
            If the record has no ``"value"`` entry at all.
        """
        value = self["value"]

        # A null value (e.g. a tombstone record) has nothing to decode;
        # return None rather than handing None to the deserializer.
        if value is None:
            return None

        config = self.schema_config
        # Schema settings (including the output serializer) only apply when a value schema type is set
        if config and config.value_schema_type:
            return self._deserialize(
                value,
                config.value_schema_type,
                config.value_schema,
                config.value_output_serializer,
            )
        return self._deserialize(value, None, None, None)

    @property
    def original_value(self) -> str:
        """The original (base64 encoded) Kafka record value."""
        return self["value"]

    @property
    def original_key(self) -> str | None:
        """
        The original (base64 encoded) Kafka record key.

        This key is optional; if not provided,
        a round-robin algorithm will be used to determine
        the partition for the message.
        """
        return self.get("key")

    @property
    def original_headers(self) -> list[dict[str, list[int]]]:
        """The raw Kafka record headers."""
        return self["headers"]

    @cached_property
    def headers(self) -> dict[str, bytes]:
        """Decodes the headers as a single dictionary."""
        # Each header chunk maps a name to a list of ints; bytes() turns that list into raw bytes
        return CaseInsensitiveDict((k, bytes(v)) for chunk in self.original_headers for k, v in chunk.items())
class ConsumerRecords(KafkaEventBase):
    """Event wrapper for a self-managed or MSK Apache Kafka trigger.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
    - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
    """

    def __init__(self, data: dict[str, Any], schema_config: SchemaConfig | None = None):
        """Store the raw event and the optional schema configuration handed to each record."""
        super().__init__(data)
        # Created lazily by the ``record`` property on its first access
        self._records: Iterator[ConsumerRecordRecords] | None = None
        self.schema_config = schema_config

    @property
    def records(self) -> Iterator[ConsumerRecordRecords]:
        """
        The Kafka records.

        Every access returns a new generator that walks each group under the
        event's ``"records"`` mapping and wraps every entry in a
        ``ConsumerRecordRecords`` carrying this event's ``schema_config``.
        """
        yield from (
            ConsumerRecordRecords(data=raw_record, schema_config=self.schema_config)
            for batch in self["records"].values()
            for raw_record in batch
        )

    @property
    def record(self) -> ConsumerRecordRecords:
        """
        Returns the next Kafka record using an iterator.

        The iterator is created on first access and kept on the instance, so
        successive accesses advance through the records.

        Returns
        -------
        ConsumerRecordRecords
            The next Kafka record.

        Raises
        ------
        StopIteration
            If there are no more records available.
        """
        iterator = self._records
        if iterator is None:
            # First access: start a generator over ``records`` and remember it
            iterator = self._records = self.records
        return next(iterator)