Skip to content

Commit 219f7df

Browse files
refactor: move stream_name_to_group into ModelToComponentFactory
- Add stream_name_to_group parameter to ModelToComponentFactory.__init__() - Add set_stream_name_to_group() method for post-init configuration - Factory now looks up block_simultaneous_read from its own mapping - Remove config injection hack from ConcurrentDeclarativeSource.streams() - Update tests to use factory-based approach instead of extra fields Co-Authored-By: unknown <>
1 parent 49b0174 commit 219f7df

3 files changed

Lines changed: 80 additions & 18 deletions

File tree

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -405,13 +405,11 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
405405
if api_budget_model:
406406
self._constructor.set_api_budget(api_budget_model, self._config)
407407

408-
stream_name_to_group = self._build_stream_name_to_group(self._source_config)
408+
self._constructor.set_stream_name_to_group(
409+
self._build_stream_name_to_group(self._source_config)
410+
)
409411

410412
prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
411-
for stream_config in prepared_configs:
412-
stream_name = stream_config.get("name", "")
413-
if stream_name in stream_name_to_group:
414-
stream_config["block_simultaneous_read"] = stream_name_to_group[stream_name]
415413

416414
source_streams = [
417415
self._constructor.create_component(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,7 @@ def __init__(
682682
max_concurrent_async_job_count: Optional[int] = None,
683683
configured_catalog: Optional[ConfiguredAirbyteCatalog] = None,
684684
api_budget: Optional[APIBudget] = None,
685+
stream_name_to_group: Optional[Dict[str, str]] = None,
685686
):
686687
self._init_mappings()
687688
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -698,9 +699,14 @@ def __init__(
698699
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
699700
self._api_budget: Optional[Union[APIBudget]] = api_budget
700701
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
702+
self._stream_name_to_group: Dict[str, str] = stream_name_to_group or {}
701703
# placeholder for deprecation warnings
702704
self._collected_deprecation_logs: List[ConnectorBuilderLogMessage] = []
703705

706+
def set_stream_name_to_group(self, stream_name_to_group: Dict[str, str]) -> None:
707+
"""Set the mapping from stream name to group name for block_simultaneous_read."""
708+
self._stream_name_to_group = stream_name_to_group
709+
704710
def _init_mappings(self) -> None:
705711
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
706712
AddedFieldDefinitionModel: self.create_added_field_definition,
@@ -2118,7 +2124,7 @@ def create_default_stream(
21182124
logger=logging.getLogger(f"airbyte.{stream_name}"),
21192125
cursor=concurrent_cursor,
21202126
supports_file_transfer=hasattr(model, "file_uploader") and bool(model.file_uploader),
2121-
block_simultaneous_read=getattr(model, "block_simultaneous_read", "") or "",
2127+
block_simultaneous_read=self._stream_name_to_group.get(stream_name, ""),
21222128
)
21232129

21242130
def _migrate_state(self, model: DeclarativeStreamModel, config: Config) -> None:

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5215,19 +5215,17 @@ def test_catalog_defined_cursor_field_stream_missing():
52155215

52165216

52175217
def test_block_simultaneous_read_from_stream_groups():
5218-
"""Test that block_simultaneous_read flows through from stream_groups to DefaultStream.
5218+
"""Test that block_simultaneous_read flows through from stream_name_to_group to DefaultStream.
52195219
5220-
The stream_groups config is processed by ConcurrentDeclarativeSource which injects
5221-
block_simultaneous_read into individual stream configs before passing them to the factory.
5222-
This test verifies that the factory correctly reads block_simultaneous_read from the
5223-
extra fields on the stream config dict.
5220+
The stream_groups config is parsed by ConcurrentDeclarativeSource into a stream_name_to_group
5221+
mapping, which is then set on the ModelToComponentFactory. The factory uses this mapping to
5222+
look up the group for each stream it creates.
52245223
"""
52255224
content = """
52265225
parent_stream:
52275226
type: DeclarativeStream
52285227
name: "parent"
52295228
primary_key: "id"
5230-
block_simultaneous_read: "issues_endpoint"
52315229
retriever:
52325230
type: SimpleRetriever
52335231
requester:
@@ -5255,7 +5253,6 @@ def test_block_simultaneous_read_from_stream_groups():
52555253
type: DeclarativeStream
52565254
name: "child"
52575255
primary_key: "id"
5258-
block_simultaneous_read: "issues_endpoint"
52595256
retriever:
52605257
type: SimpleRetriever
52615258
requester:
@@ -5318,26 +5315,31 @@ def test_block_simultaneous_read_from_stream_groups():
53185315

53195316
config = {"api_key": "test_key"}
53205317

5318+
# Create a factory with stream_name_to_group mapping (as ConcurrentDeclarativeSource would do)
5319+
factory_with_groups = ModelToComponentFactory(
5320+
stream_name_to_group={"parent": "issues_endpoint", "child": "issues_endpoint"}
5321+
)
5322+
53215323
parsed_manifest = YamlDeclarativeSource._parse(content)
53225324
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
53235325

5324-
# Test parent stream with block_simultaneous_read injected (as ConcurrentDeclarativeSource would do)
5326+
# Test parent stream gets block_simultaneous_read from the factory's stream_name_to_group
53255327
parent_manifest = transformer.propagate_types_and_parameters(
53265328
"", resolved_manifest["parent_stream"], {}
53275329
)
5328-
parent_stream: DefaultStream = factory.create_component(
5330+
parent_stream: DefaultStream = factory_with_groups.create_component(
53295331
model_type=DeclarativeStreamModel, component_definition=parent_manifest, config=config
53305332
)
53315333

53325334
assert isinstance(parent_stream, DefaultStream)
53335335
assert parent_stream.name == "parent"
53345336
assert parent_stream.block_simultaneous_read == "issues_endpoint"
53355337

5336-
# Test child stream with block_simultaneous_read injected
5338+
# Test child stream gets block_simultaneous_read from the factory's stream_name_to_group
53375339
child_manifest = transformer.propagate_types_and_parameters(
53385340
"", resolved_manifest["child_stream"], {}
53395341
)
5340-
child_stream: DefaultStream = factory.create_component(
5342+
child_stream: DefaultStream = factory_with_groups.create_component(
53415343
model_type=DeclarativeStreamModel, component_definition=child_manifest, config=config
53425344
)
53435345

@@ -5349,7 +5351,7 @@ def test_block_simultaneous_read_from_stream_groups():
53495351
no_block_manifest = transformer.propagate_types_and_parameters(
53505352
"", resolved_manifest["no_block_stream"], {}
53515353
)
5352-
no_block_stream: DefaultStream = factory.create_component(
5354+
no_block_stream: DefaultStream = factory_with_groups.create_component(
53535355
model_type=DeclarativeStreamModel, component_definition=no_block_manifest, config=config
53545356
)
53555357

@@ -5358,6 +5360,62 @@ def test_block_simultaneous_read_from_stream_groups():
53585360
assert no_block_stream.block_simultaneous_read == ""
53595361

53605362

5363+
def test_set_stream_name_to_group():
5364+
"""Test that set_stream_name_to_group updates the factory's stream_name_to_group mapping."""
5365+
content = """
5366+
test_stream:
5367+
type: DeclarativeStream
5368+
name: "test"
5369+
primary_key: "id"
5370+
retriever:
5371+
type: SimpleRetriever
5372+
requester:
5373+
type: HttpRequester
5374+
url_base: "https://api.example.com"
5375+
path: "/test"
5376+
http_method: "GET"
5377+
authenticator:
5378+
type: BearerAuthenticator
5379+
api_token: "{{ config['api_key'] }}"
5380+
record_selector:
5381+
type: RecordSelector
5382+
extractor:
5383+
type: DpathExtractor
5384+
field_path: []
5385+
schema_loader:
5386+
type: InlineSchemaLoader
5387+
schema:
5388+
type: object
5389+
properties:
5390+
id:
5391+
type: string
5392+
"""
5393+
5394+
config = {"api_key": "test_key"}
5395+
5396+
# Create factory without stream_name_to_group
5397+
test_factory = ModelToComponentFactory()
5398+
5399+
parsed_manifest = YamlDeclarativeSource._parse(content)
5400+
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
5401+
stream_manifest = transformer.propagate_types_and_parameters(
5402+
"", resolved_manifest["test_stream"], {}
5403+
)
5404+
5405+
# Without stream_name_to_group, block_simultaneous_read should be empty
5406+
stream: DefaultStream = test_factory.create_component(
5407+
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=config
5408+
)
5409+
assert stream.block_simultaneous_read == ""
5410+
5411+
# After setting stream_name_to_group, block_simultaneous_read should be populated
5412+
test_factory.set_stream_name_to_group({"test": "my_group"})
5413+
stream = test_factory.create_component(
5414+
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=config
5415+
)
5416+
assert stream.block_simultaneous_read == "my_group"
5417+
5418+
53615419
def get_schema_loader(stream: DefaultStream):
53625420
assert isinstance(
53635421
stream._stream_partition_generator._partition_factory._schema_loader,

0 commit comments

Comments
 (0)