Skip to content

Commit 0390f4e

Browse files
refactor: use stream_groups manifest in factory test instead of hardcoded dict
- Test now defines stream_groups with references in the manifest YAML - Uses _build_stream_name_to_group() to derive the mapping from manifest - Removed test_set_stream_name_to_group (redundant with the manifest-based test) - Added ConcurrentDeclarativeSource import for _build_stream_name_to_group Co-Authored-By: unknown <>
1 parent 219f7df commit 0390f4e

1 file changed

Lines changed: 111 additions & 162 deletions

File tree

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 111 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
from airbyte_cdk.sources.declarative.auth.token_provider import SessionTokenProvider
4646
from airbyte_cdk.sources.declarative.checks import CheckStream
4747
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
48+
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
49+
ConcurrentDeclarativeSource,
50+
)
4851
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
4952
from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator
5053
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
@@ -5215,117 +5218,119 @@ def test_catalog_defined_cursor_field_stream_missing():
52155218

52165219

52175220
def test_block_simultaneous_read_from_stream_groups():
5218-
"""Test that block_simultaneous_read flows through from stream_name_to_group to DefaultStream.
5219-
5220-
The stream_groups config is parsed by ConcurrentDeclarativeSource into a stream_name_to_group
5221-
mapping, which is then set on the ModelToComponentFactory. The factory uses this mapping to
5222-
look up the group for each stream it creates.
5223-
"""
5221+
"""Test that stream_groups in the manifest flow through to DefaultStream.block_simultaneous_read."""
52245222
content = """
5225-
parent_stream:
5226-
type: DeclarativeStream
5227-
name: "parent"
5228-
primary_key: "id"
5229-
retriever:
5230-
type: SimpleRetriever
5231-
requester:
5232-
type: HttpRequester
5233-
url_base: "https://api.example.com"
5234-
path: "/parent"
5235-
http_method: "GET"
5236-
authenticator:
5237-
type: BearerAuthenticator
5238-
api_token: "{{ config['api_key'] }}"
5239-
record_selector:
5240-
type: RecordSelector
5241-
extractor:
5242-
type: DpathExtractor
5243-
field_path: []
5244-
schema_loader:
5245-
type: InlineSchemaLoader
5246-
schema:
5247-
type: object
5248-
properties:
5249-
id:
5250-
type: string
5251-
5252-
child_stream:
5253-
type: DeclarativeStream
5254-
name: "child"
5255-
primary_key: "id"
5256-
retriever:
5257-
type: SimpleRetriever
5258-
requester:
5259-
type: HttpRequester
5260-
url_base: "https://api.example.com"
5261-
path: "/child"
5262-
http_method: "GET"
5263-
authenticator:
5264-
type: BearerAuthenticator
5265-
api_token: "{{ config['api_key'] }}"
5266-
record_selector:
5267-
type: RecordSelector
5268-
extractor:
5269-
type: DpathExtractor
5270-
field_path: []
5271-
partition_router:
5272-
type: SubstreamPartitionRouter
5273-
parent_stream_configs:
5274-
- type: ParentStreamConfig
5275-
stream: "#/parent_stream"
5276-
parent_key: "id"
5277-
partition_field: "parent_id"
5278-
schema_loader:
5279-
type: InlineSchemaLoader
5280-
schema:
5281-
type: object
5282-
properties:
5283-
id:
5284-
type: string
5285-
parent_id:
5286-
type: string
5287-
5288-
no_block_stream:
5289-
type: DeclarativeStream
5290-
name: "no_block"
5291-
primary_key: "id"
5292-
retriever:
5293-
type: SimpleRetriever
5294-
requester:
5295-
type: HttpRequester
5296-
url_base: "https://api.example.com"
5297-
path: "/no_block"
5298-
http_method: "GET"
5299-
authenticator:
5300-
type: BearerAuthenticator
5301-
api_token: "{{ config['api_key'] }}"
5302-
record_selector:
5303-
type: RecordSelector
5304-
extractor:
5305-
type: DpathExtractor
5306-
field_path: []
5307-
schema_loader:
5308-
type: InlineSchemaLoader
5309-
schema:
5310-
type: object
5311-
properties:
5312-
id:
5313-
type: string
5223+
definitions:
5224+
parent_stream:
5225+
type: DeclarativeStream
5226+
name: "parent"
5227+
primary_key: "id"
5228+
retriever:
5229+
type: SimpleRetriever
5230+
requester:
5231+
type: HttpRequester
5232+
url_base: "https://api.example.com"
5233+
path: "/parent"
5234+
http_method: "GET"
5235+
authenticator:
5236+
type: BearerAuthenticator
5237+
api_token: "{{ config['api_key'] }}"
5238+
record_selector:
5239+
type: RecordSelector
5240+
extractor:
5241+
type: DpathExtractor
5242+
field_path: []
5243+
schema_loader:
5244+
type: InlineSchemaLoader
5245+
schema:
5246+
type: object
5247+
properties:
5248+
id:
5249+
type: string
5250+
5251+
child_stream:
5252+
type: DeclarativeStream
5253+
name: "child"
5254+
primary_key: "id"
5255+
retriever:
5256+
type: SimpleRetriever
5257+
requester:
5258+
type: HttpRequester
5259+
url_base: "https://api.example.com"
5260+
path: "/child"
5261+
http_method: "GET"
5262+
authenticator:
5263+
type: BearerAuthenticator
5264+
api_token: "{{ config['api_key'] }}"
5265+
record_selector:
5266+
type: RecordSelector
5267+
extractor:
5268+
type: DpathExtractor
5269+
field_path: []
5270+
partition_router:
5271+
type: SubstreamPartitionRouter
5272+
parent_stream_configs:
5273+
- type: ParentStreamConfig
5274+
stream: "#/definitions/parent_stream"
5275+
parent_key: "id"
5276+
partition_field: "parent_id"
5277+
schema_loader:
5278+
type: InlineSchemaLoader
5279+
schema:
5280+
type: object
5281+
properties:
5282+
id:
5283+
type: string
5284+
parent_id:
5285+
type: string
5286+
5287+
no_block_stream:
5288+
type: DeclarativeStream
5289+
name: "no_block"
5290+
primary_key: "id"
5291+
retriever:
5292+
type: SimpleRetriever
5293+
requester:
5294+
type: HttpRequester
5295+
url_base: "https://api.example.com"
5296+
path: "/no_block"
5297+
http_method: "GET"
5298+
authenticator:
5299+
type: BearerAuthenticator
5300+
api_token: "{{ config['api_key'] }}"
5301+
record_selector:
5302+
type: RecordSelector
5303+
extractor:
5304+
type: DpathExtractor
5305+
field_path: []
5306+
schema_loader:
5307+
type: InlineSchemaLoader
5308+
schema:
5309+
type: object
5310+
properties:
5311+
id:
5312+
type: string
5313+
5314+
stream_groups:
5315+
issues_endpoint:
5316+
streams:
5317+
- "#/definitions/parent_stream"
5318+
- "#/definitions/child_stream"
5319+
action: BlockSimultaneousSyncsAction
53145320
"""
53155321

