-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathtest_availability_strategy.py
More file actions
158 lines (126 loc) · 5 KB
/
test_availability_strategy.py
File metadata and controls
158 lines (126 loc) · 5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import io
import json
import logging
from typing import Any, Iterable, Mapping, Optional
import pytest
import requests
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from airbyte_cdk.sources.streams.http.http import HttpStream
# Module-level logger passed to ``check_availability`` by tests in this file.
logger = logging.getLogger("airbyte")
class MockHttpStream(HttpStream):
    """Minimal concrete ``HttpStream`` used as a test double in this module.

    Consuming the generator returned by ``parse_response`` produces a single
    stub record ``{"data": <counter>}``; the counter starts at 1 and is
    incremented each time such a record is produced.
    """

    url_base = "https://test_base_url.com"
    primary_key = ""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        # Value placed in the next stub record emitted by ``parse_response``.
        self.resp_counter = 1

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return ``None`` for every response, i.e. never supply a next-page token."""
        return None

    def path(self, **kwargs) -> str:
        """Return an empty string as the request path."""
        return ""

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield one stub record and advance the counter; ``response`` is not inspected."""
        stub_resp = {"data": self.resp_counter}
        self.resp_counter += 1
        yield stub_resp

    # NOTE(review): this is a plain method, not a property - confirm that matches
    # how the base class exposes ``retry_factor``.
    def retry_factor(self) -> float:
        """Return a small constant factor (0.01)."""
        return 0.01
@pytest.mark.parametrize(
    ("status_code", "json_contents", "expected_is_available", "expected_messages"),
    [
        (
            403,
            {"error": "Something went wrong"},
            False,
            ["Source's API denied access. Configured credentials have insufficient permissions."],
        ),
        (200, {}, True, []),
    ],
)
@pytest.mark.parametrize(
    ("include_source", "expected_docs_url_messages"),
    [
        (True, ["Source's API denied access. Configured credentials have insufficient permissions."]),
        (False, ["Source's API denied access. Configured credentials have insufficient permissions."]),
    ],
)
@pytest.mark.parametrize("records_as_list", [True, False])
def test_default_http_availability_strategy(
    mocker,
    status_code,
    json_contents,
    expected_is_available,
    expected_messages,
    include_source,
    expected_docs_url_messages,
    records_as_list,
):
    """Check the availability verdict and reason for a 403 and a 200 response.

    ``requests.Session.send`` is patched to return a canned response with the
    parametrized status code and JSON body. The stream under test returns its
    records either as a list or as whatever the parent ``read_records``
    returns, depending on ``records_as_list``.

    ``include_source`` is not read by the test body; it is kept so the
    parametrized test matrix stays the same.
    """

    class MockListHttpStream(MockHttpStream):
        def read_records(self, *args, **kwargs):
            records = super().read_records(*args, **kwargs)
            # Optionally materialize the records so both return shapes are exercised.
            return list(records) if records_as_list else records

    http_stream = MockListHttpStream()

    response = requests.Response()
    response.status_code = status_code
    response.raw = io.BytesIO(json.dumps(json_contents).encode("utf-8"))
    mocker.patch.object(requests.Session, "send", return_value=response)

    actual_is_available, reason = HttpAvailabilityStrategy().check_availability(http_stream, logger)

    assert actual_is_available == expected_is_available
    if expected_is_available:
        assert reason is None
        return

    # Every expected fragment must appear in the reported reason.
    for message in expected_messages + expected_docs_url_messages:
        assert message in reason
def test_http_availability_raises_unhandled_error(mocker):
    """A 404 response yields ``(False, reason)`` with a 'not found' reason."""
    http_stream = MockHttpStream()

    # Canned 404 returned for any request sent through ``requests.Session``.
    not_found_response = requests.Response()
    not_found_response.status_code = 404
    mocker.patch.object(requests.Session, "send", return_value=not_found_response)

    strategy = HttpAvailabilityStrategy()
    is_available, reason = strategy.check_availability(http_stream, logger)

    assert is_available is False
    assert "Requested resource not found on source's API." in reason
def test_send_handles_retries_when_checking_availability(mocker, caplog):
    """The check succeeds after a 429 and a 503 are followed by a 200.

    ``requests.Session.send`` is patched to return the three responses in
    order; the test asserts that it was called exactly three times and that
    the expected fragments were logged.
    """
    # Replace ``time.sleep`` with a no-op so any waiting between attempts costs no time.
    mocker.patch("time.sleep", lambda x: None)
    http_stream = MockHttpStream()

    responses = [requests.Response() for _ in range(3)]
    for response, status_code in zip(responses, (429, 503, 200)):
        response.status_code = status_code
    mock_send = mocker.patch.object(requests.Session, "send", side_effect=responses)

    with caplog.at_level(logging.INFO):
        stream_is_available, _ = HttpAvailabilityStrategy().check_availability(
            stream=http_stream, logger=logger
        )

    assert stream_is_available
    assert mock_send.call_count == 3
    for message in ("Caught retryable error", "Service unavailable"):
        assert message in caplog.text
@pytest.mark.parametrize("records_as_list", [True, False])
def test_http_availability_strategy_on_empty_stream(mocker, records_as_list):
    """A stream whose ``read_records`` produces nothing is still reported available."""

    class MockEmptyHttpStream(mocker.MagicMock, MockHttpStream):
        def __init__(self, *args, **kwargs):
            mocker.MagicMock.__init__(self)
            self.read_records = mocker.MagicMock()

    empty_stream = MockEmptyHttpStream()
    assert isinstance(empty_stream, HttpStream)

    # No records in either shape: an empty list or an iterator with nothing to yield.
    empty_stream.read_records.return_value = [] if records_as_list else iter([])

    test_logger = logging.getLogger("airbyte.test-source")
    strategy = HttpAvailabilityStrategy()
    stream_is_available, _ = strategy.check_availability(stream=empty_stream, logger=test_logger)

    assert stream_is_available
    assert empty_stream.read_records.called