diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index aa3298b53..540813a49 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -36,7 +36,7 @@ class DynamicStreamCheckConfig: and type enforcement.""" dynamic_stream_name: str - stream_count: int = 0 + stream_count: Optional[int] = None @dataclass @@ -162,10 +162,15 @@ def _check_generated_streams_availability( generated_streams: List[Dict[str, Any]], stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], logger: logging.Logger, - max_count: int, + max_count: Optional[int], ) -> Tuple[bool, Any]: - """Checks availability of generated dynamic streams.""" - for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]: + """Checks availability of generated dynamic streams. + + If `max_count` is `None`, all generated streams are checked. Otherwise, the + first `max_count` streams are checked (capped at the number of available streams). + """ + streams_to_check = generated_streams if max_count is None else generated_streams[:max_count] + for declarative_stream in streams_to_check: stream = stream_name_to_stream[declarative_stream["name"]] try: stream_is_available, reason = evaluate_availability(stream, logger) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 66c126d94..dda723c00 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -358,9 +358,9 @@ definitions: type: string stream_count: title: Stream Count - description: The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used. + description: The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked. type: integer - default: 0 + minimum: 1 CheckDynamicStream: title: Dynamic Streams to Check description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 1ce38fec0..2fe33e672 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -58,8 +58,9 @@ class DynamicStreamCheckConfig(BaseModel): ..., description="The dynamic stream name.", title="Dynamic Stream Name" ) stream_count: Optional[int] = Field( - 0, - description="The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.", + None, + description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.", + ge=1, title="Stream Count", ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index aefe01364..f0b984e8e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1246,7 +1246,7 @@ def create_dynamic_stream_check_config( ) -> DynamicStreamCheckConfig: return DynamicStreamCheckConfig( dynamic_stream_name=model.dynamic_stream_name, - stream_count=model.stream_count or 0, + stream_count=model.stream_count, ) def create_check_stream( diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index 317abb6c9..5821ae02e 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -439,7 +439,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 200, [], - 0, + 1, id="test_check_http_dynamic_stream_and_config_dynamic_stream", ), pytest.param( @@ -464,7 +464,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 200, [], - 0, + 1, id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream", ), pytest.param( @@ -487,6 +487,44 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp 1, id="test_stream_count_gt_generated_streams", ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + Status.SUCCEEDED, + False, + 200, + [], + 1, + id="test_stream_count_unset_checks_all_streams", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + Status.FAILED, + False, + 404, + ["Not found. The requested resource was not found on the server."], + 0, + id="test_stream_count_unset_failed", + ), pytest.param( {"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}}, Status.FAILED, @@ -682,6 +720,36 @@ def test_check_stream_missing_fields(): ) +@pytest.mark.parametrize( + "stream_count", + [pytest.param(0, id="zero"), pytest.param(-1, id="negative")], +) +def test_check_stream_non_positive_stream_count(stream_count: int) -> None: + """A ValidationError is raised when stream_count is less than 1.""" + manifest = { + **deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), + **{ + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + "stream_count": stream_count, + } + ], + } + }, + } + with pytest.raises(ValidationError): + ConcurrentDeclarativeSource( + source_config=manifest, + config=_CONFIG, + catalog=None, + state=None, + ) + + def test_check_stream_only_type_provided(): manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **{"check": {"type": "CheckStream"}}} source = ConcurrentDeclarativeSource(