forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path kinesis.py
More file actions
62 lines (51 loc) · 2.46 KB
/
kinesis.py
File metadata and controls
62 lines (51 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from __future__ import annotations
import logging
import zlib
from typing import TYPE_CHECKING, Any, cast
from aws_lambda_powertools.utilities.parser.envelopes.base import BaseEnvelope
from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamModel
if TYPE_CHECKING:
from aws_lambda_powertools.utilities.parser.types import T
# Module-scoped logger, named after this module so its output can be filtered/configured per module.
logger: logging.Logger = logging.getLogger(__name__)
class KinesisDataStreamEnvelope(BaseEnvelope):
    """Kinesis Data Stream Envelope to extract array of Records

    The record's data parameter is a base64 encoded string which is parsed into a bytes array,
    though it can also be a JSON encoded string.
    Regardless of its type it'll be parsed into a BaseModel object.

    If a record's bytes are not valid UTF-8, they are treated as a zlib- or gzip-compressed
    stream, decompressed, and then decoded as UTF-8.

    Note: Records will be parsed the same way so if model is str,
    all items in the list will be parsed as str and not as JSON (and vice versa)
    """

    def parse(self, data: dict[str, Any] | Any | None, model: type[T]) -> list[T | None]:
        """Parses records found with model provided

        Parameters
        ----------
        data : dict
            Lambda event to be parsed
        model : type[T]
            Data model provided to parse after extracting data using envelope

        Returns
        -------
        list[T | None]
            List of records parsed with model provided

        Raises
        ------
        ValueError
            If a record's data is neither valid UTF-8 nor a zlib/gzip stream that
            decompresses to valid UTF-8.
        """
        # Lazy %-style args: the message is only rendered when DEBUG is enabled.
        logger.debug("Parsing incoming data with Kinesis model %s", KinesisDataStreamModel)
        parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.model_validate(data)
        logger.debug("Parsing Kinesis records in `body` with %s", model)

        # We allow either AWS expected contract (bytes) or a custom Model, see #943
        return [
            self._parse(data=self._decode_record_data(cast(bytes, record.kinesis.data)), model=model)
            for record in parsed_envelope.Records
        ]

    @staticmethod
    def _decode_record_data(raw_data: bytes) -> str:
        """Decode one record's payload to text, decompressing it first if it is not plain UTF-8.

        Parameters
        ----------
        raw_data : bytes
            The record's data as provided by the parsed envelope.

        Returns
        -------
        str
            The UTF-8 text of the payload (after decompression when needed).

        Raises
        ------
        ValueError
            If the payload is not UTF-8 and cannot be decompressed to UTF-8 either.
        """
        try:
            return raw_data.decode("utf-8")
        except UnicodeDecodeError as ude:
            logger.debug(
                "%s: %s encountered. Data will be decompressed with zlib.decompress().",
                type(ude).__name__,
                ude,
            )
            try:
                # wbits = MAX_WBITS | 32 makes zlib auto-detect a zlib or gzip header.
                # NOTE(review): decompressed size is unbounded; if producers are untrusted,
                # consider zlib.decompressobj(...).decompress(raw_data, max_length) to cap it.
                decompressed = zlib.decompress(raw_data, zlib.MAX_WBITS | 32)
                return decompressed.decode("utf-8")
            except (zlib.error, UnicodeDecodeError) as e:
                raise ValueError("Unable to decode and/or decompress data.") from e