Skip to content
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte_cdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
create_connector_config_control_message,
emit_configuration_as_airbyte_control_message,
)
from .connector import BaseConnector, Connector
from .connector import BaseConnector
Comment thread
aaronsteers marked this conversation as resolved.
Outdated
from .destinations import Destination
from .entrypoint import AirbyteEntrypoint, launch
from .logger import AirbyteLogFormatter, init_logger
Expand Down
15 changes: 5 additions & 10 deletions airbyte_cdk/cli/source_declarative_manifest/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
)
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.source import TState
from airbyte_cdk.utils.cli_arg_parse import parse_cli_args
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now


Expand Down Expand Up @@ -93,7 +94,7 @@ def handle_command(args: list[str]) -> None:

def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:
try:
parsed_args = AirbyteEntrypoint.parse_args(args)
parsed_args = parse_cli_args(args)
config, catalog, state = _parse_inputs_into_config_catalog_state(parsed_args)
return SourceLocalYaml(config=config, catalog=catalog, state=state)
except Exception as error:
Expand All @@ -119,10 +120,7 @@ def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:

def handle_local_manifest_command(args: list[str]) -> None:
source = _get_local_yaml_source(args)
launch(
source=source,
args=args,
)
source.launch_with_cli_args(args)


def handle_remote_manifest_command(args: list[str]) -> None:
Expand All @@ -149,10 +147,7 @@ def handle_remote_manifest_command(args: list[str]) -> None:
print(AirbyteEntrypoint.airbyte_message_to_string(message))
else:
source = create_declarative_source(args)
launch(
source=source,
args=args,
)
source.launch_with_cli_args(args=args)