53165322
config = {"api_key": "test_key"}
53175323

5318-
# Create a factory with stream_name_to_group mapping (as ConcurrentDeclarativeSource would do)
5319-
factory_with_groups = ModelToComponentFactory(
5320-
stream_name_to_group={"parent": "issues_endpoint", "child": "issues_endpoint"}
5321-
)
5322-
53235324
parsed_manifest = YamlDeclarativeSource._parse(content)
53245325
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
53255326

5326-
# Test parent stream gets block_simultaneous_read from the factory's stream_name_to_group
5327+
# Build stream_name_to_group from the manifest's stream_groups (as ConcurrentDeclarativeSource does)
5328+
stream_name_to_group = ConcurrentDeclarativeSource._build_stream_name_to_group(resolved_manifest)
5329+
factory_with_groups = ModelToComponentFactory(stream_name_to_group=stream_name_to_group)
5330+
5331+
# Test parent stream gets block_simultaneous_read from stream_groups
53275332
parent_manifest = transformer.propagate_types_and_parameters(
5328-
"", resolved_manifest["parent_stream"], {}
5333+
"", resolved_manifest["definitions"]["parent_stream"], {}
53295334
)
53305335
parent_stream: DefaultStream = factory_with_groups.create_component(
53315336
model_type=DeclarativeStreamModel, component_definition=parent_manifest, config=config
@@ -5335,9 +5340,9 @@ def test_block_simultaneous_read_from_stream_groups():
53355340
assert parent_stream.name == "parent"
53365341
assert parent_stream.block_simultaneous_read == "issues_endpoint"
53375342

5338-
# Test child stream gets block_simultaneous_read from the factory's stream_name_to_group
5343+
# Test child stream gets block_simultaneous_read from stream_groups
53395344
child_manifest = transformer.propagate_types_and_parameters(
5340-
"", resolved_manifest["child_stream"], {}
5345+
"", resolved_manifest["definitions"]["child_stream"], {}
53415346
)
53425347
child_stream: DefaultStream = factory_with_groups.create_component(
53435348
model_type=DeclarativeStreamModel, component_definition=child_manifest, config=config
@@ -5347,9 +5352,9 @@ def test_block_simultaneous_read_from_stream_groups():
53475352
assert child_stream.name == "child"
53485353
assert child_stream.block_simultaneous_read == "issues_endpoint"
53495354

5350-
# Test stream without block_simultaneous_read (should default to empty string)
5355+
# Test stream not in any group defaults to empty string
53515356
no_block_manifest = transformer.propagate_types_and_parameters(
5352-
"", resolved_manifest["no_block_stream"], {}
5357+
"", resolved_manifest["definitions"]["no_block_stream"], {}
53535358
)
53545359
no_block_stream: DefaultStream = factory_with_groups.create_component(
53555360
model_type=DeclarativeStreamModel, component_definition=no_block_manifest, config=config
@@ -5360,62 +5365,6 @@ def test_block_simultaneous_read_from_stream_groups():
53605365
assert no_block_stream.block_simultaneous_read == ""
53615366

53625367

5363-
def test_set_stream_name_to_group():
5364-
"""Test that set_stream_name_to_group updates the factory's stream_name_to_group mapping."""
5365-
content = """
5366-
test_stream:
5367-
type: DeclarativeStream
5368-
name: "test"
5369-
primary_key: "id"
5370-
retriever:
5371-
type: SimpleRetriever
5372-
requester:
5373-
type: HttpRequester
5374-
url_base: "https://api.example.com"
5375-
path: "/test"
5376-
http_method: "GET"
5377-
authenticator:
5378-
type: BearerAuthenticator
5379-
api_token: "{{ config['api_key'] }}"
5380-
record_selector:
5381-
type: RecordSelector
5382-
extractor:
5383-
type: DpathExtractor
5384-
field_path: []
5385-
schema_loader:
5386-
type: InlineSchemaLoader
5387-
schema:
5388-
type: object
5389-
properties:
5390-
id:
5391-
type: string
5392-
"""
5393-
5394-
config = {"api_key": "test_key"}
5395-
5396-
# Create factory without stream_name_to_group
5397-
test_factory = ModelToComponentFactory()
5398-
5399-
parsed_manifest = YamlDeclarativeSource._parse(content)
5400-
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
5401-
stream_manifest = transformer.propagate_types_and_parameters(
5402-
"", resolved_manifest["test_stream"], {}
5403-
)
5404-
5405-
# Without stream_name_to_group, block_simultaneous_read should be empty
5406-
stream: DefaultStream = test_factory.create_component(
5407-
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=config
5408-
)
5409-
assert stream.block_simultaneous_read == ""
5410-
5411-
# After setting stream_name_to_group, block_simultaneous_read should be populated
5412-
test_factory.set_stream_name_to_group({"test": "my_group"})
5413-
stream = test_factory.create_component(
5414-
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=config
5415-
)
5416-
assert stream.block_simultaneous_read == "my_group"
5417-
5418-
54195368
def get_schema_loader(stream: DefaultStream):
54205369
assert isinstance(
54215370
stream._stream_partition_generator._partition_factory._schema_loader,

0 commit comments

Comments
 (0)