-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtest_analyzer_network_manager.py
More file actions
212 lines (180 loc) · 9.36 KB
/
test_analyzer_network_manager.py
File metadata and controls
212 lines (180 loc) · 9.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
"""
Unit tests for AnalyzerNetworkManager — deterministic, idempotent provisioning.
Docker is mocked; no real daemon required.
"""
import sys
import threading
import time
import unittest
from unittest.mock import MagicMock, patch
from analyzer_network_manager import (
DYNAMIC_SUBNET_BASE,
DYNAMIC_SUBNET_MAX,
FIXED_SUBNETS,
NETWORK_PREFIX,
AnalyzerNetworkManager,
)
def _net(name="net", subnet=None, containers=None):
"""A fake Docker network with IPAM subnet + container attachments."""
n = MagicMock()
n.name = name
cfg = [{"Subnet": subnet}] if subnet else []
n.attrs = {"IPAM": {"Config": cfg}, "Containers": containers or {}}
return n
class Base(unittest.TestCase):
def setUp(self):
self.mgr = AnalyzerNetworkManager()
self.docker = MagicMock()
self.docker.networks.list.return_value = []
# Default: no network exists (get raises NotFound-like).
self.docker.networks.get.side_effect = Exception("404 network not found")
self.mgr._docker = self.docker
# Skip real container connects unless a test opts in.
self.mgr._mock_container = ""
self.mgr._bridge_container = ""
# create_analyzer does `import docker.types` — stub it.
types = MagicMock()
p = patch.dict(sys.modules, {"docker": MagicMock(types=types), "docker.types": types})
p.start()
self.addCleanup(p.stop)
# Keep attach-retry backoff out of unit timing.
b = patch("analyzer_network_manager.ATTACH_RETRY_BACKOFF_S", 0)
b.start()
self.addCleanup(b.stop)
class TestDeterministicAllocation(Base):
def test_fixed_exact_match(self):
self.assertEqual(self.mgr._subnet_id_for("genexpert"), FIXED_SUBNETS["genexpert"])
def test_fixed_case_insensitive(self):
self.assertEqual(self.mgr._subnet_id_for("GeneXpert"), FIXED_SUBNETS["genexpert"])
def test_substring_is_not_the_fixed_subnet(self):
s = self.mgr._subnet_id_for("demo-genexpert-site1")
self.assertNotEqual(s, FIXED_SUBNETS["genexpert"])
self.assertTrue(DYNAMIC_SUBNET_BASE <= s <= DYNAMIC_SUBNET_MAX)
def test_same_name_same_subnet_across_calls_and_instances(self):
a = self.mgr._subnet_id_for("demo-outbound-gx")
b = self.mgr._subnet_id_for("demo-outbound-gx")
c = AnalyzerNetworkManager()._subnet_id_for("demo-outbound-gx") # fresh instance, no shared state
self.assertEqual(a, b)
self.assertEqual(a, c, "deterministic across instances — no mutable counter")
def test_distinct_demo_names_get_distinct_subnets(self):
self.assertNotEqual(
self.mgr._subnet_id_for("demo-outbound-gx"),
self.mgr._subnet_id_for("demo-outbound-bc5380"),
)
def test_subnet_id_of_parses_actual_subnet(self):
self.assertEqual(AnalyzerNetworkManager._subnet_id_of(_net(subnet="10.42.77.0/24")), 77)
self.assertIsNone(AnalyzerNetworkManager._subnet_id_of(_net(subnet=None)))
class TestCreateAnalyzer(Base):
def test_fresh_create_uses_deterministic_subnet(self):
self.docker.networks.create.return_value = _net()
r = self.mgr.create_analyzer("demo-outbound-gx", "tmpl", port=9600)
expected = self.mgr._subnet_id_for("demo-outbound-gx")
self.assertEqual(r["subnet"], f"10.42.{expected}.0/24")
self.assertEqual(r["ip"], f"10.42.{expected}.10")
self.docker.networks.create.assert_called_once()
def test_idempotent_adopts_existing_subnet_without_recreating(self):
# The network already exists with a DIFFERENT subnet than the deterministic
# one (e.g. an orphan, or a prior different allocation). Adopt its ACTUAL
# subnet so containers connect at in-subnet IPs — and do NOT recreate.
existing = _net(name=NETWORK_PREFIX + "bc5380", subnet="10.42.99.0/24")
self.docker.networks.get.side_effect = None
self.docker.networks.get.return_value = existing
r = self.mgr.create_analyzer("bc5380", "tmpl", port=5380)
self.assertEqual(r["subnet"], "10.42.99.0/24")
self.assertEqual(r["ip"], "10.42.99.10")
self.docker.networks.create.assert_not_called()
def test_second_create_same_name_is_noop_returns_same(self):
created = _net(name=NETWORK_PREFIX + "demo-x", subnet=None)
self.docker.networks.create.return_value = created
first = self.mgr.create_analyzer("demo-x", "tmpl")
# Second call: now the cache has it AND networks.get must find it.
self.docker.networks.get.side_effect = None
self.docker.networks.get.return_value = created
second = self.mgr.create_analyzer("demo-x", "tmpl")
self.assertEqual(first, second)
self.docker.networks.create.assert_called_once() # not created again
def test_overlap_probes_to_next_free_subnet(self):
good = _net()
self.docker.networks.create.side_effect = [
Exception("403 Client Error: Pool overlaps with other one on this address space"),
good,
]
r = self.mgr.create_analyzer("demo-x", "tmpl")
self.assertEqual(self.docker.networks.create.call_count, 2)
start = self.mgr._subnet_id_for("demo-x")
# list() returns [] so the next id is free → start + 1
self.assertEqual(r["subnet"], f"10.42.{start + 1}.0/24")
def test_connect_failure_rolls_back_only_a_network_we_created(self):
self.mgr._mock_container = "mock-ctr"
created = _net(name=NETWORK_PREFIX + "demo-x")
created.connect.side_effect = Exception("boom: cannot connect")
self.docker.networks.create.return_value = created
# get: existence-check raises (None) → create; then cleanup-lookup returns it.
self.docker.networks.get.side_effect = [Exception("404"), created]
with self.assertRaises(Exception):
self.mgr.create_analyzer("demo-x", "tmpl")
created.remove.assert_called_once()
def test_transient_connect_failure_is_retried_then_succeeds(self):
# A transient Docker-churn attach failure must be retried, not surfaced as
# a provisioning failure — this is what prevents the `ip=missing` flake.
self.mgr._mock_container = "mock-ctr"
created = _net(name=NETWORK_PREFIX + "demo-x")
created.connect.side_effect = [Exception("transient: cannot connect"), None]
self.docker.networks.create.return_value = created
r = self.mgr.create_analyzer("demo-x", "tmpl")
self.assertEqual(r["name"], "demo-x")
self.assertIn("ip", r)
self.assertEqual(created.connect.call_count, 2) # retried past the transient failure
created.remove.assert_not_called() # provisioning succeeded — no rollback
def test_connect_failure_does_NOT_remove_an_adopted_network(self):
# We adopted an existing (possibly live/seeded) network; a connect failure
# must NOT remove it.
self.mgr._mock_container = "mock-ctr"
existing = _net(name=NETWORK_PREFIX + "bc5380", subnet="10.42.21.0/24")
existing.connect.side_effect = Exception("boom: cannot connect")
self.docker.networks.get.side_effect = None
self.docker.networks.get.return_value = existing
with self.assertRaises(Exception):
self.mgr.create_analyzer("bc5380", "tmpl")
existing.remove.assert_not_called()
self.docker.networks.create.assert_not_called()
class TestReconcileOrphans(Base):
def test_drains_zero_container_orphans_keeps_live_and_foreign(self):
orphan = _net(name=NETWORK_PREFIX + "demo-old", subnet="10.42.60.0/24", containers={})
live = _net(name=NETWORK_PREFIX + "genexpert", subnet="10.42.20.0/24",
containers={"c1": {"Name": "openelis-analyzer-mock"}})
foreign = _net(name="some-other-net", containers={})
self.docker.networks.list.return_value = [orphan, live, foreign]
# _cleanup_network looks the network up by name before removing it.
self.docker.networks.get.side_effect = None
self.docker.networks.get.return_value = orphan
removed = self.mgr.reconcile_orphans()
self.assertEqual(removed, 1, "only the zero-container mock-analyzer orphan")
orphan.remove.assert_called_once()
live.remove.assert_not_called()
foreign.remove.assert_not_called()
class TestConcurrency(Base):
def test_concurrent_distinct_analyzers_get_distinct_subnets(self):
# Deterministic per-name allocation means concurrent creates of DIFFERENT
# analyzers don't contend on any shared counter — each lands on its own
# name-derived subnet. The delay widens any race window.
def slow_create(*args, **kwargs):
time.sleep(0.02)
return _net()
self.docker.networks.create.side_effect = slow_create
results = {}
errors = []
def worker(i):
try:
results[i] = self.mgr.create_analyzer(f"demo-conc-{i}", "tmpl")["subnet"]
except Exception as e: # noqa: BLE001
errors.append(repr(e))
threads = [threading.Thread(target=worker, args=(i,)) for i in range(6)]
for t in threads:
t.start()
for t in threads:
t.join()
self.assertEqual(errors, [], f"concurrent creates errored: {errors}")
self.assertEqual(len(set(results.values())), 6, f"subnets collided: {results}")
if __name__ == "__main__":
unittest.main()