Skip to content

Commit 932539d

Browse files
Revise stream_count semantics: None=check all, 0=check none
Co-Authored-By: gl_anatolii.yatsuk <gl_anatolii.yatsuk@airbyte.io>
1 parent 3ae1320 commit 932539d

5 files changed

Lines changed: 80 additions & 12 deletions

File tree

airbyte_cdk/sources/declarative/checks/check_stream.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class DynamicStreamCheckConfig:
3636
and type enforcement."""
3737

3838
dynamic_stream_name: str
39-
stream_count: int = 0
39+
stream_count: Optional[int] = None
4040

4141

4242
@dataclass
@@ -162,13 +162,16 @@ def _check_generated_streams_availability(
162162
generated_streams: List[Dict[str, Any]],
163163
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
164164
logger: logging.Logger,
165-
max_count: int,
165+
max_count: Optional[int],
166166
) -> Tuple[bool, Any]:
167167
"""Checks availability of generated dynamic streams.
168168
169-
If max_count is 0 or negative, all generated streams are checked.
169+
If `max_count` is `None`, all generated streams are checked. If `max_count` is 0,
170+
no streams are checked. Otherwise, the first `max_count` streams are checked.
170171
"""
171-
streams_to_check = generated_streams if max_count <= 0 else generated_streams[:max_count]
172+
streams_to_check = (
173+
generated_streams if max_count is None else generated_streams[:max_count]
174+
)
172175
for declarative_stream in streams_to_check:
173176
stream = stream_name_to_stream[declarative_stream["name"]]
174177
try:

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,9 @@ definitions:
358358
type: string
359359
stream_count:
360360
title: Stream Count
361-
description: The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.
361+
description: The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. If set to 0, no streams are checked. If set to a positive value greater than the total number of available streams, all streams are checked.
362362
type: integer
363-
default: 0
363+
minimum: 0
364364
CheckDynamicStream:
365365
title: Dynamic Streams to Check
366366
description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ class DynamicStreamCheckConfig(BaseModel):
5858
..., description="The dynamic stream name.", title="Dynamic Stream Name"
5959
)
6060
stream_count: Optional[int] = Field(
61-
0,
62-
description="The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.",
61+
None,
62+
description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. If set to 0, no streams are checked. If set to a positive value greater than the total number of available streams, all streams are checked.",
63+
ge=0,
6364
title="Stream Count",
6465
)
6566

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1246,7 +1246,7 @@ def create_dynamic_stream_check_config(
12461246
) -> DynamicStreamCheckConfig:
12471247
return DynamicStreamCheckConfig(
12481248
dynamic_stream_name=model.dynamic_stream_name,
1249-
stream_count=model.stream_count or 0,
1249+
stream_count=model.stream_count,
12501250
)
12511251

12521252
def create_check_stream(

unit_tests/sources/declarative/checks/test_check_stream.py

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -504,8 +504,8 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
504504
False,
505505
200,
506506
[],
507-
1,
508-
id="test_stream_count_zero_checks_all_streams",
507+
0,
508+
id="test_stream_count_zero_checks_no_streams",
509509
),
510510
pytest.param(
511511
{
@@ -520,12 +520,50 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
520520
],
521521
}
522522
},
523+
Status.SUCCEEDED,
524+
False,
525+
404,
526+
["Not found. The requested resource was not found on the server."],
527+
0,
528+
id="test_stream_count_zero_skips_failing_streams",
529+
),
530+
pytest.param(
531+
{
532+
"check": {
533+
"type": "CheckStream",
534+
"dynamic_streams_check_configs": [
535+
{
536+
"type": "DynamicStreamCheckConfig",
537+
"dynamic_stream_name": "http_dynamic_stream",
538+
},
539+
],
540+
}
541+
},
542+
Status.SUCCEEDED,
543+
False,
544+
200,
545+
[],
546+
1,
547+
id="test_stream_count_unset_checks_all_streams",
548+
),
549+
pytest.param(
550+
{
551+
"check": {
552+
"type": "CheckStream",
553+
"dynamic_streams_check_configs": [
554+
{
555+
"type": "DynamicStreamCheckConfig",
556+
"dynamic_stream_name": "http_dynamic_stream",
557+
},
558+
],
559+
}
560+
},
523561
Status.FAILED,
524562
False,
525563
404,
526564
["Not found. The requested resource was not found on the server."],
527565
0,
528-
id="test_stream_count_zero_failed",
566+
id="test_stream_count_unset_failed",
529567
),
530568
pytest.param(
531569
{"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}},
@@ -722,6 +760,32 @@ def test_check_stream_missing_fields():
722760
)
723761

724762

763+
def test_check_stream_negative_stream_count():
764+
"""Test that a ValidationError is raised when stream_count is negative."""
765+
manifest = {
766+
**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT),
767+
**{
768+
"check": {
769+
"type": "CheckStream",
770+
"dynamic_streams_check_configs": [
771+
{
772+
"type": "DynamicStreamCheckConfig",
773+
"dynamic_stream_name": "http_dynamic_stream",
774+
"stream_count": -1,
775+
}
776+
],
777+
}
778+
},
779+
}
780+
with pytest.raises(ValidationError):
781+
ConcurrentDeclarativeSource(
782+
source_config=manifest,
783+
config=_CONFIG,
784+
catalog=None,
785+
state=None,
786+
)
787+
788+
725789
def test_check_stream_only_type_provided():
726790
manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **{"check": {"type": "CheckStream"}}}
727791
source = ConcurrentDeclarativeSource(

0 commit comments

Comments
 (0)