-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathcomposite_raw_decoder.py
More file actions
160 lines (132 loc) · 5.24 KB
/
composite_raw_decoder.py
File metadata and controls
160 lines (132 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import csv
import gzip
import io
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from io import BufferedIOBase, TextIOWrapper
from typing import Any, Generator, MutableMapping, Optional
import orjson
import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.utils import AirbyteTracedException
# Module-level logger named "airbyte"; the parsers below use it to report
# JSON-library fallbacks, parse failures, and skipped lines.
logger = logging.getLogger("airbyte")
@dataclass
class Parser(ABC):
    """
    Base interface for turning a binary stream into records.

    Parsers can be composed (for example, a decompressing parser that delegates
    to a format parser), so every implementation consumes a BufferedIOBase and
    lazily yields one mapping per record.
    """

    @abstractmethod
    def parse(
        self,
        data: BufferedIOBase,
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Read ``data`` and yield the records it contains as dictionaries.
        """
        ...
@dataclass
class GzipParser(Parser):
    """
    Parser that gunzips its input and hands the decompressed stream to another parser.

    Attributes:
        inner_parser: parser applied to the decompressed bytes.
    """

    inner_parser: Parser

    def parse(
        self,
        data: BufferedIOBase,
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Wrap ``data`` in a streaming gzip reader and delegate record extraction
        to ``inner_parser``. The gzip reader is closed when iteration finishes
        or the generator is closed.
        """
        decompressed = gzip.GzipFile(fileobj=data, mode="rb")
        with decompressed:
            yield from self.inner_parser.parse(decompressed)
@dataclass
class JsonParser(Parser):
    """
    Parser for a single JSON document.

    A top-level JSON array is yielded element by element; any other top-level
    value is yielded as a single record.

    Attributes:
        encoding: text encoding used to decode the raw bytes before parsing.
    """

    encoding: str = "utf-8"

    def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.

        Raises:
            AirbyteTracedException: if neither library produces a value (this
                includes a body that is literally ``null``).
        """
        raw_data = data.read()
        # Fall back only when orjson signalled failure (None). A plain ``or`` would
        # also send valid falsy documents such as ``[]``, ``{}``, ``0`` or ``false``
        # through a second, slower parse with the json library.
        body_json = self._parse_orjson(raw_data)
        if body_json is None:
            body_json = self._parse_json(raw_data)
        if body_json is None:
            raise AirbyteTracedException(
                message="Response JSON data failed to be parsed. See logs for more information.",
                internal_message="Response JSON data failed to be parsed.",
                failure_type=FailureType.system_error,
            )
        if isinstance(body_json, list):
            yield from body_json
        else:
            yield body_json

    def _parse_orjson(self, raw_data: bytes) -> Optional[Any]:
        """Decode ``raw_data`` with ``encoding`` and parse it with orjson; return None on any failure."""
        try:
            return orjson.loads(raw_data.decode(self.encoding))
        except Exception as exc:
            logger.debug(
                f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}"
            )
            return None

    def _parse_json(self, raw_data: bytes) -> Optional[Any]:
        """Decode ``raw_data`` with ``encoding`` and parse it with the stdlib json module; return None on any failure."""
        try:
            return json.loads(raw_data.decode(self.encoding))
        except Exception as exc:
            logger.error(f"Failed to parse JSON data using json library. {exc}")
            return None
@dataclass
class JsonLineParser(Parser):
    """
    Parser for newline-delimited JSON (one JSON value per line).

    Attributes:
        encoding: text encoding of each line; ``None`` falls back to UTF-8.
    """

    encoding: Optional[str] = "utf-8"

    def parse(
        self,
        data: BufferedIOBase,
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Yield one parsed JSON value per non-blank line of ``data``.

        Lines that cannot be decoded with ``encoding`` or parsed as JSON are
        logged as warnings and skipped instead of aborting the stream.
        """
        encoding = self.encoding or "utf-8"
        for line in data:
            # Blank separator lines carry no record; skip them instead of
            # reporting them as malformed JSON.
            if not line.strip():
                continue
            try:
                record = json.loads(line.decode(encoding=encoding))
            except (UnicodeDecodeError, json.JSONDecodeError) as e:
                # The warning promises "decode/parse": an undecodable byte sequence is
                # handled like malformed JSON rather than crashing the whole read.
                logger.warning(f"Cannot decode/parse line {line!r} as JSON, error: {e}")
                continue
            # Yield outside the try so only decode/parse errors are handled above.
            yield record
@dataclass
class CsvParser(Parser):
    """
    Parser for delimited text with a header row; each data row is yielded as a
    dict keyed by the header fields.

    Attributes:
        encoding: text encoding of the stream; ``None`` falls back to UTF-8.
        delimiter: field separator. A value starting with a backslash is
            interpreted as an escape sequence (e.g. backslash-t becomes a tab).
    """

    # TODO: migrate implementation to re-use file-base classes
    encoding: Optional[str] = "utf-8"
    delimiter: Optional[str] = ","

    def _get_delimiter(self) -> Optional[str]:
        """
        Get delimiter from the configuration. Check for the escape character and decode it.

        The decoded value is returned without being written back to
        ``self.delimiter``, so repeated calls (e.g. once per parsed response)
        always start from the configured value. Writing it back would make a
        later call re-decode an already-decoded value, which raises for a lone
        backslash delimiter.
        """
        delimiter = self.delimiter
        if delimiter is not None and delimiter.startswith("\\"):
            delimiter = delimiter.encode("utf-8").decode("unicode_escape")
        return delimiter

    def parse(
        self,
        data: BufferedIOBase,
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Parse CSV data from decompressed bytes.

        Rows are read with ``csv.DictReader``, so the first row supplies the keys.
        """
        # newline="" is required by the csv module: the reader does its own
        # line-ending handling, and universal-newline translation would otherwise
        # rewrite CRLF sequences embedded in quoted fields.
        text_data = TextIOWrapper(data, encoding=self.encoding or "utf-8", newline="")  # type: ignore
        reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",")
        yield from reader
@dataclass
class CompositeRawDecoder(Decoder):
    """
    Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
    by passing response.raw to parser(s).

    Note: response.raw is not decoded/decompressed by default.
    Parsers should be instantiated recursively.

    Example:
        composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
    """

    parser: Parser
    stream_response: bool = True

    def is_stream_response(self) -> bool:
        """Return True when decode() should read the streamed ``response.raw`` rather than ``response.content``."""
        return self.stream_response

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Yield the records produced by ``parser`` for ``response``.

        In streaming mode ``response.raw`` is closed when iteration finishes, when
        the parser raises, or when the consumer stops early and the generator is closed.
        """
        if self.is_stream_response():
            # urllib mentions that some interfaces don't play nice with auto_close [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content)
            # We have indeed observed some issues with CSV parsing. Hence, we will manage the closing of the file ourselves until we find a better solution.
            response.raw.auto_close = False
            try:
                yield from self.parser.parse(data=response.raw)  # type: ignore[arg-type]
            finally:
                # Closing is managed here, so it must also happen on parser errors
                # and on early generator close, not only after full consumption.
                response.raw.close()
        else:
            yield from self.parser.parse(data=io.BytesIO(response.content))