Skip to content

Commit 61562c4

Browse files
refactor: move _build_stream_name_to_group into ModelToComponentFactory
- Factory now owns the stream_groups resolution via set_stream_groups(manifest) - ConcurrentDeclarativeSource just calls factory.set_stream_groups(manifest) - Removed _build_stream_name_to_group from ConcurrentDeclarativeSource - Updated tests to use factory's _build_stream_name_to_group directly Co-Authored-By: unknown <>
1 parent 5066ec7 commit 61562c4

4 files changed

Lines changed: 48 additions & 49 deletions

File tree

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -405,9 +405,7 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
405405
if api_budget_model:
406406
self._constructor.set_api_budget(api_budget_model, self._config)
407407

408-
self._constructor.set_stream_name_to_group(
409-
self._build_stream_name_to_group(self._source_config)
410-
)
408+
self._constructor.set_stream_groups(self._source_config)
411409

412410
prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
413411

@@ -532,38 +530,6 @@ def dynamic_streams(self) -> List[Dict[str, Any]]:
532530
with_dynamic_stream_name=True,
533531
)
534532

535-
@staticmethod
536-
def _build_stream_name_to_group(manifest: Mapping[str, Any]) -> Dict[str, str]:
537-
"""Build a mapping from stream name to group name based on the stream_groups manifest config.
538-
539-
After manifest reference resolution, each stream reference in stream_groups.streams
540-
is resolved to the full stream definition dict containing a 'name' field.
541-
542-
Returns:
543-
A dict mapping stream name -> group name for streams that belong to a group.
544-
"""
545-
stream_name_to_group: Dict[str, str] = {}
546-
stream_groups = manifest.get("stream_groups", {})
547-
if not stream_groups:
548-
return stream_name_to_group
549-
550-
for group_name, group_config in stream_groups.items():
551-
streams = group_config.get("streams", [])
552-
for stream_ref in streams:
553-
if isinstance(stream_ref, dict):
554-
# After reference resolution, stream_ref is a full stream definition dict
555-
stream_name = stream_ref.get("name", "")
556-
if stream_name:
557-
stream_name_to_group[stream_name] = group_name
558-
elif isinstance(stream_ref, str):
559-
# If not resolved (shouldn't happen normally), extract name from ref path
560-
# e.g., "#/definitions/my_stream" -> "my_stream"
561-
if stream_ref.startswith("#/definitions/"):
562-
stream_name = stream_ref.split("/")[-1]
563-
stream_name_to_group[stream_name] = group_name
564-
565-
return stream_name_to_group
566-
567533
def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]:
568534
# This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
569535
stream_configs = []

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,6 @@ def __init__(
682682
max_concurrent_async_job_count: Optional[int] = None,
683683
configured_catalog: Optional[ConfiguredAirbyteCatalog] = None,
684684
api_budget: Optional[APIBudget] = None,
685-
stream_name_to_group: Optional[Dict[str, str]] = None,
686685
):
687686
self._init_mappings()
688687
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -699,13 +698,49 @@ def __init__(
699698
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
700699
self._api_budget: Optional[Union[APIBudget]] = api_budget
701700
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
702-
self._stream_name_to_group: Dict[str, str] = stream_name_to_group or {}
701+
self._stream_name_to_group: Dict[str, str] = {}
703702
# placeholder for deprecation warnings
704703
self._collected_deprecation_logs: List[ConnectorBuilderLogMessage] = []
705704

706-
def set_stream_name_to_group(self, stream_name_to_group: Dict[str, str]) -> None:
707-
"""Set the mapping from stream name to group name for block_simultaneous_read."""
708-
self._stream_name_to_group = stream_name_to_group
705+
def set_stream_groups(self, manifest: Mapping[str, Any]) -> None:
706+
"""Build and set the stream-name-to-group mapping from the manifest's stream_groups config.
707+
708+
After manifest reference resolution, each stream reference in stream_groups.streams
709+
is resolved to the full stream definition dict containing a 'name' field.
710+
"""
711+
self._stream_name_to_group = self._build_stream_name_to_group(manifest)
712+
713+
@staticmethod
714+
def _build_stream_name_to_group(manifest: Mapping[str, Any]) -> Dict[str, str]:
715+
"""Build a mapping from stream name to group name based on the stream_groups manifest config.
716+
717+
After manifest reference resolution, each stream reference in stream_groups.streams
718+
is resolved to the full stream definition dict containing a 'name' field.
719+
720+
Returns:
721+
A dict mapping stream name -> group name for streams that belong to a group.
722+
"""
723+
stream_name_to_group: Dict[str, str] = {}
724+
stream_groups = manifest.get("stream_groups", {})
725+
if not stream_groups:
726+
return stream_name_to_group
727+
728+
for group_name, group_config in stream_groups.items():
729+
streams = group_config.get("streams", [])
730+
for stream_ref in streams:
731+
if isinstance(stream_ref, dict):
732+
# After reference resolution, stream_ref is a full stream definition dict
733+
stream_name = stream_ref.get("name", "")
734+
if stream_name:
735+
stream_name_to_group[stream_name] = group_name
736+
elif isinstance(stream_ref, str):
737+
# If not resolved (shouldn't happen normally), extract name from ref path
738+
# e.g., "#/definitions/my_stream" -> "my_stream"
739+
if stream_ref.startswith("#/definitions/"):
740+
stream_name = stream_ref.split("/")[-1]
741+
stream_name_to_group[stream_name] = group_name
742+
743+
return stream_name_to_group
709744

710745
def _init_mappings(self) -> None:
711746
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@
4545
from airbyte_cdk.sources.declarative.auth.token_provider import SessionTokenProvider
4646
from airbyte_cdk.sources.declarative.checks import CheckStream
4747
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
48-
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
49-
ConcurrentDeclarativeSource,
50-
)
5148
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
5249
from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator
5350
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
@@ -5323,11 +5320,9 @@ def test_block_simultaneous_read_from_stream_groups():
53235320
parsed_manifest = YamlDeclarativeSource._parse(content)
53245321
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
53255322

5326-
# Build stream_name_to_group from the manifest's stream_groups (as ConcurrentDeclarativeSource does)
5327-
stream_name_to_group = ConcurrentDeclarativeSource._build_stream_name_to_group(
5328-
resolved_manifest
5329-
)
5330-
factory_with_groups = ModelToComponentFactory(stream_name_to_group=stream_name_to_group)
5323+
# Use the factory's set_stream_groups to resolve stream_groups from the manifest
5324+
factory_with_groups = ModelToComponentFactory()
5325+
factory_with_groups.set_stream_groups(resolved_manifest)
53315326

53325327
# Test parent stream gets block_simultaneous_read from stream_groups
53335328
parent_manifest = transformer.propagate_types_and_parameters(

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
from airbyte_cdk.sources.declarative.extractors.record_filter import (
5757
ClientSideIncrementalRecordFilterDecorator,
5858
)
59+
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
60+
ModelToComponentFactory,
61+
)
5962
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
6063
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
6164
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
@@ -5219,5 +5222,5 @@ def test_given_record_selector_is_filtering_when_read_then_raise_error():
52195222
)
52205223
def test_build_stream_name_to_group(manifest, expected):
52215224
"""Test _build_stream_name_to_group correctly maps stream names to group names."""
5222-
result = ConcurrentDeclarativeSource._build_stream_name_to_group(manifest)
5225+
result = ModelToComponentFactory._build_stream_name_to_group(manifest)
52235226
assert result == expected

0 commit comments

Comments
 (0)