Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 69 additions & 10 deletions airbyte_cdk/cli/source_declarative_manifest/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@

from __future__ import annotations

import argparse
import json
import pkgutil
import sys
import traceback
from collections.abc import Mapping
from collections.abc import MutableMapping
from pathlib import Path
from typing import Any, cast

import orjson
import yaml

from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
from airbyte_cdk.models import (
Expand Down Expand Up @@ -54,7 +56,7 @@ class SourceLocalYaml(YamlDeclarativeSource):
def __init__(
self,
catalog: ConfiguredAirbyteCatalog | None,
config: Mapping[str, Any] | None,
config: MutableMapping[str, Any] | None,
state: TState,
**kwargs: Any,
) -> None:
Expand Down Expand Up @@ -91,7 +93,8 @@ def handle_command(args: list[str]) -> None:

def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:
try:
config, catalog, state = _parse_inputs_into_config_catalog_state(args)
parsed_args = AirbyteEntrypoint.parse_args(args)
config, catalog, state = _parse_inputs_into_config_catalog_state(parsed_args)
return SourceLocalYaml(config=config, catalog=catalog, state=state)
except Exception as error:
Comment thread
ChristoGrab marked this conversation as resolved.
print(
Expand Down Expand Up @@ -162,21 +165,40 @@ def create_declarative_source(
connector builder.
"""
try:
config: Mapping[str, Any] | None
config: MutableMapping[str, Any] | None
catalog: ConfiguredAirbyteCatalog | None
state: list[AirbyteStateMessage]
config, catalog, state = _parse_inputs_into_config_catalog_state(args)
if config is None or "__injected_declarative_manifest" not in config:

parsed_args = AirbyteEntrypoint.parse_args(args)
config, catalog, state = _parse_inputs_into_config_catalog_state(parsed_args)

if config is None:
raise ValueError(
"Invalid config: `__injected_declarative_manifest` should be provided at the root "
"of the config or using the --manifest-path argument."
)

Comment thread
ChristoGrab marked this conversation as resolved.
# If a manifest_path is provided in the args, inject it into the config
if hasattr(parsed_args, "manifest_path") and parsed_args.manifest_path:
injected_manifest = _parse_manifest_from_file(parsed_args.manifest_path)
if injected_manifest:
config["__injected_declarative_manifest"] = injected_manifest

if "__injected_declarative_manifest" not in config:
raise ValueError(
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"Invalid config: `__injected_declarative_manifest` should be provided at the root "
f"of the config but config only has keys: {list(config.keys() if config else [])}"
"of the config or using the --manifest-path argument. "
f"Config only has keys: {list(config.keys() if config else [])}"
)
if not isinstance(config["__injected_declarative_manifest"], dict):
raise ValueError(
"Invalid config: `__injected_declarative_manifest` should be a dictionary, "
f"but got type: {type(config['__injected_declarative_manifest'])}"
)

if hasattr(parsed_args, "components_path") and parsed_args.components_path:
_register_components_from_file(parsed_args.components_path)

return ConcurrentDeclarativeSource(
config=config,
catalog=catalog,
Expand Down Expand Up @@ -205,13 +227,12 @@ def create_declarative_source(


def _parse_inputs_into_config_catalog_state(
args: list[str],
parsed_args: argparse.Namespace,
) -> tuple[
Mapping[str, Any] | None,
MutableMapping[str, Any] | None,
ConfiguredAirbyteCatalog | None,
list[AirbyteStateMessage],
]:
parsed_args = AirbyteEntrypoint.parse_args(args)
config = (
ConcurrentDeclarativeSource.read_config(parsed_args.config)
if hasattr(parsed_args, "config")
Expand All @@ -231,6 +252,44 @@ def _parse_inputs_into_config_catalog_state(
return config, catalog, state


def _parse_manifest_from_file(filepath: str) -> dict[str, Any] | None:
"""Extract and parse a manifest file specified in the args."""
try:
with open(filepath, "r", encoding="utf-8") as manifest_file:
manifest_content = yaml.safe_load(manifest_file)
if manifest_content is None:
raise ValueError(f"Manifest file at {filepath} is empty")
if not isinstance(manifest_content, dict):
raise ValueError(f"Manifest must be a dictionary, got {type(manifest_content)}")
return manifest_content
except Exception as error:
raise ValueError(f"Failed to load manifest file from {filepath}: {error}")


def _register_components_from_file(filepath: str) -> None:
    """Load a Python file and register it as the custom components module.

    The loaded module is stored in ``sys.modules`` under both ``components``
    and ``source_declarative_manifest.components``.

    Args:
        filepath: Path to the Python source file containing the components.

    Raises:
        ImportError: If no module spec or loader can be created for the file.
        Exception: Any error raised while executing the components file is
            propagated. In that case the ``sys.modules`` entries are restored
            to their previous state, so a half-initialised module is never
            left registered.
    """
    import importlib.util
    import sys

    components_path = Path(filepath)

    module_name = "components"
    sdm_module_name = "source_declarative_manifest.components"
    registered_names = (module_name, sdm_module_name)

    # Create module spec
    spec = importlib.util.spec_from_file_location(module_name, components_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Could not load module from {components_path}")

    module = importlib.util.module_from_spec(spec)

    # Remember what was registered before so a failed load can be rolled back.
    previous_entries = {
        name: sys.modules[name] for name in registered_names if name in sys.modules
    }

    # Register the module before executing its code, to avoid issues with
    # dataclasses that look up the module.
    for name in registered_names:
        sys.modules[name] = module

    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Mirror the import system: a module whose code failed to execute must
        # not stay importable.
        for name in registered_names:
            if name in previous_entries:
                sys.modules[name] = previous_entries[name]
            else:
                sys.modules.pop(name, None)
        raise


def run() -> None:
    """Entry point: dispatch the process's command-line arguments.

    Everything after the program name in ``sys.argv`` is passed to
    ``handle_command``.
    """
    handle_command(sys.argv[1:])
6 changes: 3 additions & 3 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar
from typing import Any, Generic, Mapping, MutableMapping, Optional, Protocol, TypeVar

import yaml

Expand Down Expand Up @@ -41,9 +41,9 @@ def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
"""

@staticmethod
def read_config(config_path: str) -> Mapping[str, Any]:
def read_config(config_path: str) -> MutableMapping[str, Any]:
config = BaseConnector._read_json_file(config_path)
if isinstance(config, Mapping):
if isinstance(config, MutableMapping):
return config
else:
raise ValueError(
Expand Down
36 changes: 36 additions & 0 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ def parse_args(args: List[str]) -> argparse.Namespace:
required_check_parser.add_argument(
"--config", type=str, required=True, help="path to the json configuration file"
)
check_parser.add_argument(
"--manifest-path",
type=str,
required=False,
help="path to the YAML manifest file to inject into the config",
)
check_parser.add_argument(
"--components-path",
type=str,
required=False,
help="path to the custom components file, if it exists",
)

# discover
discover_parser = subparsers.add_parser(
Expand All @@ -95,6 +107,18 @@ def parse_args(args: List[str]) -> argparse.Namespace:
required_discover_parser.add_argument(
"--config", type=str, required=True, help="path to the json configuration file"
)
discover_parser.add_argument(
"--manifest-path",
type=str,
required=False,
help="path to the YAML manifest file to inject into the config",
)
discover_parser.add_argument(
"--components-path",
type=str,
required=False,
help="path to the custom components file, if it exists",
)

# read
read_parser = subparsers.add_parser(
Expand All @@ -114,6 +138,18 @@ def parse_args(args: List[str]) -> argparse.Namespace:
required=True,
help="path to the catalog used to determine which data to read",
)
read_parser.add_argument(
"--manifest-path",
type=str,
required=False,
help="path to the YAML manifest file to inject into the config",
)
read_parser.add_argument(
"--components-path",
type=str,
required=False,
help="path to the custom components file, if it exists",
)

return main_parser.parse_args(args)

Expand Down
49 changes: 49 additions & 0 deletions unit_tests/source_declarative_manifest/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import sys
import tempfile
from pathlib import Path
from typing import Generator

import pytest
import yaml
Expand Down Expand Up @@ -52,3 +55,49 @@ def valid_local_config_file():
@pytest.fixture
def invalid_local_config_file():
return get_resource_path("invalid_local_pokeapi_config.json")


# Sample component code for testing
# NOTE: this text is written to a temporary ``.py`` file by the
# ``components_file`` fixture, so it must be valid, correctly indented Python.
SAMPLE_COMPONENTS_PY_TEXT = """
def sample_function() -> str:
    return "Hello, World!"

class SimpleClass:
    def sample_method(self) -> str:
        return sample_function()
"""


def verify_components_loaded() -> None:
    """Assert that the sample components module is importable and registered.

    Checks that ``components`` can be imported and exposes a working
    ``sample_function``, and that ``sys.modules`` holds the same module object
    under both ``components`` and ``source_declarative_manifest.components``.
    """
    import components as loaded_components

    assert hasattr(loaded_components, "sample_function")
    assert loaded_components.sample_function() == "Hello, World!"

    # Both names must be present in the module registry ...
    registry = sys.modules
    for registered_name in ("components", "source_declarative_manifest.components"):
        assert registered_name in registry

    # ... and must refer to the very same module object.
    assert registry["components"] is registry["source_declarative_manifest.components"]


@pytest.fixture
def components_file() -> Generator[str, None, None]:
    """Yield the path of a temporary ``.py`` file holding the sample components.

    On teardown the ``components`` and
    ``source_declarative_manifest.components`` entries are dropped from
    ``sys.modules`` and the temporary file is deleted.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as handle:
        handle.write(SAMPLE_COMPONENTS_PY_TEXT)
        handle.flush()
        file_path = handle.name

    try:
        yield file_path
    finally:
        # Forget any modules a test registered from this file.
        for registered_name in ("components", "source_declarative_manifest.components"):
            sys.modules.pop(registered_name, None)
        # delete=False above means the file must be removed by hand.
        Path(file_path).unlink(missing_ok=True)
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from pathlib import Path
from unittest.mock import mock_open, patch

import pytest

from airbyte_cdk.cli.source_declarative_manifest._run import (
_parse_manifest_from_file,
create_declarative_source,
handle_command,
)
Expand All @@ -27,3 +31,11 @@ def test_given_no_injected_declarative_manifest_then_raise_value_error(invalid_r
def test_given_injected_declarative_manifest_then_return_declarative_manifest(valid_remote_config):
source = create_declarative_source(["check", "--config", str(valid_remote_config)])
assert isinstance(source, ManifestDeclarativeSource)


def test_parse_manifest_from_file(valid_remote_config: Path) -> None:
    """A manifest read through a patched ``open`` is returned as a dictionary."""
    fake_open = mock_open(read_data='{"test_manifest": "fancy_declarative_components"}')
    with patch("builtins.open", fake_open):
        # The path is never touched on disk because ``open`` is patched.
        parsed = _parse_manifest_from_file("manifest.yaml")
    assert parsed == {"test_manifest": "fancy_declarative_components"}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from airbyte_protocol_dataclasses.models.airbyte_protocol import AirbyteCatalog

from airbyte_cdk.cli.source_declarative_manifest._run import (
_register_components_from_file,
create_declarative_source,
)
from airbyte_cdk.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream
Expand All @@ -33,15 +34,10 @@
register_components_module_from_string,
)
from airbyte_cdk.utils.connector_paths import MANIFEST_YAML

SAMPLE_COMPONENTS_PY_TEXT = """
def sample_function() -> str:
return "Hello, World!"

class SimpleClass:
def sample_method(self) -> str:
return sample_function()
"""
from unit_tests.source_declarative_manifest.conftest import (
SAMPLE_COMPONENTS_PY_TEXT,
verify_components_loaded,
)


def get_resource_path(file_name) -> str:
Expand Down Expand Up @@ -288,3 +284,12 @@ def _read_fn(*args, **kwargs):
_read_fn()
else:
_read_fn()


def test_register_components_from_file(components_file: str) -> None:
    """Registering the sample components file makes its module importable."""
    _register_components_from_file(components_file)

    # Delegates the actual assertions to the shared helper.
    verify_components_loaded()
Loading
Loading