-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathanalyzer_network_manager.py
More file actions
407 lines (360 loc) · 17.8 KB
/
analyzer_network_manager.py
File metadata and controls
407 lines (360 loc) · 17.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
"""
Deterministic, idempotent Docker network management for mock analyzers.
Each mock analyzer gets its own Docker network (10.42.N.0/24) so it has a
distinct source IP — the bridge identifies analyzers by source IP. The mock
container and bridge container are both attached to each analyzer's network.
Dynamic but deterministic:
- The subnet for an analyzer is a PURE FUNCTION of its name (see
`_subnet_id_for`): the same analyzer always lands on the same IP across runs,
independent of creation order, concurrency, or what else is allocated. There
is no mutable allocation counter.
- Provisioning is CONVERGENT/idempotent (`create_analyzer` = "ensure"): Docker is
the source of truth. If the network already exists (live, or an orphan from a
prior run) its actual subnet is adopted rather than guessed, and connecting is
a no-op when already attached. Re-running for the same analyzer yields the same
result. A boot-time `reconcile_orphans` drains leftovers from crashed runs.
This is what lets the per-analyzer-network design withstand create/teardown churn
without leaking orphans or flaking on "ip=missing".
Requires:
- Docker socket mounted: /var/run/docker.sock
- `docker` Python package installed
- Env vars: MOCK_CONTAINER_NAME, BRIDGE_CONTAINER_NAME
Usage:
manager = AnalyzerNetworkManager()
manager.reconcile_orphans() # once, at startup
result = manager.create_analyzer("bc5380", "mindray_bc5380")
# result = {"name": "bc5380", "ip": "10.42.21.10", "network": "mock-analyzer-bc5380"}
manager.remove_analyzer("bc5380")
"""
import functools
import hashlib
import logging
import os
import re
import threading
import time
from typing import Dict, List, Optional
# Module-scoped logger: named after this module so its level/handlers can be
# configured independently of the rest of the application.
logger = logging.getLogger(name=__name__)
def _synchronized(method):
    """Decorator: run ``method`` while holding ``self._lock``.

    The mock's HTTP server is threaded (api.py ThreadingHTTPServer), so two
    concurrent ``/analyzers`` requests could otherwise interleave their Docker
    network create/connect calls and their ``_analyzers`` cache updates.
    Allocation is deterministic per name, so creates of *different* analyzers
    no longer race on a shared counter; the lock remains as a backstop that
    keeps every ensure/remove call atomic. The lock is an RLock, so a guarded
    method can call another guarded method (e.g. cleanup_all → remove_analyzer).
    """

    @functools.wraps(method)
    def locked_call(self, *args, **kwargs):
        lock = self._lock
        with lock:
            outcome = method(self, *args, **kwargs)
        return outcome

    return locked_call
# Every analyzer network is 10.42.{N}.0/24. This range is deliberately kept
# apart from analyzer-net (172.21.1.0/24) to avoid Docker routing conflicts.
NETWORK_PREFIX = "mock-analyzer-"
BRIDGE_IP_SUFFIX = 2     # host .2 = bridge container (.1 is the Docker gateway)
ANALYZER_IP_SUFFIX = 10  # host .10 = the mock analyzer on each subnet

# Canonical analyzers are pinned to fixed, human-readable subnet ids. Any other
# name hashes (stably, per name) into [DYNAMIC_SUBNET_BASE, DYNAMIC_SUBNET_MAX];
# that range starts well above the pinned 20-23 block, so the two never overlap.
FIXED_SUBNETS: Dict[str, int] = dict(
    genexpert=20,
    bc5380=21,
    bs200=22,
    bs300=23,
)
DYNAMIC_SUBNET_BASE, DYNAMIC_SUBNET_MAX = 50, 250

