|
22 | 22 |
|
23 | 23 | import argparse |
24 | 24 | import base64 |
| 25 | +import concurrent.futures |
25 | 26 | import copy |
26 | 27 | import json |
27 | 28 | import os |
@@ -146,6 +147,13 @@ def load_copy(measure_id, copy_index): |
146 | 147 | with open(path) as f: |
147 | 148 | bundle = json.load(f) |
148 | 149 | bundle = make_copy(bundle, copy_index) |
| 150 | + for attempt in range(3): |
| 151 | + try: |
| 152 | + http("POST", "/fhir", bundle, timeout=180, as_json=False) |
| 153 | + return len(bundle.get("entry", [])) |
| 154 | + except urllib.error.HTTPError: |
| 155 | + if attempt < 2: |
| 156 | + time.sleep(0.5) |
149 | 157 | http("POST", "/fhir", bundle, timeout=180, as_json=False) |
150 | 158 | return len(bundle.get("entry", [])) |
151 | 159 |
|
@@ -275,25 +283,57 @@ def main(): |
275 | 283 | print(f" Measures: {', '.join(measures)}") |
276 | 284 | print() |
277 | 285 |
|
278 | | - # Phase 1 — load multiplied clinical data |
| 286 | + # Phase 1 — load multiplied clinical data (parallel across measures) |
279 | 287 | print("[1/2] Loading multiplied clinical data...") |
280 | 288 | total = 0 |
281 | 289 | t0 = time.time() |
282 | | - for m in measures: |
283 | | - per_measure = 0 |
| 290 | + results = {} |
| 291 | + |
| 292 | + def load_measure_copies(m): |
| 293 | + n = 0 |
284 | 294 | for i in range(1, args.multiplier + 1): |
285 | 295 | try: |
286 | | - per_measure += load_copy(m, i) |
287 | | - except urllib.error.HTTPError as e: |
288 | | - print(f" FAIL {m} copy {i}: HTTP {e.code}") |
289 | | - break |
| 296 | + n += load_copy(m, i) |
290 | 297 | except Exception as e: |
291 | 298 | print(f" FAIL {m} copy {i}: {str(e)[:80]}") |
292 | 299 | break |
293 | | - print(f" {m}: {per_measure} resources across {args.multiplier} copies") |
294 | | - total += per_measure |
| 300 | + return m, n |
| 301 | + |
| 302 | + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex: |
| 303 | + for m, n in ex.map(load_measure_copies, measures): |
| 304 | + results[m] = n |
| 305 | + total += n |
| 306 | + print(f" {m}: {n} resources across {args.multiplier} copies") |
295 | 307 | print(f" Total: {total} resources in {time.time() - t0:.1f}s") |
296 | 308 | print(f" Patient count now: {patient_count()}") |
| 309 | + |
| 310 | + # Verify: count resources per type in DB vs expected |
| 311 | + print(" Verifying resource counts...") |
| 312 | + by_type = {} |
| 313 | + for m in measures: |
| 314 | + path = os.path.join(DATA_DIR, f"{m}-clinical-data.json") |
| 315 | + if not os.path.exists(path): |
| 316 | + continue |
| 317 | + with open(path) as f: |
| 318 | + bundle = json.load(f) |
| 319 | + for entry in bundle.get("entry", []): |
| 320 | + r = entry.get("resource", {}) |
| 321 | + rt = r.get("resourceType") |
| 322 | + rid = r.get("id") |
| 323 | + if rt and rid: |
| 324 | + by_type.setdefault(rt, set()).add(rid) |
| 325 | + ok = True |
| 326 | + for rt, ids in sorted(by_type.items()): |
| 327 | + expected = len(ids) * (1 + args.multiplier) |
| 328 | + try: |
| 329 | + actual = http("POST", "/$sql", [f"SELECT count(*) AS n FROM {rt.lower()}"])[0]["n"] |
| 330 | + except Exception: |
| 331 | + actual = 0 |
| 332 | + if actual < expected: |
| 333 | + print(f" WARN {rt}: expected {expected}, got {actual} (missing {expected - actual})") |
| 334 | + ok = False |
| 335 | + if ok: |
| 336 | + print(" OK — all resource counts match") |
297 | 337 | print() |
298 | 338 |
|
299 | 339 | # Phase 2 — measure timings |
|
0 commit comments