Skip to content

Commit 706ba21

Browse files
committed
Fix mypy and unittest
1 parent 739c429 commit 706ba21

2 files changed

Lines changed: 169 additions & 81 deletions

File tree

airbyte_cdk/sources/declarative/checks/check_stream.py

Lines changed: 82 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import logging
66
import traceback
77
from dataclasses import InitVar, dataclass
8-
from typing import Any, List, Mapping, Tuple
8+
from typing import Any, List, Mapping, Tuple, Dict, Optional
99

1010
from airbyte_cdk import AbstractSource
1111
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
@@ -32,81 +32,105 @@ class CheckStream(ConnectionChecker):
3232
"""
3333

3434
stream_names: List[str]
35-
dynamic_streams_check_configs: List[DynamicStreamCheckConfig]
3635
parameters: InitVar[Mapping[str, Any]]
36+
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
3737

3838
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3939
self._parameters = parameters
40+
if self.dynamic_streams_check_configs is None:
41+
self.dynamic_streams_check_configs = []
42+
43+
def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]:
44+
"""Logs an error and returns a formatted error message."""
45+
error_message = f"Encountered an error while {action}. Error: {error}"
46+
logger.error(error_message + f"Error traceback: \n {traceback.format_exc()}", exc_info=True)
47+
return False, error_message
4048

4149
def check_connection(
42-
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
50+
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
4351
) -> Tuple[bool, Any]:
52+
"""Checks the connection to the source and its streams."""
4453
try:
4554
streams = source.streams(config=config)
55+
if not streams:
56+
return False, f"No streams to connect to from source {source}"
4657
except Exception as error:
47-
error_message = (
48-
f"Encountered an error trying to connect to streams. Error: {error}"
49-
)
50-
logger.error(error_message, exc_info=True)
51-
return False, error_message
58+
return self._log_error(logger, "connecting to streams", error)
5259

5360
stream_name_to_stream = {s.name: s for s in streams}
54-
if len(streams) == 0:
55-
return False, f"No streams to connect to from source {source}"
5661
for stream_name in self.stream_names:
57-
if stream_name not in stream_name_to_stream.keys():
62+
if stream_name not in stream_name_to_stream:
5863
raise ValueError(
59-
f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}."
64+
f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
6065
)
6166

67+
stream_availability, message = self._check_stream_availability(stream_name_to_stream, stream_name, logger)
68+
if not stream_availability:
69+
return stream_availability, message
70+
71+
should_check_dynamic_streams = hasattr(source, "resolved_manifest") and hasattr(source, "dynamic_streams") and self.dynamic_streams_check_configs
72+
73+
if should_check_dynamic_streams:
74+
return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)
75+
76+
return True, None
77+
78+
def _check_stream_availability(self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger) -> Tuple[bool, Any]:
79+
"""Checks if streams are available."""
80+
availability_strategy = HttpAvailabilityStrategy()
81+
try:
6282
stream = stream_name_to_stream[stream_name]
63-
availability_strategy = HttpAvailabilityStrategy()
83+
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
84+
if not stream_is_available:
85+
message = f"Stream {stream_name} is not available: {reason}"
86+
logger.warning(message)
87+
return stream_is_available, message
88+
except Exception as error:
89+
return self._log_error(logger, f"checking availability of stream {stream_name}", error)
90+
return True, None
91+
92+
def _check_dynamic_streams_availability(
93+
self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
94+
) -> Tuple[bool, Any]:
95+
"""Checks the availability of dynamic streams."""
96+
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
97+
dynamic_stream_name_to_dynamic_stream = {
98+
ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams)
99+
}
100+
generated_streams = self._map_generated_streams(source.dynamic_streams) # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method
101+
102+
for check_config in self.dynamic_streams_check_configs: # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
103+
if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream:
104+
return False, f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest."
105+
106+
generated = generated_streams.get(check_config.dynamic_stream_name, [])
107+
stream_availability, message = self._check_generated_streams_availability(generated, stream_name_to_stream, logger, check_config.stream_count)
108+
if not stream_availability:
109+
return stream_availability, message
110+
111+
return True, None
112+
113+
def _map_generated_streams(self, dynamic_streams: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
114+
"""Maps dynamic stream names to their corresponding generated streams."""
115+
mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
116+
for stream in dynamic_streams:
117+
mapped_streams.setdefault(stream["dynamic_stream_name"], []).append(stream)
118+
return mapped_streams
119+
120+
def _check_generated_streams_availability(
121+
self, generated_streams: List[Dict[str, Any]], stream_name_to_stream: Dict[str, Any],
122+
logger: logging.Logger, max_count: int
123+
) -> Tuple[bool, Any]:
124+
"""Checks availability of generated dynamic streams."""
125+
availability_strategy = HttpAvailabilityStrategy()
126+
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
127+
stream = stream_name_to_stream[declarative_stream["name"]]
64128
try:
65-
stream_is_available, reason = availability_strategy.check_availability(
66-
stream, logger
67-
)
129+
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
68130
if not stream_is_available:
69-
return False, reason
131+
message = f"Dynamic Stream {stream.name} is not available: {reason}"
132+
logger.warning(message)
133+
return False, message
70134
except Exception as error:
71-
logger.error(
72-
f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}"
73-
)
74-
return False, f"Unable to connect to stream {stream_name} - {error}"
75-
76-
if hasattr(source, "resolved_manifest") and hasattr(source, "dynamic_streams") and self.dynamic_streams_check_configs:
77-
dynamic_stream_name_to_dynamic_stream = {dynamic_stream.get("name", f"dynamic_stream_{i}"): dynamic_stream for i, dynamic_stream in enumerate(source.resolved_manifest.get("dynamic_streams", []))}
78-
79-
dynamic_stream_name_to_generated_streams = {}
80-
for stream in source.dynamic_streams:
81-
dynamic_stream_name_to_generated_streams[
82-
stream["dynamic_stream_name"]] = dynamic_stream_name_to_generated_streams.setdefault(
83-
stream["dynamic_stream_name"], []) + [stream]
84-
85-
for dynamic_streams_check_config in self.dynamic_streams_check_configs:
86-
dynamic_stream = dynamic_stream_name_to_dynamic_stream.get(dynamic_streams_check_config.dynamic_stream_name)
87-
88-
is_config_depend = dynamic_stream["components_resolver"]["type"] == "ConfigComponentsResolver"
89-
90-
if not is_config_depend and not bool(dynamic_streams_check_config.stream_count):
91-
continue
92-
93-
generated_streams = dynamic_stream_name_to_generated_streams.get(dynamic_streams_check_config.dynamic_stream_name)
94-
availability_strategy = HttpAvailabilityStrategy()
95-
96-
for declarative_stream in generated_streams[: min(dynamic_streams_check_config.stream_count, len(generated_streams))]:
97-
stream = stream_name_to_stream.get(declarative_stream["name"])
98-
try:
99-
stream_is_available, reason = availability_strategy.check_availability(
100-
stream, logger
101-
)
102-
if not stream_is_available:
103-
logger.warning(f"Stream {stream.name} is not available: {reason}")
104-
return False, reason
105-
except Exception as error:
106-
error_message = (
107-
f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
108-
)
109-
logger.error(error_message, exc_info=True)
110-
return False, error_message
111-
135+
return self._log_error(logger, f"checking availability of dynamic stream {stream.name}", error)
112136
return True, None

unit_tests/sources/declarative/checks/test_check_stream.py

Lines changed: 87 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import pytest
1212
import requests
1313

14+
from jsonschema.exceptions import ValidationError
1415
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
1516
from airbyte_cdk.sources.streams.http import HttpStream
1617
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
@@ -343,52 +344,97 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
343344
"name": "static_stream",
344345
"primary_key": "id",
345346
"schema_loader": {
346-
"type": "InlineSchemaLoader",
347-
"schema": {
348-
"$schema": "http://json-schema.org/schema#",
349-
"properties": {
350-
"id": {"type": "integer"},
351-
"name": {"type": "string"},
352-
},
353-
"type": "object",
347+
"type": "InlineSchemaLoader",
348+
"schema": {
349+
"$schema": "http://json-schema.org/schema#",
350+
"properties": {
351+
"id": {"type": "integer"},
352+
"name": {"type": "string"},
354353
},
355-
}
354+
"type": "object",
355+
},
356+
}
356357
}
357358
]
358359
}
359360

360361

361362
@pytest.mark.parametrize(
362-
"check_component",
363+
"check_component, expected_result, expectation, response_code, expected_messages",
363364
[
364-
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}},
365+
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, True, False, 200,
366+
[{"id": 1, "name": "static_1"}, {"id": 2, "name": "static_2"}],
365367
id="test_check_only_static_streams"),
366368
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"],
367369
"dynamic_streams_check_configs": [
368370
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream",
369-
"stream_count": 1}]}}, id="test_check_static_streams_and_http_dynamic_stream"),
371+
"stream_count": 1}]}}, True, False, 200, [],
372+
id="test_check_static_streams_and_http_dynamic_stream"),
370373
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"],
371374
"dynamic_streams_check_configs": [
372375
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1",
373-
"stream_count": 1}]}}, id="test_check_static_streams_and_config_dynamic_stream"),
374-
pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1",
375-
"stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}},
376+
"stream_count": 1}]}}, True, False, 200, [],
377+
id="test_check_static_streams_and_config_dynamic_stream"),
378+
pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [
379+
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1",
380+
"stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}},
381+
True, False, 200, [],
376382
id="test_check_http_dynamic_stream_and_config_dynamic_stream"),
377383
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"],
378384
"dynamic_streams_check_configs": [
379385
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1",
380386
"stream_count": 1},
381-
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}},
387+
{"type": "DynamicStreamCheckConfig",
388+
"dynamic_stream_name": "http_dynamic_stream"}]}}, True, False, 200, [],
382389
id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream"),
390+
pytest.param(
391+
{"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}},
392+
False,
393+
True, 200, [],
394+
id="test_non_existent_static_stream"
395+
),
396+
pytest.param(
397+
{"check": {"type": "CheckStream", "dynamic_streams_check_configs": [
398+
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "unknown_dynamic_stream",
399+
"stream_count": 1}]}}
400+
,
401+
False,
402+
False, 200, [],
403+
id="test_non_existent_dynamic_stream"
404+
),
405+
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, False, False, 404,
406+
["Not found. The requested resource was not found on the server."],
407+
id="test_stream_unavailable_unhandled_error"),
408+
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, False, False, 403,
409+
["Forbidden. You don't have permission to access this resource."],
410+
id="test_stream_unavailable_handled_error"),
411+
pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, False, False, 401,
412+
["Unauthorized. Please ensure you are authenticated correctly."],
413+
id="test_stream_unauthorized_error"),
414+
pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [
415+
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1",
416+
"stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}},
417+
False, False, 404, ["Not found. The requested resource was not found on the server."],
418+
id="test_dynamic_stream_unavailable_unhandled_error"),
419+
pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [
420+
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1",
421+
"stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}},
422+
False, False, 403, ["Forbidden. You don't have permission to access this resource."],
423+
id="test_dynamic_stream_unavailable_handled_error"),
424+
pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [
425+
{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1",
426+
"stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}},
427+
False, False, 401, ["Unauthorized. Please ensure you are authenticated correctly."],
428+
id="test_dynamic_stream_unauthorized_error"),
383429
],
384430
)
385-
def test_check_stream(check_component):
431+
def test_check_stream1(check_component, expected_result, expectation, response_code, expected_messages):
386432
manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **check_component}
387433

388434
with HttpMocker() as http_mocker:
389435
static_stream_request = HttpRequest(url="https://api.test.com/static")
390436
static_stream_response = HttpResponse(
391-
body=json.dumps([{"id": 1, "name": "static_1"}, {"id": 2, "name": "static_2"}])
437+
body=json.dumps(expected_messages), status_code=response_code
392438
)
393439
http_mocker.get(static_stream_request, static_stream_response)
394440

@@ -399,11 +445,11 @@ def test_check_stream(check_component):
399445
http_mocker.get(items_request, items_response)
400446

401447
item_request = HttpRequest(url="https://api.test.com/items/1")
402-
item_response = HttpResponse(body=json.dumps([]), status_code=200)
448+
item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code)
403449
http_mocker.get(item_request, item_response)
404450

405451
item_request = HttpRequest(url="https://api.test.com/items/3")
406-
item_response = HttpResponse(body=json.dumps([]), status_code=200)
452+
item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code)
407453
http_mocker.get(item_request, item_response)
408454

409455
source = ConcurrentDeclarativeSource(
@@ -412,14 +458,32 @@ def test_check_stream(check_component):
412458
catalog=None,
413459
state=None,
414460
)
461+
if expectation:
462+
with pytest.raises(ValueError):
463+
source.check_connection(logger, _CONFIG)
464+
else:
465+
stream_is_available, reason = source.check_connection(logger, _CONFIG)
415466

416-
stream_is_available, reason = source.check_connection(logger, _CONFIG)
467+
assert stream_is_available == expected_result
417468

418-
assert stream_is_available
469+
470+
def test_check_stream_missing_fields():
471+
"""Test if ValueError is raised when dynamic_streams_check_configs is missing required fields."""
472+
manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT),
473+
**{"check": {"type": "CheckStream",
474+
"dynamic_streams_check_configs": [{"type": "DynamicStreamCheckConfig"}]}}}
475+
with pytest.raises(ValidationError):
476+
source = ConcurrentDeclarativeSource(
477+
source_config=manifest,
478+
config=_CONFIG,
479+
catalog=None,
480+
state=None,
481+
)
419482

420483

421484
def test_check_stream_only_type_provided():
422-
manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **{"check": {"type": "CheckStream"}}}
485+
manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT),
486+
**{"check": {"type": "CheckStream"}}}
423487
source = ConcurrentDeclarativeSource(
424488
source_config=manifest,
425489
config=_CONFIG,

0 commit comments

Comments
 (0)