-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathcatalog_builder.py
More file actions
89 lines (71 loc) · 3.21 KB
/
catalog_builder.py
File metadata and controls
89 lines (71 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from typing import Any, Dict, List, Union, overload
from airbyte_protocol_dataclasses.models import DestinationSyncMode
from airbyte_cdk.models import (
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
ConfiguredAirbyteStreamSerializer,
SyncMode,
)
class ConfiguredAirbyteStreamBuilder:
    """Fluent builder that produces a `ConfiguredAirbyteStream`.

    The builder keeps a plain-dict description of the configured stream and passes it to
    `ConfiguredAirbyteStreamSerializer.load` when `build` is called. Each `with_*` method
    mutates that dict in place and returns the builder itself so that calls can be chained.
    """

    def __init__(self) -> None:
        """Initialise the description with defaults.

        Defaults: a stream named "any name" with an empty JSON schema, `[["id"]]` as primary
        key, "full_refresh" as sync mode and "overwrite" as destination sync mode.
        """
        stream_definition: Dict[str, Any] = {
            "name": "any name",
            "json_schema": {},
            "supported_sync_modes": ["full_refresh", "incremental"],
            "source_defined_primary_key": [["id"]],
        }
        self._stream: Dict[str, Any] = {
            "stream": stream_definition,
            "primary_key": [["id"]],
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite",
        }

    def with_name(self, name: str) -> "ConfiguredAirbyteStreamBuilder":
        """Set the name of the underlying stream and return this builder."""
        stream_definition = self._stream["stream"]
        stream_definition["name"] = name
        return self

    def with_sync_mode(self, sync_mode: SyncMode) -> "ConfiguredAirbyteStreamBuilder":
        """Set the configured sync mode (stored as the enum member's name) and return this builder."""
        mode_name = sync_mode.name
        self._stream["sync_mode"] = mode_name
        return self

    def with_destination_sync_mode(
        self, sync_mode: DestinationSyncMode
    ) -> "ConfiguredAirbyteStreamBuilder":
        """Set the destination sync mode (stored as the enum member's name) and return this builder."""
        mode_name = sync_mode.name
        self._stream["destination_sync_mode"] = mode_name
        return self

    def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuilder":
        """Set `pk` as both the configured and the source-defined primary key.

        The same list object is stored in both places; it is not copied.
        """
        self._stream["primary_key"] = pk
        stream_definition = self._stream["stream"]
        stream_definition["source_defined_primary_key"] = pk
        return self

    def with_json_schema(self, json_schema: Dict[str, Any]) -> "ConfiguredAirbyteStreamBuilder":
        """Set the JSON schema of the underlying stream and return this builder."""
        stream_definition = self._stream["stream"]
        stream_definition["json_schema"] = json_schema
        return self

    def build(self) -> ConfiguredAirbyteStream:
        """Load the accumulated description through `ConfiguredAirbyteStreamSerializer`."""
        configured_stream: ConfiguredAirbyteStream = ConfiguredAirbyteStreamSerializer.load(
            self._stream
        )
        return configured_stream
class CatalogBuilder:
    """Fluent builder that assembles a `ConfiguredAirbyteCatalog` from stream builders.

    Streams are registered with `with_stream` and only materialised when `build` is called.
    """

    def __init__(self) -> None:
        # Builders are kept (rather than built streams) so that building happens in `build`.
        self._streams: List[ConfiguredAirbyteStreamBuilder] = []

    @overload
    def with_stream(self, name: ConfiguredAirbyteStreamBuilder) -> "CatalogBuilder": ...

    @overload
    def with_stream(self, name: str, sync_mode: SyncMode) -> "CatalogBuilder": ...

    def with_stream(
        self,
        name: Union[str, ConfiguredAirbyteStreamBuilder],
        sync_mode: Union[SyncMode, None] = None,
    ) -> "CatalogBuilder":
        """Register a stream and return this builder.

        Args:
            name: Either a `ConfiguredAirbyteStreamBuilder`, which is registered as is, or a
                stream name from which a default stream builder is created.
            sync_mode: Sync mode of the stream. Required when `name` is a stream name; ignored
                when `name` is already a builder.

        Raises:
            TypeError: If `name` is not a builder and `sync_mode` is None.
        """
        # As we are introducing a fully fledged ConfiguredAirbyteStreamBuilder, we would like to
        # deprecate the previous interface with_stream(str, SyncMode).
        # To avoid a breaking change, `name` needs to stay in the API but this can be either a
        # name or a builder.
        if isinstance(name, ConfiguredAirbyteStreamBuilder):
            builder = name
        else:
            if sync_mode is None:
                # Fail with an explicit message instead of the opaque
                # "'NoneType' object has no attribute 'name'" raised further down the chain.
                raise TypeError(
                    "with_stream() requires `sync_mode` when `name` is a stream name "
                    "and not a ConfiguredAirbyteStreamBuilder"
                )
            builder = ConfiguredAirbyteStreamBuilder().with_name(name).with_sync_mode(sync_mode)

        self._streams.append(builder)
        return self

    def build(self) -> ConfiguredAirbyteCatalog:
        """Build every registered stream, in registration order, into a catalog."""
        return ConfiguredAirbyteCatalog(streams=[builder.build() for builder in self._streams])