Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions airbyte_cdk/sources/declarative/checks/check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class DynamicStreamCheckConfig:
and type enforcement."""

dynamic_stream_name: str
stream_count: int = 0
stream_count: Optional[int] = None


@dataclass
Expand Down Expand Up @@ -162,10 +162,15 @@ def _check_generated_streams_availability(
generated_streams: List[Dict[str, Any]],
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
logger: logging.Logger,
max_count: int,
max_count: Optional[int],
) -> Tuple[bool, Any]:
"""Checks availability of generated dynamic streams."""
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
"""Checks availability of generated dynamic streams.

If `max_count` is `None`, all generated streams are checked. Otherwise, the
first `max_count` streams are checked (capped at the number of available streams).
"""
streams_to_check = generated_streams if max_count is None else generated_streams[:max_count]
for declarative_stream in streams_to_check:
stream = stream_name_to_stream[declarative_stream["name"]]
try:
stream_is_available, reason = evaluate_availability(stream, logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,9 @@ definitions:
type: string
stream_count:
title: Stream Count
description: The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.
description: The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.
type: integer
default: 0
minimum: 1
CheckDynamicStream:
title: Dynamic Streams to Check
description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ class DynamicStreamCheckConfig(BaseModel):
..., description="The dynamic stream name.", title="Dynamic Stream Name"
)
stream_count: Optional[int] = Field(
0,
description="The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.",
None,
description="The number of streams to attempt reading from during a check operation. If unset, all generated streams are checked. Must be a positive integer; if it exceeds the total number of available streams, all streams are checked.",
ge=1,
title="Stream Count",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ def create_dynamic_stream_check_config(
) -> DynamicStreamCheckConfig:
return DynamicStreamCheckConfig(
dynamic_stream_name=model.dynamic_stream_name,
stream_count=model.stream_count or 0,
stream_count=model.stream_count,
)

def create_check_stream(
Expand Down
72 changes: 70 additions & 2 deletions unit_tests/sources/declarative/checks/test_check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
False,
200,
[],
0,
1,
id="test_check_http_dynamic_stream_and_config_dynamic_stream",
),
pytest.param(
Expand All @@ -464,7 +464,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
False,
200,
[],
0,
1,
id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream",
),
pytest.param(
Expand All @@ -487,6 +487,44 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
1,
id="test_stream_count_gt_generated_streams",
),
pytest.param(
{
"check": {
"type": "CheckStream",
"dynamic_streams_check_configs": [
{
"type": "DynamicStreamCheckConfig",
"dynamic_stream_name": "http_dynamic_stream",
},
],
}
},
Status.SUCCEEDED,
False,
200,
[],
1,
id="test_stream_count_unset_checks_all_streams",
),
pytest.param(
{
"check": {
"type": "CheckStream",
"dynamic_streams_check_configs": [
{
"type": "DynamicStreamCheckConfig",
"dynamic_stream_name": "http_dynamic_stream",
},
],
}
},
Status.FAILED,
False,
404,
["Not found. The requested resource was not found on the server."],
0,
id="test_stream_count_unset_failed",
),
pytest.param(
{"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}},
Status.FAILED,
Expand Down Expand Up @@ -682,6 +720,36 @@ def test_check_stream_missing_fields():
)


@pytest.mark.parametrize(
"stream_count",
[pytest.param(0, id="zero"), pytest.param(-1, id="negative")],
)
def test_check_stream_non_positive_stream_count(stream_count: int) -> None:
"""A ValidationError is raised when stream_count is less than 1."""
manifest = {
**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT),
**{
"check": {
"type": "CheckStream",
"dynamic_streams_check_configs": [
{
"type": "DynamicStreamCheckConfig",
"dynamic_stream_name": "http_dynamic_stream",
"stream_count": stream_count,
}
],
}
},
}
with pytest.raises(ValidationError):
ConcurrentDeclarativeSource(
source_config=manifest,
config=_CONFIG,
catalog=None,
state=None,
)


def test_check_stream_only_type_provided():
manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **{"check": {"type": "CheckStream"}}}
source = ConcurrentDeclarativeSource(
Expand Down
Loading