Skip to content

Commit 3c50ce4

Browse files
devin-ai-integration[bot]bot_apk
andcommitted
fix: pass stream_slicer when creating retriever in DynamicSchemaLoader
In create_dynamic_schema_loader, the retriever was created with only partition_router but not stream_slicer. This caused AsyncRetriever creation to fail with 'missing required keyword-only argument: stream_slicer' since AsyncRetriever requires stream_slicer as a keyword-only argument while SimpleRetriever accepts partition_router. The fix mirrors the pattern used in create_default_stream, passing both stream_slicer and partition_router when creating the retriever. Resolves #766 Co-Authored-By: bot_apk <apk@cognition.ai>
1 parent 69cd63d commit 3c50ce4

2 files changed

Lines changed: 99 additions & 4 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2565,14 +2565,14 @@ def create_dynamic_schema_loader(
25652565
self._create_component_from_model(model=transformation_model, config=config)
25662566
)
25672567
name = "dynamic_properties"
2568+
partition_router = self._build_stream_slicer_from_partition_router(model.retriever, config)
25682569
retriever = self._create_component_from_model(
25692570
model=model.retriever,
25702571
config=config,
25712572
name=name,
25722573
primary_key=None,
2573-
partition_router=self._build_stream_slicer_from_partition_router(
2574-
model.retriever, config
2575-
),
2574+
stream_slicer=partition_router,
2575+
partition_router=partition_router,
25762576
transformations=[],
25772577
use_cache=True,
25782578
log_formatter=(

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,11 @@
159159
from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath
160160
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
161161
from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, SimpleRetriever
162-
from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader
162+
from airbyte_cdk.sources.declarative.schema import (
163+
DynamicSchemaLoader,
164+
InlineSchemaLoader,
165+
JsonFileSchemaLoader,
166+
)
163167
from airbyte_cdk.sources.declarative.schema.caching_schema_loader_decorator import (
164168
CachingSchemaLoaderDecorator,
165169
)
@@ -4160,6 +4164,97 @@ def test_create_async_retriever():
41604164
assert download_retriever_record_selector.schema_normalization._config.name == "NoTransform"
41614165

41624166

4167+
def test_dynamic_schema_loader_with_async_retriever():
4168+
"""
4169+
Verifies that DynamicSchemaLoader can be created with an AsyncRetriever without raising
4170+
TypeError due to missing stream_slicer kwarg.
4171+
Regression test for https://github.com/airbytehq/airbyte-python-cdk/issues/766
4172+
"""
4173+
content = """
4174+
stream_with_dynamic_schema:
4175+
type: DeclarativeStream
4176+
name: test_stream
4177+
schema_loader:
4178+
type: DynamicSchemaLoader
4179+
retriever:
4180+
type: AsyncRetriever
4181+
record_selector:
4182+
type: RecordSelector
4183+
extractor:
4184+
type: DpathExtractor
4185+
field_path: ["data"]
4186+
status_mapping:
4187+
failed:
4188+
- Error
4189+
running:
4190+
- Pending
4191+
completed:
4192+
- Success
4193+
timeout: []
4194+
status_extractor:
4195+
type: DpathExtractor
4196+
field_path:
4197+
- status
4198+
download_target_extractor:
4199+
type: DpathExtractor
4200+
field_path:
4201+
- download_url
4202+
creation_requester:
4203+
type: HttpRequester
4204+
url_base: https://api.test.com
4205+
path: /reports/create
4206+
http_method: POST
4207+
polling_requester:
4208+
type: HttpRequester
4209+
url_base: https://api.test.com
4210+
path: /reports/status
4211+
http_method: GET
4212+
download_requester:
4213+
type: HttpRequester
4214+
url_base: "{{download_target}}"
4215+
http_method: GET
4216+
schema_type_identifier:
4217+
type: SchemaTypeIdentifier
4218+
key_pointer:
4219+
- name
4220+
type_pointer:
4221+
- type
4222+
retriever:
4223+
type: SimpleRetriever
4224+
requester:
4225+
type: HttpRequester
4226+
url_base: https://api.test.com
4227+
path: /data
4228+
http_method: GET
4229+
record_selector:
4230+
type: RecordSelector
4231+
extractor:
4232+
type: DpathExtractor
4233+
field_path: []
4234+
paginator:
4235+
type: NoPagination
4236+
$parameters:
4237+
name: "test_stream"
4238+
primary_key: "id"
4239+
"""
4240+
parsed_manifest = YamlDeclarativeSource._parse(content)
4241+
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
4242+
stream_manifest = transformer.propagate_types_and_parameters(
4243+
"", resolved_manifest["stream_with_dynamic_schema"], {}
4244+
)
4245+
4246+
stream = factory.create_component(
4247+
model_type=DeclarativeStreamModel,
4248+
component_definition=stream_manifest,
4249+
config=input_config,
4250+
)
4251+
4252+
assert isinstance(stream, DefaultStream)
4253+
schema_loader = get_schema_loader(stream)
4254+
assert isinstance(schema_loader, DynamicSchemaLoader)
4255+
assert isinstance(schema_loader.retriever, AsyncRetriever)
4256+
4257+
41634258
def test_api_budget():
41644259
manifest = {
41654260
"type": "DeclarativeSource",

0 commit comments

Comments
 (0)