|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""Refresh sof.*_flat materialized tables + re-apply indexes + ANALYZE. |
| 3 | +
|
| 4 | +For production setups where Patient/Encounter/Condition data changes over time |
| 5 | +(bulk imports, new clinical events). $materialize does a full rebuild — there |
| 6 | +is no incremental refresh path today. Schedule this script from cron, or run |
| 7 | +it on demand after a bulk load. |
| 8 | +
|
| 9 | +What it does (idempotent, safe to re-run): |
| 10 | + 1. DROP VIEW IF EXISTS for each wrapper view CASCADE. The $materialize |
| 11 | + operation drops sof.X then recreates it; PostgreSQL refuses to drop a |
| 12 | + table while a wrapper view depends on it, so we drop wrappers first. |
| 13 | + 2. POST /fhir/ViewDefinition/{id}/$materialize for each of 9 ViewDefinitions |
| 14 | + 3. Re-run measures/shared/sql/01-wrapper-views.sql (recreate wrappers) |
| 15 | + 4. Re-run measures/shared/sql/03-sof-indexes.sql (CREATE INDEX IF NOT EXISTS |
| 16 | + plus ANALYZE — indexes are dropped on each $materialize because DROP TABLE) |
| 17 | + 5. Log per-table row counts and timings |
| 18 | +
|
| 19 | +Exits non-zero if any $materialize call fails. |
| 20 | +
|
| 21 | +Usage: |
| 22 | + python3 scripts/refresh_sof.py |
| 23 | + python3 scripts/refresh_sof.py --base-url http://localhost:9999 |
| 24 | + python3 scripts/refresh_sof.py --vds patient-flat encounter-flat |
| 25 | +
|
| 26 | +Cron example (every 4 hours): |
| 27 | + 0 */4 * * * /usr/bin/python3 /opt/measure-evaluate/tools/refresh_sof.py \\ |
| 28 | + --base-url http://aidbox:8888 --user $AIDBOX_USER --password $AIDBOX_PASS \\ |
| 29 | + >> /var/log/sof-refresh.log 2>&1 |
| 30 | +""" |
| 31 | +from __future__ import annotations |
| 32 | +import argparse |
| 33 | +import base64 |
| 34 | +import json |
| 35 | +import os |
| 36 | +import sys |
| 37 | +import time |
| 38 | +import urllib.error |
| 39 | +import urllib.request |
| 40 | + |
| 41 | +REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| 42 | + |
| 43 | +VD_IDS = [ |
| 44 | + "patient-flat", "encounter-flat", "condition-flat", "procedure-flat", |
| 45 | + "observation-flat", "observation-bp-flat", |
| 46 | + "servicerequest-flat", "medicationrequest-flat", "devicerequest-flat", |
| 47 | +] |
| 48 | + |
| 49 | +WRAPPER_VIEWS_SQL = os.path.join(REPO_ROOT, "sql", "01-wrapper-views.sql") |
| 50 | +SOF_INDEXES_SQL = os.path.join(REPO_ROOT, "sql", "03-sof-indexes.sql") |
| 51 | + |
| 52 | +# Wrapper view names that depend on sof.*_flat tables — must be dropped before |
| 53 | +# $materialize re-creates the underlying table. Recreated from 01-wrapper-views.sql |
| 54 | +# after $materialize completes. |
| 55 | +WRAPPER_VIEWS = [ |
| 56 | + "patient_flat", "encounter_flat", "condition_flat", "procedure_flat", |
| 57 | + "observation_flat", "observation_bp_flat", |
| 58 | + "servicerequest_flat", "medicationrequest_flat", "devicerequest_flat", |
| 59 | +] |
| 60 | + |
| 61 | + |
| 62 | +def auth_header(user: str, password: str) -> str: |
| 63 | + return "Basic " + base64.b64encode(f"{user}:{password}".encode()).decode() |
| 64 | + |
| 65 | + |
| 66 | +def materialize(vd_id: str, base_url: str, auth: str) -> tuple[bool, str]: |
| 67 | + body = json.dumps({ |
| 68 | + "resourceType": "Parameters", |
| 69 | + "parameter": [{"name": "type", "valueCode": "table"}], |
| 70 | + }).encode() |
| 71 | + req = urllib.request.Request( |
| 72 | + f"{base_url}/fhir/ViewDefinition/{vd_id}/$materialize", |
| 73 | + method="POST", data=body, |
| 74 | + ) |
| 75 | + req.add_header("Authorization", auth) |
| 76 | + req.add_header("Content-Type", "application/json") |
| 77 | + try: |
| 78 | + urllib.request.urlopen(req, timeout=600) |
| 79 | + return True, "" |
| 80 | + except urllib.error.HTTPError as e: |
| 81 | + return False, f"HTTP {e.code}: {e.read()[:200].decode(errors='replace')}" |
| 82 | + except Exception as e: |
| 83 | + return False, str(e)[:200] |
| 84 | + |
| 85 | + |
| 86 | +def run_sql(sql: str, base_url: str, auth: str, timeout: int = 60): |
| 87 | + req = urllib.request.Request( |
| 88 | + f"{base_url}/$sql", method="POST", |
| 89 | + data=json.dumps([sql]).encode(), |
| 90 | + ) |
| 91 | + req.add_header("Authorization", auth) |
| 92 | + req.add_header("Content-Type", "application/json") |
| 93 | + with urllib.request.urlopen(req, timeout=timeout) as resp: |
| 94 | + body = resp.read().decode() |
| 95 | + return json.loads(body) if body.strip() else [] |
| 96 | + |
| 97 | + |
| 98 | +def table_count(vd_id: str, base_url: str, auth: str) -> int | None: |
| 99 | + table = vd_id.replace("-", "_") # 'patient-flat' → 'patient_flat' |
| 100 | + try: |
| 101 | + r = run_sql(f"SELECT COUNT(*) AS c FROM sof.{table}", base_url, auth) |
| 102 | + return r[0]["c"] if r else None |
| 103 | + except Exception: |
| 104 | + return None |
| 105 | + |
| 106 | + |
| 107 | +def apply_sql_file(path: str, base_url: str, auth: str) -> int: |
| 108 | + """Apply a SQL file as a single statement. Returns count of warnings.""" |
| 109 | + with open(path) as fh: |
| 110 | + text = fh.read() |
| 111 | + text = "SET LOCAL lock_timeout = '60s';\n" + text |
| 112 | + try: |
| 113 | + run_sql(text, base_url, auth, timeout=300) |
| 114 | + return 0 |
| 115 | + except Exception as e: |
| 116 | + print(f" WARN: {os.path.basename(path)}: {str(e)[:120]}") |
| 117 | + return 1 |
| 118 | + |
| 119 | + |
| 120 | +def drop_wrapper_views(base_url: str, auth: str) -> None: |
| 121 | + """Drop wrapper views CASCADE so $materialize can DROP TABLE underneath.""" |
| 122 | + for view in WRAPPER_VIEWS: |
| 123 | + try: |
| 124 | + run_sql(f"DROP VIEW IF EXISTS {view} CASCADE", base_url, auth, timeout=30) |
| 125 | + except Exception as e: |
| 126 | + print(f" WARN: DROP VIEW {view} — {str(e)[:80]}") |
| 127 | + |
| 128 | + |
| 129 | +def main(): |
| 130 | + ap = argparse.ArgumentParser(description=__doc__) |
| 131 | + ap.add_argument("--base-url", default=os.environ.get("AIDBOX_URL", "http://localhost:8888")) |
| 132 | + ap.add_argument("--user", default=os.environ.get("AIDBOX_USER", "root")) |
| 133 | + ap.add_argument("--password", default=os.environ.get("AIDBOX_PASS", "secret")) |
| 134 | + ap.add_argument("--vds", nargs="+", default=VD_IDS, |
| 135 | + help="ViewDefinition IDs to materialize (default: all 9)") |
| 136 | + args = ap.parse_args() |
| 137 | + |
| 138 | + auth = auth_header(args.user, args.password) |
| 139 | + print(f"[refresh_sof] target: {args.base_url}") |
| 140 | + print(f"[refresh_sof] VDs: {len(args.vds)}") |
| 141 | + |
| 142 | + t0 = time.time() |
| 143 | + |
| 144 | + # Step 1: drop wrapper views (CASCADE) — otherwise $materialize cannot DROP TABLE |
| 145 | + print(f"\n[refresh_sof] Dropping wrapper views (CASCADE) ...") |
| 146 | + drop_wrapper_views(args.base_url, auth) |
| 147 | + |
| 148 | + # Step 2: $materialize each VD |
| 149 | + print(f"\n[refresh_sof] Materializing {len(args.vds)} ViewDefinitions ...") |
| 150 | + failed = [] |
| 151 | + for vd_id in args.vds: |
| 152 | + t_vd = time.time() |
| 153 | + ok, err = materialize(vd_id, args.base_url, auth) |
| 154 | + dt = time.time() - t_vd |
| 155 | + cnt = table_count(vd_id, args.base_url, auth) if ok else None |
| 156 | + cnt_str = f" ({cnt:,} rows)" if cnt is not None else "" |
| 157 | + status = "OK" if ok else "FAIL" |
| 158 | + print(f" {status:4s} {vd_id:25s} {dt:>6.2f}s{cnt_str}" + (f" — {err}" if not ok else "")) |
| 159 | + if not ok: |
| 160 | + failed.append(vd_id) |
| 161 | + |
| 162 | + # Step 3: recreate wrapper views |
| 163 | + print(f"\n[refresh_sof] Re-applying wrapper views from 01-wrapper-views.sql ...") |
| 164 | + t_wrap = time.time() |
| 165 | + warnings_w = apply_sql_file(WRAPPER_VIEWS_SQL, args.base_url, auth) |
| 166 | + print(f" Done in {time.time()-t_wrap:.2f}s ({warnings_w} warnings)") |
| 167 | + |
| 168 | + # Step 4: re-apply indexes + ANALYZE |
| 169 | + print(f"\n[refresh_sof] Re-applying indexes + ANALYZE from 03-sof-indexes.sql ...") |
| 170 | + t_idx = time.time() |
| 171 | + warnings_i = apply_sql_file(SOF_INDEXES_SQL, args.base_url, auth) |
| 172 | + print(f" Done in {time.time()-t_idx:.2f}s ({warnings_i} warnings)") |
| 173 | + |
| 174 | + elapsed = time.time() - t0 |
| 175 | + print(f"\n[refresh_sof] Total: {elapsed:.1f}s, {len(args.vds) - len(failed)}/{len(args.vds)} materialized") |
| 176 | + if failed: |
| 177 | + print(f"[refresh_sof] FAILED: {', '.join(failed)}") |
| 178 | + sys.exit(1) |
| 179 | + sys.exit(0) |
| 180 | + |
| 181 | + |
| 182 | +if __name__ == "__main__": |
| 183 | + main() |
0 commit comments