-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathinferred_schema_loader.py
More file actions
112 lines (89 loc) · 4.3 KB
/
inferred_schema_loader.py
File metadata and controls
112 lines (89 loc) · 4.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
from collections.abc import Mapping as ABCMapping
from collections.abc import Sequence
from dataclasses import InitVar, dataclass
from typing import Any, Mapping
from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer
def _to_builtin_types(value: Any) -> Any:
"""
Recursively convert Mapping-like and Sequence-like objects to plain Python types.
This is necessary because genson's schema inference doesn't handle custom Mapping
or Sequence implementations properly. We need to convert everything to plain dicts,
lists, and primitive types.
Args:
value: The value to convert
Returns:
The value converted to plain Python types
"""
if isinstance(value, ABCMapping):
return {k: _to_builtin_types(v) for k, v in value.items()}
elif isinstance(value, (list, tuple)):
return [_to_builtin_types(item) for item in value]
elif isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
return [_to_builtin_types(item) for item in value]
else:
return value
@dataclass
class InferredSchemaLoader(SchemaLoader):
"""
Infers a JSON Schema by reading a sample of records from the stream at discover time.
This schema loader reads up to `record_sample_size` records from the stream and uses
the SchemaInferrer to generate a JSON schema based on the structure of those records.
This is useful for streams where the schema is not known in advance or changes dynamically.
Attributes:
retriever (Retriever): The retriever used to fetch records from the stream
config (Config): The user-provided configuration as specified by the source's spec
parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
record_sample_size (int): The maximum number of records to read for schema inference. Defaults to 100.
stream_name (str): The name of the stream for which to infer the schema
"""
retriever: Retriever
config: Config
parameters: InitVar[Mapping[str, Any]]
record_sample_size: int = 100
stream_name: str = ""
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
if not self.stream_name:
self.stream_name = parameters.get("name", "")
if not self.stream_name:
raise ValueError(
"stream_name must be provided either directly or via the 'name' parameter"
)
def get_json_schema(self) -> Mapping[str, Any]:
"""
Infers and returns a JSON schema by reading a sample of records from the stream.
This method reads up to `record_sample_size` records from the stream and uses
the SchemaInferrer to generate a JSON schema. If no records are available,
it returns an empty schema.
Returns:
A mapping representing the inferred JSON schema for the stream
"""
schema_inferrer = SchemaInferrer()
record_count = 0
for stream_slice in self.retriever.stream_slices():
for record in self.retriever.read_records(records_schema={}, stream_slice=stream_slice):
if record_count >= self.record_sample_size:
break
# Convert all Mapping-like and Sequence-like objects to plain Python types
# This is necessary because genson doesn't handle custom implementations properly
record = _to_builtin_types(record)
airbyte_record = AirbyteRecordMessage(
stream=self.stream_name,
data=record, # type: ignore[arg-type]
emitted_at=0,
)
schema_inferrer.accumulate(airbyte_record)
record_count += 1
if record_count >= self.record_sample_size:
break
inferred_schema: Mapping[str, Any] | None = schema_inferrer.get_stream_schema(
self.stream_name
)
return inferred_schema if inferred_schema else {}