# Retry budgets.
MAX_SUBNET_ATTEMPTS = 30      # subnet candidates tried when a /24 overlaps another network
ATTACH_MAX_RETRIES = 4        # container attach is the step that flakes under Docker churn
ATTACH_RETRY_BACKOFF_S = 0.5  # seconds; scaled by the attempt number between retries
class AnalyzerNetworkManager:
    """Manages Docker networks for mock analyzers (deterministic + idempotent).

    Every analyzer ``<name>`` gets a bridge network ``mock-analyzer-<name>`` on
    ``10.42.N.0/24``: the mock container is attached at ``.10`` and the bridge
    container at ``.2``. Docker is the source of truth; the in-process
    ``_analyzers`` dict is only a fast-path cache. Mutating public methods are
    serialized on ``self._lock``.
    """

    # Highest valid third octet of 10.42.N.0/24. The collision probe wraps back
    # to DYNAMIC_SUBNET_BASE here instead of emitting e.g. "10.42.256.0/24".
    _MAX_SUBNET_ID = 255
    # Matches one of this manager's subnets and captures N (compiled once).
    _SUBNET_RE = re.compile(r"^10\.42\.(\d+)\.0/24$")
    # Minimum length for treating a container reference as a container-id
    # prefix (12 = Docker's short id); shorter strings only match by name/full id.
    _SHORT_ID_LEN = 12

    def __init__(self):
        self._docker = None
        # Re-entrant so one @_synchronized method may call another.
        self._lock = threading.RLock()
        # Cache of provisioned analyzers (name → info). Docker is the source of
        # truth; this is a fast-path cache, reconciled against Docker on miss.
        self._analyzers: Dict[str, dict] = {}
        # HOSTNAME fallback presumably identifies this process's own container
        # (Docker's default hostname is the short container id) — confirm per deployment.
        self._mock_container = os.environ.get("MOCK_CONTAINER_NAME", os.environ.get("HOSTNAME", ""))
        self._bridge_container = os.environ.get("BRIDGE_CONTAINER_NAME", "")

    @property
    def docker(self):
        """Lazy-init Docker client."""
        if self._docker is None:
            try:
                import docker
                self._docker = docker.from_env()
                logger.info("Docker client initialized")
            except ImportError:
                logger.error("Docker SDK not installed. Run: pip install docker")
                raise
            except Exception as e:
                logger.error("Failed to connect to Docker: %s", e)
                raise
        return self._docker

    # --- Deterministic allocation -------------------------------------------

    def _subnet_id_for(self, name: str) -> int:
        """Map an analyzer name → its subnet id (the N in 10.42.N.0/24).

        PURE function of the name: same name → same subnet on every run,
        independent of creation order, concurrency, or what else is allocated.
        Canonical analyzers use FIXED_SUBNETS; everything else gets a stable
        hash-derived slot in the dynamic range. Two distinct dynamic names could
        in principle hash to the same slot; that rare collision is resolved at
        create time by a bounded forward probe (see _create_network).
        """
        normalized = name.lower()
        if normalized in FIXED_SUBNETS:
            return FIXED_SUBNETS[normalized]
        span = DYNAMIC_SUBNET_MAX - DYNAMIC_SUBNET_BASE + 1
        digest = int(hashlib.sha1(normalized.encode("utf-8")).hexdigest(), 16)
        return DYNAMIC_SUBNET_BASE + (digest % span)

    @staticmethod
    def _subnet_id_of(network) -> Optional[int]:
        """The N from a network's 10.42.N.0/24 IPAM subnet, or None if absent."""
        # Both "IPAM" and "IPAM.Config" may be explicitly null, not just missing.
        ipam = network.attrs.get("IPAM") or {}
        for entry in ipam.get("Config") or []:
            match = AnalyzerNetworkManager._SUBNET_RE.match(entry.get("Subnet") or "")
            if match:
                return int(match.group(1))
        return None

    def _used_subnet_ids(self) -> set:
        """Return every N for which some Docker network holds 10.42.N.0/24.

        Best-effort: if listing/inspecting fails, logs a warning and returns
        whatever was gathered so far (possibly an empty set).
        """
        used = set()
        try:
            for network in self.docker.networks.list():
                # IPAM / IPAM.Config can be explicitly null when a network was
                # created without an address pool — `or` guards both levels.
                ipam = network.attrs.get("IPAM") or {}
                for entry in ipam.get("Config") or []:
                    match = self._SUBNET_RE.match(entry.get("Subnet") or "")
                    if match:
                        used.add(int(match.group(1)))
        except Exception as err:
            logger.warning("Failed to inspect existing Docker subnets: %s", err)
        return used

    def _subnet_in_use(self, subnet_id: int) -> bool:
        """True if some Docker network already holds 10.42.<subnet_id>.0/24."""
        return subnet_id in self._used_subnet_ids()

    def _next_subnet_id(self, name: str, current: int, tried=frozenset()) -> int:
        """Pick the next candidate subnet id after ``current`` for ``name``.

        Walks forward from ``current + 1``, wrapping from ``_MAX_SUBNET_ID`` back
        to ``DYNAMIC_SUBNET_BASE``, and skips ids that:
        - already back a Docker network (listed ONCE per call, not per id),
        - FIXED_SUBNETS reserves for a *different* canonical analyzer (so a
          collision never steals a canonical analyzer's stable IP),
        - are in ``tried`` (already rejected by Docker for this name).

        Raises:
            RuntimeError: if every id in the walk is unavailable.
        """
        used = self._used_subnet_ids()
        normalized = name.lower()
        reserved = {sid for owner, sid in FIXED_SUBNETS.items() if owner != normalized}
        candidate = current
        # One full lap over 0..255 is enough to visit every reachable id.
        for _ in range(self._MAX_SUBNET_ID + 1):
            candidate += 1
            if candidate > self._MAX_SUBNET_ID:
                candidate = DYNAMIC_SUBNET_BASE
            if candidate in used or candidate in reserved or candidate in tried:
                continue
            return candidate
        raise RuntimeError(f"No free 10.42.x.0/24 subnet left for {name}")

    def _get_network(self, network_name: str):
        """Return the Docker network by name, or None if it doesn't exist.

        Any lookup error (not only "not found") is reported as None.
        """
        try:
            return self.docker.networks.get(network_name)
        except Exception:
            return None

    @staticmethod
    def _is_not_found(err: Exception) -> bool:
        """True if ``err`` is the Docker SDK's 404 ``NotFound`` error."""
        try:
            from docker.errors import NotFound
        except ImportError:
            return False
        return isinstance(err, NotFound)

    @classmethod
    def _find_endpoint(cls, network, container: str) -> Optional[dict]:
        """Return ``network``'s endpoint record for ``container``, or None.

        Matches the endpoint's container name, the full container id, or — for
        references at least ``_SHORT_ID_LEN`` long — a container-id prefix.
        Reads the already-loaded ``network.attrs``; callers reload first.
        """
        for cid, info in (network.attrs.get("Containers") or {}).items():
            if info.get("Name") == container or cid == container:
                return info
            if len(container) >= cls._SHORT_ID_LEN and cid.startswith(container):
                return info
        return None

    # --- Provisioning (convergent / idempotent) -----------------------------

    @_synchronized
    def create_analyzer(
        self,
        name: str,
        template_name: str,
        port: int = 0,
        connect_mock: bool = True,
    ) -> dict:
        """Ensure a Docker network exists for the analyzer and the mock + bridge
        are attached, then return its connection info.

        Convergent / idempotent: re-running for the same name returns the same
        result with no churn. Docker is the source of truth —
        - if the network already exists (live or an orphan), its ACTUAL subnet is
          adopted (so containers connect at IPs inside the network, never the
          "invalid endpoint settings" mismatch);
        - a NEW network's subnet is the deterministic `_subnet_id_for(name)`, so
          the same analyzer always lands on the same IP across runs.

        Args:
            name: Unique analyzer name (e.g., "bc5380")
            template_name: Mock template to associate (e.g., "mindray_bc5380")
            port: Analyzer listen port (informational)
            connect_mock: attach the mock container now (api.py defers this to an
                async step to avoid tearing down the in-flight HTTP socket)

        Returns:
            dict with name, ip, network, subnet, template, port

        Raises:
            RuntimeError: no non-overlapping subnet could be allocated, or an
                existing network with an unparseable subnet could not be removed.
            Exception: Docker errors from network creation (other than subnet
                overlap) or from attaching a container; a network created by
                this call is rolled back first.
        """
        network_name = f"{NETWORK_PREFIX}{name}"
        network = self._get_network(network_name)

        # Fast path: provisioned in this process AND still present in Docker.
        cached = self._analyzers.get(name)
        if cached is not None and network is not None:
            return cached

        created_here = False
        subnet_id: Optional[int] = None
        if network is not None:
            # Adopt the existing network's actual subnet (Docker = truth).
            subnet_id = self._subnet_id_of(network)
            if subnet_id is None:
                logger.warning("Existing %s has no parseable 10.42.x subnet; recreating", network_name)
                if not self._cleanup_network(network_name):
                    raise RuntimeError(
                        f"Could not remove {network_name} (unparseable subnet) to recreate it"
                    )
                network = None
        if network is None:
            network, subnet_id = self._create_network(name, network_name)
            created_here = True

        subnet = f"10.42.{subnet_id}.0/24"
        analyzer_ip = f"10.42.{subnet_id}.{ANALYZER_IP_SUFFIX}"
        bridge_ip = f"10.42.{subnet_id}.{BRIDGE_IP_SUFFIX}"

        try:
            if connect_mock and self._mock_container:
                self._ensure_connected(network, self._mock_container, analyzer_ip, "mock")
            if self._bridge_container:
                self._ensure_connected(network, self._bridge_container, bridge_ip, "bridge")
        except Exception as e:
            logger.error("Failed to wire up analyzer network %s: %s", name, e)
            # Roll back ONLY a network we created in this call — never one we
            # merely adopted (that could be a live/seeded network).
            if created_here and not self._cleanup_network(network_name):
                logger.warning("Rollback left network %s behind", network_name)
            raise

        result = {
            "name": name,
            "ip": analyzer_ip,
            "network": network_name,
            "subnet": subnet,
            "template": template_name,
            "port": port,
        }
        self._analyzers[name] = result
        return result

    def _create_network(self, name: str, network_name: str):
        """Create ``network_name`` on the analyzer's deterministic /24.

        Starts at ``_subnet_id_for(name)``; when Docker reports an overlap with a
        DIFFERENT network (ours does not exist yet — the caller checked), probes
        forward via ``_next_subnet_id``, at most MAX_SUBNET_ATTEMPTS creates.

        Returns:
            (network, subnet_id) for the created network.

        Raises:
            Exception: any Docker create error that is not a subnet overlap.
            RuntimeError: every attempted subnet overlapped, or no id is free.
        """
        import docker.types

        subnet_id = self._subnet_id_for(name)
        tried = set()
        for _attempt in range(MAX_SUBNET_ATTEMPTS):
            subnet = f"10.42.{subnet_id}.0/24"
            ipam = docker.types.IPAMConfig(pool_configs=[docker.types.IPAMPool(subnet=subnet)])
            try:
                network = self.docker.networks.create(network_name, driver="bridge", ipam=ipam)
            except Exception as create_err:
                if "overlap" not in str(create_err).lower():
                    raise
                logger.warning("Subnet %s overlaps another network; probing next free for %s", subnet, name)
                tried.add(subnet_id)
                subnet_id = self._next_subnet_id(name, subnet_id, tried)
                continue
            logger.info("Created network %s (subnet %s)", network_name, subnet)
            return network, subnet_id
        raise RuntimeError(
            f"Could not allocate a non-overlapping subnet for {name} "
            f"after {MAX_SUBNET_ATTEMPTS} attempts"
        )

    def _ensure_connected(self, network, container: str, ip: str, label: str):
        """Idempotently attach `container` to `network` at `ip`.

        No-op when already connected at `ip`; reconnects at `ip` if attached at a
        different address (e.g. the network was recreated). The connect is the
        step that flakes transiently under Docker network churn (the historical
        `ip=missing` provisioning failure), so it is retried with backoff.

        An "already ..." error is only accepted as success once `container` is
        confirmed to be listed on the network; otherwise (e.g. `ip` is held by
        another endpoint) it is re-raised so the caller can roll back.

        Raises:
            Exception: the last attach error after ATTACH_MAX_RETRIES failures,
                or the "already" error when `container` is not actually attached.
        """
        last_err: Optional[Exception] = None
        already_err: Optional[Exception] = None
        for attempt in range(1, ATTACH_MAX_RETRIES + 1):
            try:
                network.connect(container, ipv4_address=ip)
            except Exception as err:
                if "already" in str(err).lower():
                    already_err = err
                    break  # something is attached — confirm below that it's us
                last_err = err
                logger.warning("Attach %s (%s) to %s failed (attempt %d/%d): %s",
                               label, container, network.name, attempt, ATTACH_MAX_RETRIES, err)
                if attempt < ATTACH_MAX_RETRIES:
                    time.sleep(ATTACH_RETRY_BACKOFF_S * attempt)
                    try:
                        network.reload()  # refresh Docker state before retrying
                    except Exception:
                        pass
                continue
            logger.info("Connected %s (%s) to %s at %s", label, container, network.name, ip)
            return

        if already_err is None:
            if last_err is None:  # only reachable if ATTACH_MAX_RETRIES < 1
                last_err = RuntimeError(f"No attach attempt made for {label} ({container})")
            raise last_err

        # Docker said "already ...": confirm THIS container is attached and
        # reconcile its address if it drifted. Inspect failures are best-effort.
        try:
            network.reload()
            endpoint = self._find_endpoint(network, container)
        except Exception as err:
            logger.warning("Could not verify %s attachment on %s: %s", label, network.name, err)
            return
        if endpoint is None:
            # The "already" was not about this container (e.g. `ip` is in use
            # by another endpoint). Reporting success would hand out an IP the
            # container does not have.
            raise already_err
        current = (endpoint.get("IPv4Address") or "").split("/")[0]
        if current and current != ip:
            logger.warning("%s on %s at %s, expected %s — reconnecting",
                           label, network.name, current, ip)
            try:
                network.disconnect(container, force=True)
                network.connect(container, ipv4_address=ip)
            except Exception as err:
                logger.warning("Could not verify %s attachment on %s: %s", label, network.name, err)
        else:
            logger.info("%s already connected to %s at %s", label, network.name, ip)

    @_synchronized
    def connect_mock_to_analyzer(self, name: str) -> bool:
        """Attach the running mock container to an existing analyzer network
        (api.py's deferred/async mock attach). Idempotent.

        Returns True on success; False if the analyzer is not cached, no mock
        container is configured, the network is missing, or attaching failed.
        """
        info = self._analyzers.get(name)
        if not info or not self._mock_container:
            return False
        network = self._get_network(info["network"])
        if network is None:
            logger.warning("connect_mock_to_analyzer: network %s not found for %s", info["network"], name)
            return False
        try:
            self._ensure_connected(network, self._mock_container, info["ip"], "mock")
            return True
        except Exception as conn_err:
            logger.warning("Failed to connect mock to %s: %s", info["network"], conn_err)
            return False

    @_synchronized
    def remove_analyzer(self, name: str) -> bool:
        """Remove an analyzer's Docker network (cached or orphaned). Idempotent —
        returns True if the network is gone afterwards."""
        self._analyzers.pop(name, None)
        return self._cleanup_network(f"{NETWORK_PREFIX}{name}")

    def list_analyzers(self) -> List[dict]:
        """List all active mock analyzers (from the in-process cache)."""
        return list(self._analyzers.values())

    def get_analyzer(self, name: str) -> Optional[dict]:
        """Get a specific analyzer's cached info, or None."""
        return self._analyzers.get(name)

    @_synchronized
    def cleanup_all(self):
        """Remove every analyzer network in this process's cache (whether this
        process created it or adopted an existing one). Called on shutdown."""
        for name in list(self._analyzers.keys()):
            self.remove_analyzer(name)
        logger.info("Cleaned up all analyzer networks")

    @_synchronized
    def reconcile_orphans(self) -> int:
        """Drain orphaned analyzer networks at startup, converging to a clean
        baseline. An orphan is a ``mock-analyzer-*`` network with NO containers
        attached — a leftover from a crashed/killed prior run. Networks WITH
        containers are live (e.g. seeded analyzers) and are kept; so is any
        network that cannot be inspected. Returns the number removed. Safe to
        call once at boot (before any provisioning).
        """
        removed = 0
        try:
            # The Docker "name" filter is a substring match; the prefix check
            # below makes it exact.
            candidates = self.docker.networks.list(filters={"name": NETWORK_PREFIX})
        except Exception as e:
            logger.warning("Orphan reconcile failed: %s", e)
            return 0
        for network in candidates:
            nm = getattr(network, "name", "") or network.attrs.get("Name", "") or ""
            if not nm.startswith(NETWORK_PREFIX):
                continue
            try:
                # networks.list() does not populate attached containers (Docker
                # API >= 1.28), so inspect before judging the network an orphan.
                network.reload()
            except Exception as e:
                logger.warning("Skipping %s during reconcile: could not inspect it: %s", nm, e)
                continue
            if network.attrs.get("Containers"):
                continue  # live — keep
            if self._cleanup_network(nm):
                removed += 1
        if removed:
            logger.info("Reconcile removed %d orphaned analyzer network(s)", removed)
        return removed

    def _cleanup_network(self, network_name: str) -> bool:
        """Disconnect all containers and remove a network.

        Returns True if the network is absent afterwards (removed now, or already
        gone), False if it may still exist. A lookup failure other than Docker's
        "not found" is NOT reported as absent.
        """
        try:
            network = self.docker.networks.get(network_name)
        except Exception as err:
            if self._is_not_found(err):
                return True  # already absent
            logger.warning("Failed to look up network %s for cleanup: %s", network_name, err)
            return False
        try:
            for endpoint in (network.attrs.get("Containers") or {}).values():
                try:
                    network.disconnect(endpoint["Name"], force=True)
                except Exception:
                    pass  # best-effort; remove() below surfaces a real failure
            network.remove()
        except Exception as e:
            if self._is_not_found(e):
                return True  # removed concurrently
            logger.warning("Failed to cleanup network %s: %s", network_name, e)
            return False
        logger.info("Removed network %s", network_name)
        return True