-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathdynamic_schema_loader.py
More file actions
322 lines (277 loc) · 11.7 KB
/
dynamic_schema_loader.py
File metadata and controls
322 lines (277 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from copy import deepcopy
from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Union
import dpath
from typing_extensions import deprecated
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config
# Lookup table from a supported type name to the JSON Schema fragment emitted for it.
# Every fragment is nullable: its "type" list always starts with "null".
# Date/time flavours are all serialized as strings and distinguished by "format"
# and, where present, "airbyte_type".
AIRBYTE_DATA_TYPES: Mapping[str, MutableMapping[str, Any]] = {
    # --- scalar text / boolean ---
    "string": {
        "type": ["null", "string"],
    },
    "boolean": {
        "type": ["null", "boolean"],
    },
    # --- date and time types ---
    "date": {
        "type": ["null", "string"],
        "format": "date",
    },
    "timestamp_without_timezone": {
        "type": ["null", "string"],
        "format": "date-time",
        "airbyte_type": "timestamp_without_timezone",
    },
    "timestamp_with_timezone": {
        "type": ["null", "string"],
        "format": "date-time",
    },
    "time_without_timezone": {
        "type": ["null", "string"],
        "format": "time",
        "airbyte_type": "time_without_timezone",
    },
    "time_with_timezone": {
        "type": ["null", "string"],
        "format": "time",
        "airbyte_type": "time_with_timezone",
    },
    # --- numeric types ---
    "integer": {
        "type": ["null", "integer"],
    },
    "number": {
        "type": ["null", "number"],
    },
    # --- container types ---
    "array": {
        "type": ["null", "array"],
    },
    "object": {
        "type": ["null", "object"],
    },
}
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class ComplexFieldType:
    """
    Describes a composite target type: a base `field_type` and, optionally, the type of its `items`.

    `items` may be a plain type name or another `ComplexFieldType` for nested definitions.
    """

    # Name of the base type (e.g. "array").
    field_type: str
    # Type of the contained elements; only accepted when `field_type` is "array".
    items: Optional[Union[str, "ComplexFieldType"]] = None

    def __post_init__(self) -> None:
        """
        Validates the combination of `field_type` and `items` right after construction.

        Raises:
            ValueError: if `items` is set (truthy) while `field_type` is not "array".
        """
        is_array = self.field_type == "array"
        if not is_array and self.items:
            raise ValueError("'items' can only be used when 'field_type' is an array.")
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass(frozen=True)
class TypesMap:
    """
    One type-substitution rule: when a field's type equals `current_type`
    (and `condition`, if any, holds) the field is given `target_type` instead.
    """

    # Type to substitute in: a type name, a list of type names, or a ComplexFieldType.
    target_type: Union[List[str], str, ComplexFieldType]
    # Type (name or list of names) that this rule matches against.
    current_type: Union[List[str], str]
    # Optional boolean expression gating the rule; may be None (no explicit condition).
    condition: Optional[str]
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class SchemaTypeIdentifier:
    """
    Bundles the pointers (paths) and type mappings used to read a schema out of a record.

    After initialization every plain-string path segment has been converted into an
    `InterpolatedString` built with the supplied `parameters`.
    """

    # Path to the property name inside a single property definition.
    key_pointer: List[Union[InterpolatedString, str]]
    # Init-only: parameters handed to `InterpolatedString.create` for each path segment.
    parameters: InitVar[Mapping[str, Any]]
    # Path to the property type inside a single property definition (None when unset/empty).
    type_pointer: Optional[List[Union[InterpolatedString, str]]] = None
    # Optional list of type-substitution rules.
    types_mapping: Optional[List[TypesMap]] = None
    # Path to the schema inside the retrieved record (normalized to [] when unset/empty).
    schema_pointer: Optional[List[Union[InterpolatedString, str]]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Normalizes the three pointers, wrapping string segments as `InterpolatedString`s.

        An unset or empty `schema_pointer` becomes `[]`; an unset or empty `type_pointer` becomes `None`.
        """
        if self.schema_pointer:
            self.schema_pointer = self._update_pointer(self.schema_pointer, parameters)
        else:
            self.schema_pointer = []

        # `key_pointer` is a required field in the model, hence the assignment ignore.
        self.key_pointer = self._update_pointer(self.key_pointer, parameters)  # type: ignore[assignment]

        if self.type_pointer:
            self.type_pointer = self._update_pointer(self.type_pointer, parameters)
        else:
            self.type_pointer = None

    @staticmethod
    def _update_pointer(
        pointer: Optional[List[Union[InterpolatedString, str]]], parameters: Mapping[str, Any]
    ) -> Optional[List[Union[InterpolatedString, str]]]:
        """
        Returns a copy of `pointer` in which every `str` segment is replaced by an
        `InterpolatedString`; non-string segments are kept as they are.

        Returns `None` when `pointer` is `None` or empty.
        """
        if not pointer:
            return None

        updated: List[Union[InterpolatedString, str]] = []
        for segment in pointer:
            if isinstance(segment, str):
                updated.append(InterpolatedString.create(segment, parameters=parameters))
            else:
                updated.append(segment)
        return updated
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AdditionalPropertyFieldsInferrer(ABC):
    """
    Interface for computing extra fields that get merged into each property of a schema.

    Example: with an inferrer whose `infer` returns {"toto": "tata"}, a property such as
    ```
    "properties": {
        "Id": {
            "type": ["null", "string"],
        },
        <...>
    }
    ```
    ends up as
    ```
    "properties": {
        "Id": {
            "type": ["null", "string"],
            "toto": "tata"
        },
        <...>
    }
    ```
    """

    @abstractmethod
    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        """
        Computes the additional fields for one property.

        :param property_definition: the raw definition of the property being processed
        :return: a mapping of extra fields to add to that property's schema entry
        """
        ...
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class DynamicSchemaLoader(SchemaLoader):
    """
    Dynamically loads a JSON Schema by extracting data from retrieved records.

    The first record produced by `retriever` is treated as the schema description. The
    property definitions found at `schema_type_identifier.schema_pointer` are turned into
    JSON Schema properties, optionally enriched by `additional_property_fields_inferrer`
    and post-processed by `schema_transformations`.
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    schema_type_identifier: SchemaTypeIdentifier
    schema_transformations: List[RecordTransformation] = field(default_factory=list)
    additional_property_fields_inferrer: Optional[AdditionalPropertyFieldsInferrer] = None
    allow_additional_properties: bool = True

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Constructs a JSON Schema based on retrieved data.

        Only the first record returned by the retriever is read. When there is no record
        (or it is falsy) the resulting schema has no properties.

        :return: a draft-07 JSON Schema object with the computed `properties`
        :raises ValueError: if a property key is not a string or a type cannot be resolved
        """
        properties: Dict[str, Any] = {}

        # `read_records` is only guaranteed to return an iterable, while `next()` requires
        # an iterator; `iter()` makes this work for generators and plain collections alike.
        retrieved_record = next(iter(self.retriever.read_records({})), None)

        raw_schema = (
            self._extract_data(
                retrieved_record,  # type: ignore[arg-type] # Expected that retrieved_record will be only Mapping[str, Any]
                self.schema_type_identifier.schema_pointer,
            )
            if retrieved_record
            else []
        )

        for property_definition in raw_schema:
            key = self._get_key(property_definition, self.schema_type_identifier.key_pointer)
            value = self._get_type(
                property_definition,
                self.schema_type_identifier.type_pointer,
            )
            if self.additional_property_fields_inferrer:
                value.update(self.additional_property_fields_inferrer.infer(property_definition))
            # A later definition with the same key overwrites an earlier one.
            properties[key] = value

        transformed_properties = self._transform(properties)

        return {
            "$schema": "https://json-schema.org/draft-07/schema#",
            "type": "object",
            "additionalProperties": self.allow_additional_properties,
            "properties": transformed_properties,
        }

    def _transform(
        self,
        properties: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """
        Applies every configured schema transformation to `properties`, in order.

        The return value of each `transform` call is ignored and the same `properties`
        object is returned, so transformations are expected to work in place.
        """
        for transformation in self.schema_transformations:
            transformation.transform(
                properties,  # type: ignore  # properties has type Mapping[str, Any], but Dict[str, Any] expected
                config=self.config,
            )
        return properties

    def _get_key(
        self,
        raw_schema: MutableMapping[str, Any],
        field_key_path: List[Union[InterpolatedString, str]],
    ) -> str:
        """
        Extracts the key field from the schema using the specified path.

        :param raw_schema: a single property definition
        :param field_key_path: path to the property name inside `raw_schema`
        :return: the property name
        :raises ValueError: if the extracted value is not a string
        """
        field_key = self._extract_data(raw_schema, field_key_path)
        if not isinstance(field_key, str):
            raise ValueError(f"Expected key to be a string. Got {field_key}")
        return field_key

    def _get_type(
        self,
        raw_schema: MutableMapping[str, Any],
        field_type_path: Optional[List[Union[InterpolatedString, str]]],
    ) -> Dict[str, Any]:
        """
        Determines the JSON Schema type for a field, supporting nullable and combined types.

        The raw type defaults to "string" when no `field_type_path` is given; "string" is
        also passed as the extraction default. The raw type is then run through the
        configured type mappings before being resolved.

        :param raw_schema: a single property definition
        :param field_type_path: path to the property type inside `raw_schema`, if any
        :return: a fresh JSON Schema fragment; a two-string list becomes a `oneOf` of both types
        :raises ValueError: if the mapped type is not a string, a list of exactly two
            strings, or a `ComplexFieldType`, or if a type name is unknown
        """
        raw_field_type = (
            self._extract_data(raw_schema, field_type_path, default="string")
            if field_type_path
            else "string"
        )
        mapped_field_type = self._replace_type_if_not_valid(raw_field_type, raw_schema)

        if (
            isinstance(mapped_field_type, list)
            and len(mapped_field_type) == 2
            and all(isinstance(item, str) for item in mapped_field_type)
        ):
            first_type = self._get_airbyte_type(mapped_field_type[0])
            second_type = self._get_airbyte_type(mapped_field_type[1])
            return {"oneOf": [first_type, second_type]}

        if isinstance(mapped_field_type, str):
            return self._get_airbyte_type(mapped_field_type)

        if isinstance(mapped_field_type, ComplexFieldType):
            return self._resolve_complex_type(mapped_field_type)

        raise ValueError(
            f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
        )

    def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Dict[str, Any]:
        """
        Builds the JSON Schema fragment for a `ComplexFieldType`, recursing into nested `items`.

        :raises ValueError: if any referenced type name is unknown
        """
        field_type = self._get_airbyte_type(complex_type.field_type)
        if not complex_type.items:
            return field_type

        field_type["items"] = (
            self._get_airbyte_type(complex_type.items)
            if isinstance(complex_type.items, str)
            else self._resolve_complex_type(complex_type.items)
        )
        return field_type

    def _replace_type_if_not_valid(
        self,
        field_type: Union[List[str], str],
        raw_schema: MutableMapping[str, Any],
    ) -> Union[List[str], str, ComplexFieldType]:
        """
        Replaces a field type if it matches a type mapping in `types_mapping`.

        Mappings are checked in order; the first one whose `current_type` equals
        `field_type` and whose condition evaluates truthy wins. A mapping without a
        condition always applies. When nothing matches, `field_type` is returned unchanged.
        """
        for types_map in self.schema_type_identifier.types_mapping or []:
            # Cheap equality check first: the condition is only built and evaluated
            # for mappings that can actually apply to this field type.
            if field_type != types_map.current_type:
                continue

            # conditional is optional param, setting to true if not provided
            condition = InterpolatedBoolean(
                condition=types_map.condition if types_map.condition is not None else "True",
                parameters={},
            ).eval(config=self.config, raw_schema=raw_schema)

            if condition:
                return types_map.target_type
        return field_type

    @staticmethod
    def _get_airbyte_type(field_type: str) -> Dict[str, Any]:
        """
        Maps a field type to its corresponding Airbyte type definition.

        :return: a deep copy of the matching `AIRBYTE_DATA_TYPES` entry, so callers may mutate it
        :raises ValueError: if `field_type` is not a key of `AIRBYTE_DATA_TYPES`
        """
        if field_type not in AIRBYTE_DATA_TYPES:
            raise ValueError(f"Invalid Airbyte data type: {field_type}")

        return deepcopy(AIRBYTE_DATA_TYPES[field_type])  # type: ignore  # a copy of a dict should be a dict, not a MutableMapping

    def _extract_data(
        self,
        body: Mapping[str, Any],
        extraction_path: Optional[List[Union[InterpolatedString, str]]] = None,
        default: Any = None,
    ) -> Any:
        """
        Extracts data from the body based on the provided extraction path.

        :param body: the mapping to read from
        :param extraction_path: path segments; non-string segments are evaluated against
            `self.config` first. When `None` or empty, `body` itself is returned.
        :param default: value forwarded to `dpath.get` as its `default`
        :return: whatever `dpath.get` yields for the resolved path
        """
        if not extraction_path:
            return body

        path = [
            node.eval(self.config) if not isinstance(node, str) else node
            for node in extraction_path
        ]

        return dpath.get(body, path, default=default)  # type: ignore # extracted will be a MutableMapping, given input data structure