-
Notifications
You must be signed in to change notification settings - Fork 42
Expand file tree
/
Copy pathprocessor.py
More file actions
121 lines (106 loc) · 3.74 KB
/
processor.py
File metadata and controls
121 lines (106 loc) · 3.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import logging
from typing import Any, List, Mapping, Optional, Tuple
from airbyte_protocol_dataclasses.models import (
AirbyteCatalog,
Status,
)
from fastapi import HTTPException
from airbyte_cdk.connector_builder.models import StreamRead
from airbyte_cdk.connector_builder.test_reader import TestReader
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import (
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
)
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
)
from airbyte_cdk.test.entrypoint_wrapper import AirbyteEntrypointException, EntrypointOutput
class ManifestCommandProcessor:
    """Run test-read, check and discover commands against a declarative source.

    The processor wraps a single ``ConcurrentDeclarativeSource`` and exposes
    each command as a method. Errors reported by the entrypoint output are
    surfaced as ``HTTPException`` with status code 422.
    """

    # Source that every command is executed against.
    _source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
    # Logger shared by all instances; passed to the source when requesting its spec.
    _logger = logging.getLogger("airbyte.manifest-server")

    def __init__(
        self, source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]
    ) -> None:
        """Store the source that subsequent commands will run against."""
        self._source = source

    def test_read(
        self,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: List[AirbyteStateMessage],
        record_limit: int,
        page_limit: int,
        slice_limit: int,
    ) -> StreamRead:
        """Test the read method of the source.

        Only the first stream of ``catalog`` is read.

        Args:
            config: Connector configuration passed through to the test reader.
            catalog: Configured catalog; its first stream names the stream to read.
            state: State messages passed through to the test reader.
            record_limit: Used both as the reader's ``max_record_limit`` and as
                the ``record_limit`` of the read itself.
            page_limit: Used as the reader's ``max_pages_per_slice``.
            slice_limit: Used as the reader's ``max_slices``.

        Returns:
            The ``StreamRead`` produced by ``TestReader.run_test_read``.

        Raises:
            HTTPException: With status 422 when ``catalog`` contains no streams.
        """
        # Fail with a descriptive client error instead of an opaque IndexError
        # from ``catalog.streams[0]`` below.
        if not catalog.streams:
            raise HTTPException(
                status_code=422,
                detail="Configured catalog must contain at least one stream",
            )

        test_read_handler = TestReader(
            max_pages_per_slice=page_limit,
            max_slices=slice_limit,
            max_record_limit=record_limit,
        )
        return test_read_handler.run_test_read(
            source=self._source,
            config=config,
            configured_catalog=catalog,
            state=state,
            stream_name=catalog.streams[0].stream.name,
            record_limit=record_limit,
        )

    def check_connection(
        self,
        config: Mapping[str, Any],
    ) -> Tuple[bool, Optional[str]]:
        """Check the connection to the source.

        Args:
            config: Connector configuration to check.

        Returns:
            A ``(succeeded, message)`` tuple taken from the last connection
            status message. When no status is available the result is
            ``(False, "Connection check did not return a status message")``.

        Raises:
            HTTPException: With status 422 when the entrypoint output reports errors.
        """
        spec = self._source.spec(self._logger)
        entrypoint = AirbyteEntrypoint(source=self._source)
        messages = entrypoint.check(spec, config)
        output = EntrypointOutput(
            messages=[AirbyteEntrypoint.airbyte_message_to_string(m) for m in messages],
            command=["check"],
        )
        self._raise_on_trace_message(output)

        status_messages = output.connection_status_messages
        # Only the last status message is authoritative; bind it once.
        connection_status = status_messages[-1].connectionStatus if status_messages else None
        if connection_status is None:
            return False, "Connection check did not return a status message"

        return (
            connection_status.status == Status.SUCCEEDED,
            connection_status.message,
        )

    def discover(
        self,
        config: Mapping[str, Any],
    ) -> Optional[AirbyteCatalog]:
        """Discover the catalog from the source.

        Args:
            config: Connector configuration used for discovery.

        Returns:
            The discovered catalog, or ``None`` when the output contains no
            catalog message.

        Raises:
            HTTPException: With status 422 when the entrypoint output reports errors.
        """
        spec = self._source.spec(self._logger)
        entrypoint = AirbyteEntrypoint(source=self._source)
        messages = entrypoint.discover(spec, config)
        output = EntrypointOutput(
            messages=[AirbyteEntrypoint.airbyte_message_to_string(m) for m in messages],
            command=["discover"],
        )
        self._raise_on_trace_message(output)

        # Keep the try body to the single lookup that signals "no catalog".
        try:
            catalog_message = output.catalog
        except ValueError:
            # No catalog message found
            return None
        return catalog_message.catalog

    def _raise_on_trace_message(
        self,
        output: EntrypointOutput,
    ) -> None:
        """Raise an exception if a trace message is found.

        Args:
            output: Entrypoint output to inspect via ``raise_if_errors``.

        Raises:
            HTTPException: With status 422 and the original error message as
                detail when ``output.raise_if_errors()`` raises
                ``AirbyteEntrypointException``.
        """
        try:
            output.raise_if_errors()
        except AirbyteEntrypointException as e:
            # Chain explicitly so the original entrypoint failure stays
            # attached as ``__cause__`` for logging and debugging.
            raise HTTPException(status_code=422, detail=e.message) from e