def create_declarative_source(
Expand All @@ -169,7 +164,7 @@ def create_declarative_source(
catalog: ConfiguredAirbyteCatalog | None
state: list[AirbyteStateMessage]

parsed_args = AirbyteEntrypoint.parse_args(args)
parsed_args = parse_cli_args(args)
config, catalog, state = _parse_inputs_into_config_catalog_state(parsed_args)

if config is None:
Expand Down
144 changes: 86 additions & 58 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,30 @@
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Generic, Mapping, MutableMapping, Optional, Protocol, TypeVar
from pathlib import Path
from typing import Any, Generic, Mapping, Optional, TypeVar

import yaml
from typing_extensions import Self

from airbyte_cdk.models import (
AirbyteConnectionStatus,
ConnectorSpecification,
ConnectorSpecificationSerializer,
)
from airbyte_cdk.models import AirbyteConnectionStatus
from airbyte_cdk.models.airbyte_protocol import AirbyteMessage, ConnectorSpecification, Type
from airbyte_cdk.sources.message.repository import MessageRepository, PassthroughMessageRepository
from airbyte_cdk.utils.cli_arg_parse import ConnectorCLIArgs, parse_cli_args


def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
def _load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
    """Read a data file bundled with a package, tolerating its absence.

    Args:
        package: Package name handed to ``pkgutil.get_data``.
        filename: Resource name handed to ``pkgutil.get_data``.

    Returns:
        Whatever ``pkgutil.get_data`` returns for the resource, or ``None``
        when the lookup raises ``FileNotFoundError``.
    """
    try:
        contents = pkgutil.get_data(package, filename)
    except FileNotFoundError:
        # A missing resource is an expected outcome here, not an error.
        contents = None
    return contents


def _write_config(config: Mapping[str, Any], config_path: str) -> None:
    """Serialize ``config`` as JSON and write it to ``config_path``.

    Args:
        config: Mapping to serialize with ``json.dumps``.
        config_path: Filesystem path of the file to write.
    """
    serialized = json.dumps(config)
    destination = Path(config_path)
    destination.write_text(serialized)


TConfig = TypeVar("TConfig", bound=Mapping[str, Any])


Expand All @@ -35,37 +40,19 @@ class BaseConnector(ABC, Generic[TConfig]):
check_config_against_spec: bool = True

@abstractmethod
def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
"""
Persist config in temporary directory to run the Source job
"""

@staticmethod
def read_config(config_path: str) -> MutableMapping[str, Any]:
config = BaseConnector._read_json_file(config_path)
if isinstance(config, MutableMapping):
return config
else:
raise ValueError(
f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config."
)

@staticmethod
def _read_json_file(file_path: str) -> Any:
with open(file_path, "r") as file:
contents = file.read()

try:
return json.loads(contents)
except json.JSONDecodeError as error:
raise ValueError(
f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON."
)

@staticmethod
def write_config(config: TConfig, config_path: str) -> None:
with open(config_path, "w") as fh:
fh.write(json.dumps(config))
@classmethod
def to_typed_config(cls, config: Mapping[str, Any]) -> TConfig:
    """Convert a raw config mapping into this connector's typed config.

    Args:
        config: The untyped configuration mapping.

    Returns:
        The typed configuration object. The body here is only a placeholder
        (``...``), so concrete connectors are expected to supply the real
        conversion.
    """
    # NOTE(review): if this hook is meant to be abstract, ``@abstractmethod``
    # has to sit directly above ``def`` (below ``@classmethod``) - TODO confirm.
    ...

@classmethod
def configure(cls, config: Mapping[str, Any], temp_dir: str) -> TConfig:
    """Persist the raw config under ``temp_dir`` and return its typed form.

    Args:
        config: The untyped configuration mapping.
        temp_dir: Directory in which ``config.json`` is written.

    Returns:
        The result of ``cls.to_typed_config(config)``.
    """
    # The raw mapping is written to disk first; conversion happens afterwards.
    _write_config(config, os.path.join(temp_dir, "config.json"))
    typed_config = cls.to_typed_config(config)
    return typed_config

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
Expand All @@ -75,8 +62,8 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:

package = self.__class__.__module__.split(".")[0]

yaml_spec = load_optional_package_file(package, "spec.yaml")
json_spec = load_optional_package_file(package, "spec.json")
yaml_spec = _load_optional_package_file(package, "spec.yaml")
json_spec = _load_optional_package_file(package, "spec.json")

if yaml_spec and json_spec:
raise RuntimeError(
Expand All @@ -95,7 +82,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
else:
raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")

return ConnectorSpecificationSerializer.load(spec_obj)
return ConnectorSpecification.from_dict(spec_obj)

@abstractmethod
def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus:
Expand All @@ -104,20 +91,61 @@ def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionSta
to the Stripe API.
"""


class _WriteConfigProtocol(Protocol):
@staticmethod
def write_config(config: Mapping[str, Any], config_path: str) -> None: ...


class DefaultConnectorMixin:
# can be overridden to change an input config
def configure(
self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
) -> Mapping[str, Any]:
config_path = os.path.join(temp_dir, "config.json")
self.write_config(config, config_path)
return config


class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): ...
@classmethod
@abstractmethod
def create_with_cli_args(
    cls,
    cli_args: ConnectorCLIArgs,
) -> Self:
    """Return an instance of the connector, using the provided CLI args.

    Args:
        cli_args: The parsed command-line arguments for this invocation.

    Returns:
        A connector instance of the class the method is called on.
    """
    # ``@abstractmethod`` must be the innermost decorator: applied on top of
    # ``@classmethod`` it tries to set ``__isabstractmethod__`` on the
    # classmethod object, which is read-only, so the class body would fail to
    # evaluate.
    ...

@classmethod
def launch_with_cli_args(
    cls,
    args: list[str],
    *,
    logger: logging.Logger | None = None,
    message_repository: MessageRepository | None = None,
    # TODO: Add support for inputs:
    # stdin: StringIO | MessageRepository | None = None,
) -> None:
    """Parse CLI arguments and run the requested connector command.

    Args:
        args: Raw command-line arguments, passed to ``parse_cli_args``.
        logger: Logger to use. Defaults to ``airbyte.<connector class name>``.
        message_repository: Receiver for emitted messages. Defaults to a new
            ``PassthroughMessageRepository``.

    Raises:
        NotImplementedError: For ``discover``, ``read`` and ``write``, which
            this launcher does not implement yet.
        ValueError: If the parsed command is not a known verb.
    """
    if logger is None:
        # ``cls`` is already the connector class; ``type(cls)`` would name its
        # metaclass instead of the connector.
        logger = logging.getLogger(f"airbyte.{cls.__name__}")
    if message_repository is None:
        message_repository = PassthroughMessageRepository()

    # Only offer the verbs that this connector class actually provides.
    parsed_cli_args: ConnectorCLIArgs = parse_cli_args(
        args,
        with_read=bool(getattr(cls, "read", False)),
        with_write=bool(getattr(cls, "write", False)),
        with_discover=bool(getattr(cls, "discover", False)),
    )
    logger.info(f"Launching connector with args: {parsed_cli_args}")
    verb = parsed_cli_args.command

    if verb == "check":
        config = cls.to_typed_config(parsed_cli_args.get_config_dict())
        connector = cls.create_with_cli_args(parsed_cli_args)
        # The check outcome is the whole point of the command, so emit it
        # instead of discarding the return value.
        status = connector.check(logger, config)
        message_repository.emit_message(
            AirbyteMessage(
                type=Type.CONNECTION_STATUS,
                connectionStatus=status,
            )
        )
    elif verb == "spec":
        connector = cls()
        message_repository.emit_message(
            AirbyteMessage(
                type=Type.SPEC,
                spec=connector.spec(logger),
            )
        )
    elif verb in ("discover", "read", "write"):
        # Fail loudly: emitting the spec for ``discover`` or silently doing
        # nothing for ``read``/``write`` would look like a successful run.
        raise NotImplementedError(
            f"Command '{verb}' is not yet supported by launch_with_cli_args()."
        )
    else:
        raise ValueError(f"Unknown command: {verb}")
3 changes: 2 additions & 1 deletion airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.source import Source
from airbyte_cdk.utils.cli_arg_parse import parse_cli_args
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


Expand All @@ -35,7 +36,7 @@ def get_config_and_catalog_from_args(
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no `debug`-level logs are displayed while reading a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
parsed_args = parse_cli_args(args)
config_path, catalog_path, state_path = (
parsed_args.config,
parsed_args.catalog,
Expand Down
63 changes: 14 additions & 49 deletions airbyte_cdk/destinations/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import sys
from abc import ABC, abstractmethod
from multiprocessing import Value
from typing import Any, Iterable, List, Mapping

import orjson
Expand All @@ -21,6 +22,7 @@
Type,
)
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
from airbyte_cdk.utils.cli_arg_parse import ConnectorCLIArgs, parse_cli_args
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

logger = logging.getLogger("airbyte")
Expand Down Expand Up @@ -68,54 +70,10 @@ def _run_write(
)
logger.info("Writing complete.")

def parse_args(self, args: List[str]) -> argparse.Namespace:
"""
:param args: commandline arguments
:return:
"""

parent_parser = argparse.ArgumentParser(add_help=False)
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(title="commands", dest="command")

# spec
subparsers.add_parser(
"spec", help="outputs the json configuration specification", parents=[parent_parser]
)

# check
check_parser = subparsers.add_parser(
"check", help="checks the config can be used to connect", parents=[parent_parser]
)
required_check_parser = check_parser.add_argument_group("required named arguments")
required_check_parser.add_argument(
"--config", type=str, required=True, help="path to the json configuration file"
)

# write
write_parser = subparsers.add_parser(
"write", help="Writes data to the destination", parents=[parent_parser]
)
write_required = write_parser.add_argument_group("required named arguments")
write_required.add_argument(
"--config", type=str, required=True, help="path to the JSON configuration file"
)
write_required.add_argument(
"--catalog", type=str, required=True, help="path to the configured catalog JSON file"
)

parsed_args = main_parser.parse_args(args)
cmd = parsed_args.command
if not cmd:
raise Exception("No command entered. ")
elif cmd not in ["spec", "check", "write"]:
# This is technically dead code since parse_args() would fail if this was the case
# But it's non-obvious enough to warrant placing it here anyways
raise Exception(f"Unknown command entered: {cmd}")

return parsed_args

def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
def run_cmd(
self,
parsed_args: ConnectorCLIArgs,
) -> Iterable[AirbyteMessage]:
cmd = parsed_args.command
if cmd not in self.VALID_CMDS:
raise Exception(f"Unrecognized command: {cmd}")
Expand All @@ -138,6 +96,9 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
if cmd == "check":
yield self._run_check(config=config)
elif cmd == "write":
if not parsed_args.catalog:
raise ValueError("Catalog path is required for write command.")

# Wrap in UTF-8 to override any other input encodings
wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
yield from self._run_write(
Expand All @@ -148,7 +109,11 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:

def run(self, args: List[str]) -> None:
init_uncaught_exception_handler(logger)
parsed_args = self.parse_args(args)
parsed_args: ConnectorCLIArgs = parse_cli_args(
args,
with_write=True,
with_read=False,
)
output_messages = self.run_cmd(parsed_args)
for message in output_messages:
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
Loading
Loading