-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathconcurrent_source_adapter.py
More file actions
148 lines (131 loc) · 6.01 KB
/
concurrent_source_adapter.py
File metadata and controls
148 lines (131 loc) · 6.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from abc import ABC
from datetime import timedelta
from typing import Any, Callable, Iterator, List, Mapping, MutableMapping, Optional, Tuple
from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade
from airbyte_cdk.sources.streams.concurrent.cursor import (
ConcurrentCursor,
Cursor,
CursorField,
CursorValueType,
FinalStateCursor,
GapType,
)
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
AbstractStreamStateConverter,
)
# Lookback, in seconds, that ConcurrentSourceAdapter.initialize_cursor falls back to
# (wrapped in a timedelta) when the caller passes no lookback_window or a falsy one.
DEFAULT_LOOKBACK_SECONDS: int = 0
class ConcurrentSourceAdapter(AbstractSource, ABC):
    """
    Bridge that exposes a ConcurrentSource through the regular AbstractSource interface.

    Streams are still declared via streams(). Any configured stream that resolves to an
    AbstractStreamFacade is unwrapped and read by the wrapped ConcurrentSource; all other
    configured streams are read afterwards through AbstractSource.read.
    """

    def __init__(self, concurrent_source: ConcurrentSource, **kwargs: Any) -> None:
        """
        Remember the concurrent engine, then let AbstractSource finish construction.

        :param concurrent_source: engine used to read the streams selected for concurrent processing
        :param kwargs: forwarded unchanged to the parent constructor
        """
        self._concurrent_source = concurrent_source
        super().__init__(**kwargs)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> Iterator[AirbyteMessage]:
        """
        Read the configured catalog in two phases.

        Phase one hands every facade-backed stream to the ConcurrentSource. Phase two reads
        the remaining configured streams (matched by name) through AbstractSource.read,
        which is the only phase that receives `state`. Either phase is skipped when it has
        no streams.
        """
        concurrent_streams = self._select_abstract_streams(config, catalog)
        handled_names = {abstract.name for abstract in concurrent_streams}

        leftover_streams = [
            configured
            for configured in catalog.streams
            if configured.stream.name not in handled_names
        ]
        sequential_catalog = ConfiguredAirbyteCatalog(streams=leftover_streams)

        if concurrent_streams:
            yield from self._concurrent_source.read(concurrent_streams)
        if sequential_catalog.streams:
            yield from super().read(logger, config, sequential_catalog, state)

    def _select_abstract_streams(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog
    ) -> List[AbstractStream]:
        """
        Collect the underlying AbstractStream of each configured stream that can run concurrently.

        Configured streams are resolved by name against self.streams(config). Names that do
        not resolve are ignored, and only AbstractStreamFacade instances are kept. The result
        follows the order of configured_catalog.streams.
        """
        by_name: Mapping[str, Stream] = {
            candidate.name: candidate for candidate in self.streams(config)
        }
        resolved = (by_name.get(entry.stream.name) for entry in configured_catalog.streams)
        return [
            instance.get_underlying_stream()
            for instance in resolved
            if instance and isinstance(instance, AbstractStreamFacade)
        ]

    def convert_to_concurrent_stream(
        self,
        logger: logging.Logger,
        stream: Stream,
        state_manager: ConnectorStateManager,
        cursor: Optional[Cursor] = None,
    ) -> Stream:
        """
        Wrap `stream` in a StreamFacade so it can be read concurrently.

        Without a cursor, the facade receives an empty state and a FinalStateCursor built
        from the stream's name and namespace. With a cursor, the stream's state is fetched
        from `state_manager`, the cursor is assigned to the stream (and to `stream.parent`
        when that attribute exists), and the state and cursor are passed to the facade.
        """
        if not cursor:
            final_cursor = FinalStateCursor(
                stream_name=stream.name,
                stream_namespace=stream.namespace,
                message_repository=self.message_repository,  # type: ignore[arg-type] # _default_message_repository will be returned in the worst case
            )
            return StreamFacade.create_from_stream(stream, self, logger, {}, final_cursor)

        stream_state = state_manager.get_stream_state(stream.name, stream.namespace)
        stream.cursor = cursor  # type: ignore[assignment] # cursor is of type ConcurrentCursor, which inherits from Cursor
        if hasattr(stream, "parent"):
            # The parent is given the very same cursor instance as the child stream.
            # pyrefly: ignore # missing-attribute
            stream.parent.cursor = cursor
        return StreamFacade.create_from_stream(stream, self, logger, stream_state, cursor)

    def initialize_cursor(
        self,
        stream: Stream,
        state_manager: ConnectorStateManager,
        converter: AbstractStreamStateConverter,
        slice_boundary_fields: Optional[Tuple[str, str]],
        start: Optional[CursorValueType],
        end_provider: Callable[[], CursorValueType],
        lookback_window: Optional[GapType] = None,
        slice_range: Optional[GapType] = None,
    ) -> Optional[ConcurrentCursor]:
        """
        Build a ConcurrentCursor for `stream`, or return None when it has no cursor field.

        The cursor is seeded with the stream's current state from `state_manager`.

        :param lookback_window: replaced by timedelta(seconds=DEFAULT_LOOKBACK_SECONDS) when falsy
        :return: the new ConcurrentCursor, or None if stream.cursor_field is falsy
        :raises ValueError: if stream.cursor_field is truthy but not a string
        """
        effective_lookback = (
            lookback_window if lookback_window else timedelta(seconds=DEFAULT_LOOKBACK_SECONDS)
        )

        field_name = stream.cursor_field
        if not field_name:
            return None
        if not isinstance(field_name, str):
            raise ValueError(
                f"Cursor field type must be a string, but received {type(field_name).__name__}."
            )

        return ConcurrentCursor(
            stream.name,
            stream.namespace,
            state_manager.get_stream_state(stream.name, stream.namespace),
            self.message_repository,  # type: ignore[arg-type] # _default_message_repository will be returned in the worst case
            state_manager,
            converter,
            CursorField(field_name),
            slice_boundary_fields,
            start,
            end_provider,
            effective_lookback,
            slice_range,
        )