-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathmain.py
More file actions
110 lines (93 loc) · 3.75 KB
/
main.py
File metadata and controls
110 lines (93 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import sys
from typing import Any, List, Mapping, Optional, Tuple
import orjson
from airbyte_cdk.connector import BaseConnector
from airbyte_cdk.connector_builder.connector_builder_handler import (
TestReadLimits,
create_source,
full_resolve_manifest,
get_limits,
read_stream,
resolve_manifest,
)
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteMessageSerializer,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteCatalogSerializer,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.source import Source
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
def get_config_and_catalog_from_args(
args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
config_path, catalog_path, state_path = (
parsed_args.config,
parsed_args.catalog,
parsed_args.state,
)
if parsed_args.command != "read":
raise ValueError("Only read commands are allowed for Connector Builder requests.")
config = BaseConnector.read_config(config_path)
if "__command" not in config:
raise ValueError(
f"Invalid config: `__command` should be provided at the root of the config but config only has keys {list(config.keys())}"
)
command = config["__command"]
if command == "test_read":
catalog = ConfiguredAirbyteCatalogSerializer.load(BaseConnector.read_config(catalog_path))
state = Source.read_state(state_path)
else:
catalog = None
state = []
if "__injected_declarative_manifest" not in config:
raise ValueError(
f"Invalid config: `__injected_declarative_manifest` should be provided at the root of the config but config only has keys {list(config.keys())}"
)
return command, config, catalog, state
def handle_connector_builder_request(
source: ManifestDeclarativeSource,
command: str,
config: Mapping[str, Any],
catalog: Optional[ConfiguredAirbyteCatalog],
state: List[AirbyteStateMessage],
limits: TestReadLimits,
) -> AirbyteMessage:
if command == "resolve_manifest":
return resolve_manifest(source)
elif command == "test_read":
assert (
catalog is not None
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
return read_stream(source, config, catalog, state, limits)
elif command == "full_resolve_manifest":
return full_resolve_manifest(source)
else:
raise ValueError(f"Unrecognized command {command}.")
def handle_request(args: List[str]) -> str:
command, config, catalog, state = get_config_and_catalog_from_args(args)
limits = get_limits(config)
source = create_source(config, limits)
return orjson.dumps(
AirbyteMessageSerializer.dump(
handle_connector_builder_request(source, command, config, catalog, state, limits)
)
).decode() # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage
if __name__ == "__main__":
try:
print(handle_request(sys.argv[1:]))
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error handling request: {str(exc)}"
)
m = error.as_airbyte_message()
print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())