Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2565,14 +2565,14 @@ def create_dynamic_schema_loader(
self._create_component_from_model(model=transformation_model, config=config)
)
name = "dynamic_properties"
partition_router = self._build_stream_slicer_from_partition_router(model.retriever, config)
retriever = self._create_component_from_model(
model=model.retriever,
config=config,
name=name,
primary_key=None,
partition_router=self._build_stream_slicer_from_partition_router(
model.retriever, config
),
stream_slicer=partition_router,
partition_router=partition_router,
transformations=[],
use_cache=True,
log_formatter=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever
from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema import (
DynamicSchemaLoader,
InlineSchemaLoader,
JsonFileSchemaLoader,
)
from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import (
CachingSchemaLoaderDecorator,
)
Expand Down Expand Up @@ -4160,6 +4164,97 @@ def test_create_async_retriever():
assert download_retriever_record_selector.schema_normalization._config.name == "NoTransform"


def test_dynamic_schema_loader_with_async_retriever():
"""
Verifies that DynamicSchemaLoader can be created with an AsyncRetriever without raising
TypeError due to missing stream_slicer kwarg.
Regression test for https://github.com/airbytehq/airbyte-python-cdk/issues/766
"""
content = """
stream_with_dynamic_schema:
type: DeclarativeStream
name: test_stream
schema_loader:
type: DynamicSchemaLoader
retriever:
type: AsyncRetriever
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: ["data"]
status_mapping:
failed:
- Error
running:
- Pending
completed:
- Success
timeout: []
status_extractor:
type: DpathExtractor
field_path:
- status
download_target_extractor:
type: DpathExtractor
field_path:
- download_url
creation_requester:
type: HttpRequester
url_base: https://api.test.com
path: /reports/create
http_method: POST
polling_requester:
type: HttpRequester
url_base: https://api.test.com
path: /reports/status
http_method: GET
download_requester:
type: HttpRequester
url_base: "{{download_target}}"
http_method: GET
schema_type_identifier:
type: SchemaTypeIdentifier
key_pointer:
- name
type_pointer:
- type
retriever:
type: SimpleRetriever
requester:
type: HttpRequester
url_base: https://api.test.com
path: /data
http_method: GET
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: []
paginator:
type: NoPagination
$parameters:
name: "test_stream"
primary_key: "id"
"""
parsed_manifest = YamlDeclarativeSource._parse(content)
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
stream_manifest = transformer.propagate_types_and_parameters(
"", resolved_manifest["stream_with_dynamic_schema"], {}
)

stream = factory.create_component(
model_type=DeclarativeStreamModel,
component_definition=stream_manifest,
config=input_config,
)

assert isinstance(stream, DefaultStream)
schema_loader = get_schema_loader(stream)
assert isinstance(schema_loader, DynamicSchemaLoader)
assert isinstance(schema_loader.retriever, AsyncRetriever)


def test_api_budget():
manifest = {
"type": "DeclarativeSource",
Expand Down
Loading