diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 239e5bd51..47b1c9cb2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2565,14 +2565,14 @@ def create_dynamic_schema_loader( self._create_component_from_model(model=transformation_model, config=config) ) name = "dynamic_properties" + partition_router = self._build_stream_slicer_from_partition_router(model.retriever, config) retriever = self._create_component_from_model( model=model.retriever, config=config, name=name, primary_key=None, - partition_router=self._build_stream_slicer_from_partition_router( - model.retriever, config - ), + stream_slicer=partition_router, + partition_router=partition_router, transformations=[], use_cache=True, log_formatter=( diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index add9a1c42..9bc8ea20a 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -159,7 +159,11 @@ from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever -from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader +from airbyte_cdk.sources.declarative.schema import ( + DynamicSchemaLoader, + InlineSchemaLoader, + JsonFileSchemaLoader, +) from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import ( CachingSchemaLoaderDecorator, ) @@ -4160,6 +4164,97 @@ def test_create_async_retriever(): assert download_retriever_record_selector.schema_normalization._config.name == "NoTransform" +def test_dynamic_schema_loader_with_async_retriever(): + """ + Verifies that DynamicSchemaLoader can be created with an AsyncRetriever without raising + TypeError due to missing stream_slicer kwarg. + Regression test for https://github.com/airbytehq/airbyte-python-cdk/issues/766 + """ + content = """ +stream_with_dynamic_schema: + type: DeclarativeStream + name: test_stream + schema_loader: + type: DynamicSchemaLoader + retriever: + type: AsyncRetriever + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["data"] + status_mapping: + failed: + - Error + running: + - Pending + completed: + - Success + timeout: [] + status_extractor: + type: DpathExtractor + field_path: + - status + download_target_extractor: + type: DpathExtractor + field_path: + - download_url + creation_requester: + type: HttpRequester + url_base: https://api.test.com + path: /reports/create + http_method: POST + polling_requester: + type: HttpRequester + url_base: https://api.test.com + path: /reports/status + http_method: GET + download_requester: + type: HttpRequester + url_base: "{{download_target}}" + http_method: GET + schema_type_identifier: + type: SchemaTypeIdentifier + key_pointer: + - name + type_pointer: + - type + retriever: + type: SimpleRetriever + requester: + type: HttpRequester + url_base: https://api.test.com + path: /data + http_method: GET + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + paginator: + type: NoPagination + $parameters: + name: "test_stream" + primary_key: "id" +""" + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["stream_with_dynamic_schema"], {} + ) + + stream = factory.create_component( + model_type=DeclarativeStreamModel, + component_definition=stream_manifest, + config=input_config, + ) + + assert isinstance(stream, DefaultStream) + schema_loader = get_schema_loader(stream) + assert isinstance(schema_loader, DynamicSchemaLoader) + assert isinstance(schema_loader.retriever, AsyncRetriever) + + def test_api_budget(): manifest = { "type": "DeclarativeSource",