-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathcatalog_providers.py
More file actions
155 lines (126 loc) · 5.64 KB
/
catalog_providers.py
File metadata and controls
155 lines (126 loc) · 5.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Catalog provider implementation.
A catalog provider wraps a configured catalog and configured streams. This class is responsible for
providing information about the catalog and streams. A catalog provider can also be updated with new
streams as they are discovered, providing a thin layer of abstraction over the configured catalog.
"""
from __future__ import annotations

from typing import TYPE_CHECKING, Any, cast, final

from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sql import exceptions as exc
from airbyte_cdk.sql._util.name_normalizers import LowerCaseNormalizer


if TYPE_CHECKING:
    from airbyte_cdk.models import ConfiguredAirbyteStream
class CatalogProvider:
    """A catalog provider wraps a configured catalog and configured streams.

    This class is responsible for providing information about the catalog and streams.

    Note:
        - The catalog provider is not responsible for managing the catalog or streams but it may
          be updated with new streams as they are discovered.
    """

    def __init__(
        self,
        configured_catalog: ConfiguredAirbyteCatalog,
    ) -> None:
        """Initialize the catalog provider with a catalog object reference.

        Since the catalog is passed by reference, the catalog provider may be updated with new
        streams as they are discovered.

        Args:
            configured_catalog: The configured catalog to wrap. It is passed through
                `validate_catalog`, which fills in missing generation/sync IDs in place.
        """
        self._catalog: ConfiguredAirbyteCatalog = self.validate_catalog(configured_catalog)

    @staticmethod
    def validate_catalog(catalog: ConfiguredAirbyteCatalog) -> ConfiguredAirbyteCatalog:
        """Validate the catalog, filling in missing generation and sync IDs.

        For every stream, `generation_id`, `minimum_generation_id`, and `sync_id` are each set to
        `1` when they are `None`; values that are already set are left untouched.

        Args:
            catalog: The configured catalog to validate. It is modified in place.

        Returns:
            The same catalog object that was passed in, with defaults applied.
        """
        for stream in catalog.streams:
            if stream.generation_id is None:
                stream.generation_id = 1
            if stream.minimum_generation_id is None:
                stream.minimum_generation_id = 1
            if stream.sync_id is None:
                stream.sync_id = 1  # This should ideally increment monotonically with each sync.

        return catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Return the configured catalog."""
        return self._catalog

    @property
    def stream_names(self) -> list[str]:
        """Return the unique names of the streams in the catalog.

        Duplicates are removed while preserving first-seen catalog order. This keeps the result
        deterministic across runs, whereas iterating a `set` of strings depends on hash
        randomization.
        """
        return list(
            dict.fromkeys(stream.stream.name for stream in self.configured_catalog.streams)
        )

    def get_configured_stream_info(
        self,
        stream_name: str,
    ) -> ConfiguredAirbyteStream:
        """Return the configured stream with the given name.

        Args:
            stream_name: The name of the stream to look up (compared to `stream.stream.name`).

        Returns:
            The single configured stream whose name equals `stream_name`.

        Raises:
            AirbyteInternalError: If the catalog is falsy, or if more than one stream has the name.
            AirbyteStreamNotFoundError: If no stream in the catalog has the name.
        """
        if not self.configured_catalog:
            raise exc.AirbyteInternalError(
                message="Cannot get stream JSON schema without a catalog.",
            )

        matching_streams: list[ConfiguredAirbyteStream] = [
            stream
            for stream in self.configured_catalog.streams
            if stream.stream.name == stream_name
        ]
        if not matching_streams:
            raise exc.AirbyteStreamNotFoundError(
                stream_name=stream_name,
                context={
                    "available_streams": [
                        stream.stream.name for stream in self.configured_catalog.streams
                    ],
                },
            )

        if len(matching_streams) > 1:
            raise exc.AirbyteInternalError(
                message="Multiple streams found with same name.",
                context={
                    "stream_name": stream_name,
                },
            )

        return matching_streams[0]

    @final
    def get_stream_json_schema(
        self,
        stream_name: str,
    ) -> dict[str, Any]:
        """Return the JSON schema of the given stream.

        Raises the same errors as `get_configured_stream_info`.
        """
        return cast(dict[str, Any], self.get_configured_stream_info(stream_name).stream.json_schema)

    def get_stream_properties(
        self,
        stream_name: str,
    ) -> dict[str, dict[str, Any]]:
        """Return the top-level `properties` mapping of the given stream's JSON schema.

        The result maps each top-level property name to its property schema.

        Raises:
            KeyError: If the stream's JSON schema has no `properties` key.
        """
        return cast(
            dict[str, dict[str, Any]],
            self.get_stream_json_schema(stream_name)["properties"],
        )

    def get_primary_keys(
        self,
        stream_name: str,
    ) -> list[str] | None:
        """Return the primary key column names for the given stream.

        We return `source_defined_primary_key` if set, or `primary_key` otherwise. If both are set,
        we assume they should not differ, since Airbyte data integrity constraints do not
        permit overruling a source's pre-defined primary keys. If neither is set, we return `None`.

        Each key node is normalized with `LowerCaseNormalizer`.

        Returns:
            A list of column names that constitute the primary key, or None if no primary key is defined.

        Raises:
            AirbyteError: If any primary key column is a nested path (more than one node).
        """
        configured_stream = self.get_configured_stream_info(stream_name)
        pks = configured_stream.stream.source_defined_primary_key or configured_stream.primary_key
        if not pks:
            return None

        normalized_pks: list[list[str]] = [
            [LowerCaseNormalizer.normalize(c) for c in pk] for pk in pks
        ]

        for pk_nodes in normalized_pks:
            # Each PK entry is a path of nodes; only flat (single-node) paths are supported.
            if len(pk_nodes) != 1:
                raise exc.AirbyteError(
                    message=(
                        "Nested primary keys are not supported. "
                        "Each PK column should have exactly one node. "
                    ),
                    context={
                        "stream_name": stream_name,
                        "primary_key_nodes": pk_nodes,
                    },
                )

        return [pk_nodes[0] for pk_nodes in normalized_pks]