-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtest_analyzers_api.py
More file actions
322 lines (266 loc) · 13.2 KB
/
test_analyzers_api.py
File metadata and controls
322 lines (266 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
"""
Unit tests for /analyzers HTTP API endpoints.
Tests POST /analyzers, GET /analyzers, and DELETE /analyzers/{name} with a
mocked network manager (Docker is not required).
"""
import json
import threading
import time
import unittest
import http.client
from http.server import HTTPServer
from unittest.mock import MagicMock, patch
from api import MockAPIHandler as SimulateAPIHandler
class TestAnalyzersAPI(unittest.TestCase):
"""Tests for /analyzers CRUD endpoints with mocked network manager."""
@classmethod
def setUpClass(cls):
cls.httpd = HTTPServer(("127.0.0.1", 0), SimulateAPIHandler)
cls.port = cls.httpd.server_address[1]
cls.thread = threading.Thread(target=cls.httpd.serve_forever, daemon=True)
cls.thread.start()
# Build a mock network manager used by all tests
cls.mock_mgr = MagicMock()
cls.mock_mgr.list_analyzers.return_value = [
{"name": "test-analyzer", "ip": "172.20.0.2", "template": "mindray_bc5380"},
]
cls.mock_mgr.create_analyzer.return_value = {
"name": "my-analyzer",
"ip": "172.20.0.3",
"template": "mindray_bc5380",
}
cls.mock_mgr.remove_analyzer.side_effect = lambda n: n != "nonexistent"
@classmethod
def tearDownClass(cls):
cls.httpd.shutdown()
cls.httpd.server_close()
cls.thread.join(timeout=10)
# Reset the class-level attribute so other test modules aren't affected
SimulateAPIHandler._network_manager = None
def setUp(self):
# Patch _get_network_manager for every test so Docker is never touched
patcher = patch.object(
SimulateAPIHandler, "_get_network_manager", return_value=self.mock_mgr
)
patcher.start()
self.addCleanup(patcher.stop)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _conn(self):
return http.client.HTTPConnection("127.0.0.1", self.port, timeout=5)
def _post_analyzers(self, body=None, headers=None, raw_body=None):
"""Send POST /analyzers and return (status, parsed_body)."""
conn = self._conn()
hdrs = headers or {}
if body is not None and raw_body is None:
raw_body = json.dumps(body)
if raw_body is not None and "Content-Length" not in hdrs:
hdrs.setdefault("Content-Type", "application/json")
conn.request("POST", "/analyzers", body=raw_body, headers=hdrs)
resp = conn.getresponse()
data = json.loads(resp.read().decode("utf-8"))
status = resp.status
conn.close()
return status, data
def _get_analyzers(self):
conn = self._conn()
conn.request("GET", "/analyzers")
resp = conn.getresponse()
data = json.loads(resp.read().decode("utf-8"))
status = resp.status
conn.close()
return status, data
def _delete_analyzer(self, name, query=""):
path = f"/analyzers/{name}{query}"
conn = self._conn()
conn.request("DELETE", path)
resp = conn.getresponse()
data = json.loads(resp.read().decode("utf-8"))
status = resp.status
conn.close()
return status, data
def test_send_json_flushes_after_write(self):
class RecordingWFile:
def __init__(self):
self.events = []
self.payload = b""
def write(self, data):
self.events.append("write")
self.payload += data
def flush(self):
self.events.append("flush")
handler = SimulateAPIHandler.__new__(SimulateAPIHandler)
handler.wfile = RecordingWFile()
handler.send_response = MagicMock()
handler.send_header = MagicMock()
handler.end_headers = MagicMock()
handler._send_json(201, {"name": "my-analyzer", "ip": "172.20.0.3"})
self.assertEqual(handler.wfile.events, ["write", "flush"])
self.assertEqual(
json.loads(handler.wfile.payload.decode("utf-8")),
{"name": "my-analyzer", "ip": "172.20.0.3"},
)
# ------------------------------------------------------------------
# GET /analyzers
# ------------------------------------------------------------------
def test_get_analyzers_returns_200_with_list(self):
status, body = self._get_analyzers()
self.assertEqual(status, 200)
self.assertIn("analyzers", body)
self.assertIsInstance(body["analyzers"], list)
self.assertEqual(body["analyzers"][0]["name"], "test-analyzer")
# ------------------------------------------------------------------
# POST /analyzers — success
# ------------------------------------------------------------------
def test_post_valid_analyzer_returns_201(self):
status, body = self._post_analyzers({"name": "my-analyzer", "template": "mindray_bc5380"})
self.assertEqual(status, 201)
self.assertEqual(body["name"], "my-analyzer")
def test_post_calls_create_with_connect_mock_false(self):
"""The 201 response path creates the network synchronously but must
defer docker-attach so the HTTP caller doesn't wait for it."""
self.mock_mgr.create_analyzer.reset_mock()
status, _ = self._post_analyzers({"name": "async-1", "template": "t"})
self.assertEqual(status, 201)
self.mock_mgr.create_analyzer.assert_called_once()
_, kwargs = self.mock_mgr.create_analyzer.call_args
self.assertIs(kwargs.get("connect_mock"), False)
def test_post_invokes_connect_mock_after_response(self):
"""The background thread must call connect_mock_to_analyzer(name).
Patch threading.Thread so the target runs synchronously and we can
assert on mgr.connect_mock_to_analyzer without a sleep race."""
self.mock_mgr.connect_mock_to_analyzer.reset_mock()
self.mock_mgr.connect_mock_to_analyzer.return_value = True
real_thread = threading.Thread
def _inline_thread(target=None, args=(), kwargs=None, daemon=None):
# Run synchronously in the current thread so the assertion below
# is deterministic. Return a thread-like object with start()=noop.
if target:
target(*args, **(kwargs or {}))
return MagicMock(start=lambda: None)
with patch("api.threading.Thread", side_effect=_inline_thread):
status, _ = self._post_analyzers({"name": "async-2", "template": "t"})
self.assertEqual(status, 201)
# The connect runs in the httpd worker thread AFTER the 201 is flushed, so
# the client can return before the worker reaches it. Poll with a bounded
# deadline instead of racing the worker's scheduling (flakes under load).
deadline = time.time() + 5.0
while self.mock_mgr.connect_mock_to_analyzer.call_count == 0 and time.time() < deadline:
time.sleep(0.01)
self.mock_mgr.connect_mock_to_analyzer.assert_called_once_with("async-2")
# Sanity: we patched api.threading.Thread, not the global one
self.assertIs(threading.Thread, real_thread)
def test_post_logs_when_connect_mock_raises(self):
"""If connect_mock_to_analyzer raises, the error must be logged —
otherwise silent background failures leave analyzers created-but-
unreachable with no signal to the operator."""
self.mock_mgr.connect_mock_to_analyzer.reset_mock()
self.mock_mgr.connect_mock_to_analyzer.side_effect = RuntimeError(
"docker down"
)
def _inline_thread(target=None, args=(), kwargs=None, daemon=None):
if target:
target(*args, **(kwargs or {}))
return MagicMock(start=lambda: None)
# Patch the module logger directly (assertLogs misses cross-thread
# emissions from httpd's worker thread).
with patch("api.threading.Thread", side_effect=_inline_thread), \
patch("api.logger") as mock_logger:
status, _ = self._post_analyzers({"name": "async-3", "template": "t"})
self.assertEqual(status, 201)
# Same worker-thread race as above — poll for the cross-thread log.
deadline = time.time() + 5.0
while mock_logger.exception.call_count == 0 and time.time() < deadline:
time.sleep(0.01)
mock_logger.exception.assert_called_once()
msg = mock_logger.exception.call_args[0][0]
self.assertIn("connect_mock_to_analyzer raised", msg)
def test_post_valid_name_with_underscores_and_digits(self):
status, _ = self._post_analyzers({"name": "analyzer_01", "template": "t"})
self.assertEqual(status, 201)
def test_post_valid_name_with_dashes(self):
status, _ = self._post_analyzers({"name": "my-cool-analyzer", "template": "t"})
self.assertEqual(status, 201)
# ------------------------------------------------------------------
# POST /analyzers — name validation
# ------------------------------------------------------------------
def test_post_invalid_name_slash_returns_400(self):
status, body = self._post_analyzers({"name": "foo/bar", "template": "t"})
self.assertEqual(status, 400)
self.assertIn("alphanumeric", body["error"])
def test_post_invalid_name_space_returns_400(self):
status, body = self._post_analyzers({"name": "foo bar", "template": "t"})
self.assertEqual(status, 400)
self.assertIn("alphanumeric", body["error"])
def test_post_invalid_name_dot_returns_400(self):
status, body = self._post_analyzers({"name": "foo.bar", "template": "t"})
self.assertEqual(status, 400)
self.assertIn("alphanumeric", body["error"])
def test_post_invalid_name_special_chars_returns_400(self):
status, body = self._post_analyzers({"name": "foo@bar!", "template": "t"})
self.assertEqual(status, 400)
self.assertIn("alphanumeric", body["error"])
# ------------------------------------------------------------------
# POST /analyzers — missing fields
# ------------------------------------------------------------------
def test_post_missing_name_returns_400(self):
status, body = self._post_analyzers({"template": "t"})
self.assertEqual(status, 400)
self.assertIn("required", body["error"])
def test_post_missing_template_returns_400(self):
status, body = self._post_analyzers({"name": "ok"})
self.assertEqual(status, 400)
self.assertIn("required", body["error"])
def test_post_empty_name_returns_400(self):
status, body = self._post_analyzers({"name": "", "template": "t"})
self.assertEqual(status, 400)
self.assertIn("required", body["error"])
# ------------------------------------------------------------------
# POST /analyzers — bad Content-Length
# ------------------------------------------------------------------
def test_post_invalid_content_length_returns_400(self):
conn = self._conn()
headers = {"Content-Length": "abc", "Content-Type": "application/json"}
conn.request("POST", "/analyzers", body=b"", headers=headers)
resp = conn.getresponse()
data = json.loads(resp.read().decode("utf-8"))
conn.close()
self.assertEqual(resp.status, 400)
self.assertIn("Content-Length", data["error"])
# ------------------------------------------------------------------
# POST /analyzers — empty / malformed body
# ------------------------------------------------------------------
def test_post_empty_body_returns_400(self):
conn = self._conn()
headers = {"Content-Length": "0", "Content-Type": "application/json"}
conn.request("POST", "/analyzers", body=b"", headers=headers)
resp = conn.getresponse()
data = json.loads(resp.read().decode("utf-8"))
conn.close()
self.assertEqual(resp.status, 400)
self.assertIn("body required", data["error"])
def test_post_invalid_json_returns_400(self):
status, body = self._post_analyzers(raw_body="not json {{{")
self.assertEqual(status, 400)
self.assertIn("Invalid JSON", body["error"])
# ------------------------------------------------------------------
# DELETE /analyzers/{name}
# ------------------------------------------------------------------
def test_delete_existing_analyzer_returns_200(self):
status, body = self._delete_analyzer("valid-name")
self.assertEqual(status, 200)
self.assertTrue(body["removed"])
self.assertEqual(body["name"], "valid-name")
def test_delete_with_query_string_strips_query(self):
status, body = self._delete_analyzer("valid-name", "?query=1")
self.assertEqual(status, 200)
self.assertTrue(body["removed"])
self.assertEqual(body["name"], "valid-name")
def test_delete_nonexistent_analyzer_returns_404(self):
status, body = self._delete_analyzer("nonexistent")
self.assertEqual(status, 404)
self.assertFalse(body["removed"])
self.assertIn("not found", body["error"])
if __name__ == "__main__":
unittest.main()