diff --git a/airbyte_cdk/cli/source_declarative_manifest/_run.py b/airbyte_cdk/cli/source_declarative_manifest/_run.py index f7a1c47e3..f161bf21f 100644 --- a/airbyte_cdk/cli/source_declarative_manifest/_run.py +++ b/airbyte_cdk/cli/source_declarative_manifest/_run.py @@ -243,8 +243,8 @@ def _parse_inputs_into_config_catalog_state( ]: config = ( ConcurrentDeclarativeSource.read_config(parsed_args.config) - if hasattr(parsed_args, "config") - else None + if hasattr(parsed_args, "config") and parsed_args.config + else {} ) catalog = ( ConcurrentDeclarativeSource.read_catalog(parsed_args.catalog) diff --git a/airbyte_cdk/connector.py b/airbyte_cdk/connector.py index a7c13dcda..3fbece681 100644 --- a/airbyte_cdk/connector.py +++ b/airbyte_cdk/connector.py @@ -31,8 +31,15 @@ def load_optional_package_file(package: str, filename: str) -> Optional[bytes]: class BaseConnector(ABC, Generic[TConfig]): - # configure whether the `check_config_against_spec_or_exit()` needs to be called check_config_against_spec: bool = True + """Configure whether `check_config_against_spec_or_exit()` needs to be called.""" + + check_config_during_discover: bool = True + """Determines whether config validation should be skipped during discovery. + + By default, config validation is not skipped during discovery. This can be overridden + by sources that can provide catalog information without requiring authentication. + """ @abstractmethod def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig: diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 54c207487..cd147f0dd 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -105,7 +105,7 @@ def parse_args(args: List[str]) -> argparse.Namespace: ) required_discover_parser = discover_parser.add_argument_group("required named arguments") required_discover_parser.add_argument( - "--config", type=str, required=True, help="path to the json configuration file" + "--config", type=str, required=False, help="path to the json configuration file" ) discover_parser.add_argument( "--manifest-path", @@ -177,19 +177,41 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: ) if cmd == "spec": message = AirbyteMessage(type=Type.SPEC, spec=source_spec) - yield from [ + yield from ( self.airbyte_message_to_string(queued_message) for queued_message in self._emit_queued_messages(self.source) - ] + ) yield self.airbyte_message_to_string(message) + elif ( + cmd == "discover" + and not parsed_args.config + and not self.source.check_config_during_discover + ): + # Connector supports unprivileged discover + empty_config: dict[str, Any] = {} + yield from ( + self.airbyte_message_to_string(queued_message) + for queued_message in self._emit_queued_messages(self.source) + ) + yield from map( + AirbyteEntrypoint.airbyte_message_to_string, + self.discover(source_spec, empty_config), + ) + elif parsed_args.config is None: + # Raise a helpful error message if we reach here with no config. + raise ValueError( + "The '--config' argument is required but was not provided. " + "This connector does not support unprivileged discovery. " + "Please provide a valid configuration file using the --config flag." + ) else: raw_config = self.source.read_config(parsed_args.config) config = self.source.configure(raw_config, temp_dir) - yield from [ + yield from ( self.airbyte_message_to_string(queued_message) for queued_message in self._emit_queued_messages(self.source) - ] + ) if cmd == "check": yield from map( AirbyteEntrypoint.airbyte_message_to_string, @@ -261,7 +283,7 @@ def discover( self, source_spec: ConnectorSpecification, config: TConfig ) -> Iterable[AirbyteMessage]: self.set_up_secret_filter(config, source_spec.connectionSpecification) - if self.source.check_config_against_spec: + if self.source.check_config_during_discover: self.validate_connection(source_spec, config) catalog = self.source.discover(self.logger, config) diff --git a/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py index 0f452f97e..b5f56f2d6 100644 --- a/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py @@ -98,6 +98,9 @@ def _get_declarative_component_schema() -> Dict[str, Any]: class ManifestDeclarativeSource(DeclarativeSource): """Declarative source defined by a manifest of low-code components that define source connector behavior""" + check_config_during_discover: bool = False + """Declarative sources default to not checking config before discovery.""" + def __init__( self, source_config: ConnectionDefinition, @@ -149,6 +152,7 @@ def __init__( # apply additional post-processing to the manifest self._post_process_manifest() + self.check_config_during_discover = self._uses_dynamic_schema_loader() spec: Optional[Mapping[str, Any]] = self._source_config.get("spec") self._spec_component: Optional[Spec] = ( self._constructor.create_component(SpecModel, spec, dict()) if spec else None @@ -614,3 +618,37 @@ def _dynamic_stream_configs( def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None: self.logger.debug("declarative source created from manifest", extra=extra_args) + + def _uses_dynamic_schema_loader(self) -> bool: + """ + Determines if any stream in the source uses a DynamicSchemaLoader. + + DynamicSchemaLoader makes a separate call to retrieve schema information, + which might require authentication. When present, config validation cannot + be skipped during discovery. + + Returns: + bool: True if any stream uses a DynamicSchemaLoader (config required for discover), + False otherwise (unprivileged discover may be supported). + """ + empty_config: Dict[str, Any] = {} + for stream_config in self._stream_configs(self._source_config, empty_config): + schema_loader = stream_config.get("schema_loader", {}) + if ( + isinstance(schema_loader, dict) + and schema_loader.get("type") == "DynamicSchemaLoader" + ): + return True + + dynamic_streams = self._source_config.get("dynamic_streams", []) + if dynamic_streams: + for dynamic_stream in dynamic_streams: + stream_template = dynamic_stream.get("stream_template", {}) + schema_loader = stream_template.get("schema_loader", {}) + if ( + isinstance(schema_loader, dict) + and schema_loader.get("type") == "DynamicSchemaLoader" + ): + return True + + return False diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 781bb64d1..ccbec778e 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -242,6 +242,8 @@ def __init__( message_repository=self._message_repository, ) + self.check_config_during_discover = self._uses_dynamic_schema_loader() + def _pre_process_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]: """ Preprocesses the provided manifest dictionary by resolving any manifest references. @@ -651,3 +653,36 @@ def _select_streams( as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE) ) return abstract_streams + + def _uses_dynamic_schema_loader(self) -> bool: + """ + Determines if any stream in the source uses a DynamicSchemaLoader. + + DynamicSchemaLoader makes a separate call to retrieve schema information, + which might require authentication. When present, config validation cannot + be skipped during discovery. + + Returns: + bool: True if any stream uses a DynamicSchemaLoader (config required for discover), + False otherwise (unprivileged discover may be supported). + """ + for stream_config in self._stream_configs(self._source_config): + schema_loader = stream_config.get("schema_loader", {}) + if ( + isinstance(schema_loader, dict) + and schema_loader.get("type") == "DynamicSchemaLoader" + ): + return True + + dynamic_stream_definitions = self._source_config.get("dynamic_streams", []) + if dynamic_stream_definitions: + for dynamic_definition in dynamic_stream_definitions: + stream_template = dynamic_definition.get("stream_template", {}) + schema_loader = stream_template.get("schema_loader", {}) + if ( + isinstance(schema_loader, dict) + and schema_loader.get("type") == "DynamicSchemaLoader" + ): + return True + + return False diff --git a/airbyte_cdk/test/entrypoint_wrapper.py b/airbyte_cdk/test/entrypoint_wrapper.py index abe297030..41784bcd4 100644 --- a/airbyte_cdk/test/entrypoint_wrapper.py +++ b/airbyte_cdk/test/entrypoint_wrapper.py @@ -69,7 +69,6 @@ class AirbyteEntrypointException(Exception): def __post_init__(self) -> None: super().__init__(self.message) - class EntrypointOutput: """A class to encapsulate the output of an Airbyte connector's execution. diff --git a/airbyte_cdk/test/standard_tests/source_base.py b/airbyte_cdk/test/standard_tests/source_base.py index faecb03c7..993ec6a68 100644 --- a/airbyte_cdk/test/standard_tests/source_base.py +++ b/airbyte_cdk/test/standard_tests/source_base.py @@ -62,11 +62,9 @@ def test_discover( ) -> None: """Standard test for `discover`.""" if scenario.expected_outcome.expect_exception(): - # If the scenario expects an exception, we can't ensure it specifically would fail - # in discover, because some discover implementations do not need to make a connection. - # We skip this test in that case. - pytest.skip("Skipping discover test for scenario that expects an exception.") - return + pytest.skip( + "Skipping `discover` test because the scenario is expected to raise an exception." + ) run_test_job( self.create_connector(scenario), @@ -111,6 +109,15 @@ def test_basic_read( obtain the catalog of streams, and then it runs a `read` job to fetch records from those streams. """ + check_result: entrypoint_wrapper.EntrypointOutput = run_test_job( + self.create_connector(scenario), + "check", + test_scenario=scenario, + connector_root=self.get_connector_root_dir(), + ) + if scenario.expected_outcome.expect_exception() and check_result.errors: + # Expected failure and we got it. Return early. + return discover_result = run_test_job( self.create_connector(scenario), "discover", @@ -144,7 +151,9 @@ def test_basic_read( connector_root=self.get_connector_root_dir(), catalog=configured_catalog, ) - + if scenario.expected_outcome.expect_exception() and not result.errors: + # By now we should have raised an exception. + raise AssertionError("Expected an error but got none.") if scenario.expected_outcome.expect_success() and not result.records: raise AssertionError("Expected records but got none.") @@ -153,6 +162,14 @@ def test_fail_read_with_bad_catalog( scenario: ConnectorTestScenario, ) -> None: """Standard test for `read` when passed a bad catalog file.""" + # Recreate the scenario with the same config but set the status to "failed". + scenario = ConnectorTestScenario( + config_dict=scenario.get_config_dict( + connector_root=self.get_connector_root_dir(), + empty_if_missing=False, + ), + status="failed", + ) invalid_configured_catalog = ConfiguredAirbyteCatalog( streams=[ # Create ConfiguredAirbyteStream which is deliberately invalid diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source_dynamic_schema.py b/unit_tests/sources/declarative/test_manifest_declarative_source_dynamic_schema.py new file mode 100644 index 000000000..1e96fde4c --- /dev/null +++ b/unit_tests/sources/declarative/test_manifest_declarative_source_dynamic_schema.py @@ -0,0 +1,175 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +"""Tests for the ManifestDeclarativeSource with DynamicSchemaLoader.""" + +from unittest.mock import MagicMock, patch + +import pytest + +from airbyte_cdk.legacy.sources.declarative.manifest_declarative_source import ( + ManifestDeclarativeSource, +) +from airbyte_cdk.models import AirbyteCatalog + + +def test_check_config_during_discover_with_dynamic_schema_loader(): + """Test that check_config_during_discover is True when DynamicSchemaLoader is used.""" + source_config = { + "type": "DeclarativeSource", + "check": {"type": "CheckStream"}, + "streams": [ + { + "name": "test_stream", + "schema_loader": { + "type": "DynamicSchemaLoader", + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + "schema_type_identifier": { + "key_pointer": ["name"], + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + } + ], + "version": "0.1.0", + } + + source = ManifestDeclarativeSource(source_config=source_config) + + assert source.check_config_during_discover is True + assert source.check_config_against_spec is True + + +def test_check_config_during_discover_without_dynamic_schema_loader(): + """Test that check_config_during_discover is False when DynamicSchemaLoader is not used.""" + source_config = { + "type": "DeclarativeSource", + "check": {"type": "CheckStream"}, + "streams": [ + { + "name": "test_stream", + "schema_loader": {"type": "InlineSchemaLoader", "schema": {}}, + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + } + ], + "version": "0.1.0", + } + + source = ManifestDeclarativeSource(source_config=source_config) + + assert source.check_config_during_discover is False + assert source.check_config_against_spec is True + + +@patch( + "airbyte_cdk.legacy.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource.streams" +) +def test_discover_with_dynamic_schema_loader_no_config(mock_streams): + """Test that discovery works without config when DynamicSchemaLoader is used.""" + mock_stream = MagicMock() + mock_stream.name = "test_dynamic_stream" + + mock_airbyte_stream = MagicMock() + type(mock_airbyte_stream).name = "test_dynamic_stream" + mock_stream.as_airbyte_stream.return_value = mock_airbyte_stream + + mock_streams.return_value = [mock_stream] + + source_config = { + "type": "DeclarativeSource", + "check": {"type": "CheckStream"}, + "streams": [ + { + "name": "test_dynamic_stream", + "schema_loader": { + "type": "DynamicSchemaLoader", + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + "schema_type_identifier": { + "key_pointer": ["name"], + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + } + ], + "version": "0.1.0", + } + + source = ManifestDeclarativeSource(source_config=source_config) + + assert source.check_config_during_discover is True + assert source.check_config_against_spec is True + + logger = MagicMock() + catalog = source.discover(logger, {}) + + assert isinstance(catalog, AirbyteCatalog) + assert len(catalog.streams) == 1 + assert catalog.streams[0].name == "test_dynamic_stream" + + +@patch( + "airbyte_cdk.legacy.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource.streams" +) +def test_discover_without_dynamic_schema_loader_no_config(mock_streams): + """Test that discovery validates config when DynamicSchemaLoader is not used.""" + mock_stream = MagicMock() + mock_stream.name = "test_static_stream" + + mock_airbyte_stream = MagicMock() + type(mock_airbyte_stream).name = "test_static_stream" + mock_stream.as_airbyte_stream.return_value = mock_airbyte_stream + + mock_streams.return_value = [mock_stream] + + source_config = { + "type": "DeclarativeSource", + "check": {"type": "CheckStream"}, + "streams": [ + { + "name": "test_static_stream", + "schema_loader": {"type": "InlineSchemaLoader", "schema": {}}, + "retriever": { + "type": "SimpleRetriever", + "requester": {"url_base": "https://example.com", "http_method": "GET"}, + "record_selector": {"extractor": {"field_path": []}}, + }, + } + ], + "version": "0.1.0", + } + + source = ManifestDeclarativeSource(source_config=source_config) + + assert source.check_config_during_discover is False + assert source.check_config_against_spec is True + + logger = MagicMock() + catalog = source.discover(logger, {}) + + assert isinstance(catalog, AirbyteCatalog) + assert len(catalog.streams) == 1 + assert catalog.streams[0].name == "test_static_stream" + + assert source.check_config_during_discover is False + assert source.check_config_against_spec is True diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 520131881..b51408d10 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -237,14 +237,13 @@ def test_parse_new_args( ["cmd", "args"], [ ("check", {"config": "config_path"}), - ("discover", {"config": "config_path"}), ("read", {"config": "config_path", "catalog": "catalog_path"}), ], ) def test_parse_missing_required_args( cmd: str, args: MutableMapping[str, Any], entrypoint: AirbyteEntrypoint ): - required_args = {"check": ["config"], "discover": ["config"], "read": ["config", "catalog"]} + required_args = {"check": ["config"], "read": ["config", "catalog"]} for required_arg in required_args[cmd]: argcopy = deepcopy(args) del argcopy[required_arg] @@ -856,3 +855,77 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json( # There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here record_messages = list(filter(lambda message: "RECORD" in message, messages)) assert len(record_messages) == 2 + +def test_run_discover_without_config_when_supported(mocker): + """Test that discover works without config when check_config_during_discover is False.""" + # Create a mock source that supports unprivileged discover + mock_source = MockSource() + mock_source.check_config_during_discover = False + + message_repository = MagicMock() + message_repository.consume_queue.return_value = [] + mocker.patch.object( + MockSource, + "message_repository", + new_callable=mocker.PropertyMock, + return_value=message_repository, + ) + + entrypoint = AirbyteEntrypoint(mock_source) + + parsed_args = Namespace(command="discover", config=None) + expected_catalog = AirbyteCatalog( + streams=[ + AirbyteStream( + name="test_stream", + json_schema={"type": "object"}, + supported_sync_modes=[SyncMode.full_refresh] + ) + ] + ) + + spec = ConnectorSpecification(connectionSpecification={}) + mocker.patch.object(MockSource, "spec", return_value=spec) + mocker.patch.object(MockSource, "discover", return_value=expected_catalog) + + messages = list(entrypoint.run(parsed_args)) + + # Should successfully return catalog without config + assert len(messages) == 1 + assert _wrap_message(expected_catalog) == messages[0] + + # Verify discover was called with empty config + MockSource.discover.assert_called_once() + call_args = MockSource.discover.call_args + assert call_args[0][1] == {} # config argument should be empty dict + + +def test_run_discover_without_config_when_not_supported(mocker): + """Test that discover fails with helpful error when config is required but not provided.""" + # Create a mock source that requires config for discover + mock_source = MockSource() + mock_source.check_config_during_discover = True + + message_repository = MagicMock() + message_repository.consume_queue.return_value = [] + mocker.patch.object( + MockSource, + "message_repository", + new_callable=mocker.PropertyMock, + return_value=message_repository, + ) + + entrypoint = AirbyteEntrypoint(mock_source) + + parsed_args = Namespace(command="discover", config=None) + + spec = ConnectorSpecification(connectionSpecification={}) + mocker.patch.object(MockSource, "spec", return_value=spec) + + # Should raise ValueError with helpful message + with pytest.raises(ValueError) as exc_info: + list(entrypoint.run(parsed_args)) + + error_message = str(exc_info.value) + assert "The '--config' argument is required but was not provided" in error_message + assert "does not support unprivileged discovery" in error_message