Skip to content

Commit fd553bd

Browse files
fix: require stream_count >= 1 on DynamicStreamCheckConfig (#992)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
1 parent ab39d11 commit fd553bd

5 files changed

Lines changed: 85 additions & 11 deletions

File tree

airbyte_cdk/sources/declarative/checks/check_stream.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class DynamicStreamCheckConfig:
3636
and type enforcement."""
3737

3838
dynamic_stream_name: str
39-
stream_count: int = 0
39+
stream_count: Optional[int] = None
4040

4141

4242
@dataclass
@@ -162,10 +162,15 @@ def _check_generated_streams_availability(
162162
generated_streams: List[Dict[str, Any]],
163163
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
164164
logger: logging.Logger,
165-
max_count: int,
165+
max_count: Optional[int],
166166
) -> Tuple[bool, Any]:
167-
"""Checks availability of generated dynamic streams."""
168-
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
167+
"""Checks availability of generated dynamic streams.
168+
169+
If `max_count` is `None`, all generated streams are checked. Otherwise, the
170+
first `max_count` streams are checked (capped at the number of available streams).
171+
"""
172+
streams_to_check = generated_streams if max_count is None else generated_streams[:max_count]
173+
for declarative_stream in streams_to_check:
169174
stream = stream_name_to_stream[declarative_stream["name"]]
170175
try:
171176
stream_is_available, reason = evaluate_availability(stream, logger)

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,9 @@ definitions:
358358
type: string
359359
stream_count:
360360
title: Stream Count
361-
description: The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.
361+
description: The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.
362362
type: integer
363-
default: 0
363+
minimum: 1
364364
CheckDynamicStream:
365365
title: Dynamic Streams to Check
366366
description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ class DynamicStreamCheckConfig(BaseModel):
5858
..., description="The dynamic stream name.", title="Dynamic Stream Name"
5959
)
6060
stream_count: Optional[int] = Field(
61-
0,
62-
description="The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.",
61+
None,
62+
description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.",
63+
ge=1,
6364
title="Stream Count",
6465
)
6566

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1246,7 +1246,7 @@ def create_dynamic_stream_check_config(
12461246
) -> DynamicStreamCheckConfig:
12471247
return DynamicStreamCheckConfig(
12481248
dynamic_stream_name=model.dynamic_stream_name,
1249-
stream_count=model.stream_count or 0,
1249+
stream_count=model.stream_count,
12501250
)
12511251

12521252
def create_check_stream(

unit_tests/sources/declarative/checks/test_check_stream.py

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
439439
False,
440440
200,
441441
[],
442-
0,
442+
1,
443443
id="test_check_http_dynamic_stream_and_config_dynamic_stream",
444444
),
445445
pytest.param(
@@ -464,7 +464,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
464464
False,
465465
200,
466466
[],
467-
0,
467+
1,
468468
id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream",
469469
),
470470
pytest.param(
@@ -487,6 +487,44 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
487487
1,
488488
id="test_stream_count_gt_generated_streams",
489489
),
490+
pytest.param(
491+
{
492+
"check": {
493+
"type": "CheckStream",
494+
"dynamic_streams_check_configs": [
495+
{
496+
"type": "DynamicStreamCheckConfig",
497+
"dynamic_stream_name": "http_dynamic_stream",
498+
},
499+
],
500+
}
501+
},
502+
Status.SUCCEEDED,
503+
False,
504+
200,
505+
[],
506+
1,
507+
id="test_stream_count_unset_checks_all_streams",
508+
),
509+
pytest.param(
510+
{
511+
"check": {
512+
"type": "CheckStream",
513+
"dynamic_streams_check_configs": [
514+
{
515+
"type": "DynamicStreamCheckConfig",
516+
"dynamic_stream_name": "http_dynamic_stream",
517+
},
518+
],
519+
}
520+
},
521+
Status.FAILED,
522+
False,
523+
404,
524+
["Not found. The requested resource was not found on the server."],
525+
0,
526+
id="test_stream_count_unset_failed",
527+
),
490528
pytest.param(
491529
{"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}},
492530
Status.FAILED,
@@ -682,6 +720,36 @@ def test_check_stream_missing_fields():
682720
)
683721

684722

723+
@pytest.mark.parametrize(
724+
"stream_count",
725+
[pytest.param(0, id="zero"), pytest.param(-1, id="negative")],
726+
)
727+
def test_check_stream_non_positive_stream_count(stream_count: int) -> None:
728+
"""A ValidationError is raised when stream_count is less than 1."""
729+
manifest = {
730+
**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT),
731+
**{
732+
"check": {
733+
"type": "CheckStream",
734+
"dynamic_streams_check_configs": [
735+
{
736+
"type": "DynamicStreamCheckConfig",
737+
"dynamic_stream_name": "http_dynamic_stream",
738+
"stream_count": stream_count,
739+
}
740+
],
741+
}
742+
},
743+
}
744+
with pytest.raises(ValidationError):
745+
ConcurrentDeclarativeSource(
746+
source_config=manifest,
747+
config=_CONFIG,
748+
catalog=None,
749+
state=None,
750+
)
751+
752+
685753
def test_check_stream_only_type_provided():
686754
manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **{"check": {"type": "CheckStream"}}}
687755
source = ConcurrentDeclarativeSource(

0 commit comments

Comments
 (0)