-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathinferred_schema_loader.py
More file actions
77 lines (61 loc) · 2.96 KB
/
inferred_schema_loader.py
File metadata and controls
77 lines (61 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from dataclasses import InitVar, dataclass
from itertools import islice
from typing import Any, Mapping, Optional

from airbyte_cdk.models import AirbyteRecordMessage
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer
@dataclass
class InferredSchemaLoader(SchemaLoader):
    """
    Infers a JSON Schema by reading a sample of records from the stream at discover time.

    This schema loader reads up to `record_sample_size` records from the stream and uses
    the SchemaInferrer to generate a JSON schema based on the structure of those records.
    This is useful for streams where the schema is not known in advance or changes dynamically.

    Attributes:
        retriever (Retriever): The retriever used to fetch records from the stream
        config (Config): The user-provided configuration as specified by the source's spec
        parameters (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
        record_sample_size (int): The maximum number of records to read for schema inference. Defaults to 100.
        stream_name (str): The name of the stream for which to infer the schema
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_sample_size: int = 100
    stream_name: str = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Keep the init-only `parameters` and derive a default stream name from them.

        Args:
            parameters: The init-only mapping passed to the dataclass. It is stored on
                the instance, and its "name" entry (or "" when absent) is used as
                `stream_name` if no non-empty `stream_name` was given explicitly.
        """
        self._parameters = parameters
        if not self.stream_name:
            self.stream_name = parameters.get("name", "")

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Infers and returns a JSON schema by reading a sample of records from the stream.

        This method reads up to `record_sample_size` records from the retriever and feeds
        them to a SchemaInferrer. Inference is best effort: if reading or accumulating
        records raises, the failure is logged as a warning (with traceback) and an empty
        schema is returned instead of propagating the error.

        Returns:
            A mapping representing the inferred JSON schema for the stream, or an empty
            mapping when the inferrer produced no schema or inference failed.
        """
        schema_inferrer = SchemaInferrer()

        try:
            records = self.retriever.read_records({})  # type: ignore[call-overload]
            # islice stops pulling from the retriever once the sample is full, so no record
            # beyond the sample is requested. max() clamps a negative sample size to zero
            # because islice rejects negative stop values.
            for record in islice(records, max(self.record_sample_size, 0)):
                schema_inferrer.accumulate(
                    AirbyteRecordMessage(
                        stream=self.stream_name,
                        data=record,  # type: ignore[arg-type]
                        emitted_at=0,  # Not used for schema inference
                    )
                )
        except Exception:
            # Deliberately broad: any failure while sampling degrades to an empty schema,
            # but it is logged so the cause is not lost.
            logging.getLogger(__name__).warning(
                "Schema inference failed for stream '%s'; returning an empty schema.",
                self.stream_name,
                exc_info=True,
            )
            return {}

        inferred_schema: Optional[Mapping[str, Any]] = schema_inferrer.get_stream_schema(
            self.stream_name
        )
        # The inferrer result is Optional; normalize a missing/empty result to {}.
        return inferred_schema if inferred_schema else {}