diff --git a/.github/workflows/offline-bundle.yml b/.github/workflows/offline-bundle.yml new file mode 100644 index 00000000..142fb2fa --- /dev/null +++ b/.github/workflows/offline-bundle.yml @@ -0,0 +1,83 @@ +name: Offline Docker Bundle + +on: + workflow_dispatch: + inputs: + ref: + description: 'Git ref to package (branch, tag, SHA)' + required: true + default: 'timescale-sync' + + release: + types: [published] + + push: + branches: [ timescale-sync, main, dev ] + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + package-offline: + runs-on: ubuntu-latest + permissions: + contents: read + packages: read + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + ref: ${{ github.event.inputs.ref || github.ref }} + + - name: Log in to Container Registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build cloud-sync image + run: | + docker compose -f universal-telemetry-software/deploy/docker-compose.macbook-base.yml build cloud-sync + + - name: Pull all required images + run: | + docker pull ghcr.io/western-formula-racing/daq-radio/universal-telemetry:latest + docker pull ghcr.io/western-formula-racing/daq-radio/pecan:latest + docker pull timescale/timescaledb:latest-pg16 + docker pull redis:8.2 + docker pull bluenviron/mediamtx:latest + docker pull grafana/grafana:latest + + - name: Save images as tarball + run: | + cd universal-telemetry-software/deploy + docker save \ + ghcr.io/western-formula-racing/daq-radio/universal-telemetry:latest \ + ghcr.io/western-formula-racing/daq-radio/pecan:latest \ + timescale/timescaledb:latest-pg16 \ + redis:8.2 \ + bluenviron/mediamtx:latest \ + deploy-cloud-sync:latest \ + grafana/grafana:latest \ + -o offline/wfr-docker-images.tar + + - name: Upload as workflow artifact + uses: actions/upload-artifact@v4 + with: + name: wfr-docker-images-offline + path: universal-telemetry-software/deploy/offline/wfr-docker-images.tar + retention-days: 30 + + - name: Upload to Release + if: github.event_name == 'release' + uses: softprops/action-gh-release@v1 + with: + files: universal-telemetry-software/deploy/offline/wfr-docker-images.tar + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/pecan/src/pages/Trace.tsx b/pecan/src/pages/Trace.tsx index 80d9ee85..55f0e36c 100644 --- a/pecan/src/pages/Trace.tsx +++ b/pecan/src/pages/Trace.tsx @@ -458,7 +458,6 @@ function Trace() { const handleClear = useCallback(() => { clearTrace(); - frozenRef.current = []; }, [clearTrace]); // Handle Easter Egg Trigger diff --git a/server/installer/sandbox/Dockerfile b/server/installer/sandbox/Dockerfile index 5cfb326a..810f6ce6 100644 --- a/server/installer/sandbox/Dockerfile +++ b/server/installer/sandbox/Dockerfile @@ -12,7 +12,9 @@ RUN python -c "from langchain_community.embeddings import FastEmbedEmbeddings; F # Copy application code COPY code_generator.py . COPY stats_report.py . -COPY prompt-guide.txt prompt-guide.txt.example ./ +COPY prompt-guide.txt.example ./ +# prompt-guide.txt is optional (local customization); fall back to example if absent +RUN if [ ! -f prompt-guide.txt ]; then cp prompt-guide.txt.example prompt-guide.txt; fi # Create directories for generated code, ChromaDB, and cache RUN mkdir -p /app/generated /app/chroma_db /app/cache diff --git a/universal-telemetry-software/checklist/SETUP_CARD.tex b/universal-telemetry-software/checklist/SETUP_CARD.tex index e46b3cb7..6462b05c 100644 --- a/universal-telemetry-software/checklist/SETUP_CARD.tex +++ b/universal-telemetry-software/checklist/SETUP_CARD.tex @@ -545,7 +545,7 @@ \section*{9 -- TIMESCALE LOGGING (MacBook Only)} TimescaleDB runs on MacBook's \cmd{macbook-base} stack only. \\ RPi base (\cmd{rpi-base}) does NOT write to DB -- it relays to MacBook via UDP/TCP. \\[1mm] Connect with \cmd{psql}: \cmd{psql postgresql://wfr:wfr\_password@localhost:5432/wfr} \\ - Check tables: \cmd{\textbackslash dt} \quad Check data: \cmd{SELECT COUNT(*) FROM wfr26test\_base;} \\ + Check tables: \cmd{\textbackslash dt} \quad Check data: \cmd{SELECT COUNT(*) FROM wfr26base;} \\ Grafana: \cmd{http://localhost:8087} \quad user: \cmd{admin} \quad pass: \cmd{admin} (or as set in \cmd{.env}) \\ \end{tabular} @@ -555,7 +555,7 @@ \section*{9 -- TIMESCALE LOGGING (MacBook Only)} \noindent \begin{tabular}{p{126mm}} \cmd{docker logs daq-telemetry $\vert$ grep "table="} \\[1mm] - \textbf{PASS:} \cmd{table=WFR26test\_base} (or as set by \cmd{TIMESCALE\_TABLE}) \\ + \textbf{PASS:} \cmd{table=wfr26base} (or as set by \cmd{TIMESCALE\_TABLE}) \\ \textbf{FAIL:} \cmd{ENABLE\_TIMESCALE\_LOGGING=true} not set, or TimescaleDB not reachable \\ \textit{Schema:} wide format -- one row per CAN message, each decoded signal as a column. \\ \textit{Table is created automatically on first write.} \\ @@ -635,7 +635,7 @@ \section*{10 -- TIMESYNCALEDB BRIDGE (Base Only)} It reads decoded CAN frames from Redis and writes them directly to the server stack's \\ TimescaleDB over the network (writes to \cmd{POSTGRES\_DSN}). No local TimescaleDB. \\[1mm] Wide format: one row per CAN message, all signals as columns. \\ - Table: \cmd{WFR26test\_base} (MacBook) or as set by \cmd{TIMESCALE\_TABLE}. \\ + Table: \cmd{wfr26base} (MacBook) or as set by \cmd{TIMESCALE\_TABLE}. \\ \end{tabular} \vspace{1mm} @@ -710,6 +710,7 @@ \section*{14 -- KEY PORTS} 5006 & TCP & Packet retransmission \\ 5432 & TCP & TimescaleDB (MacBook only) \\ 6379 & TCP & Redis \\ + 8092 & HTTP & Cloud Sync dashboard (MacBook only) \\ 8080 & HTTP & Status monitoring page \\ 8087 & HTTP & Grafana dashboards (MacBook) \\ 9080 & WebSocket & PECAN telemetry feed (plain WS) \\ @@ -741,7 +742,7 @@ \section*{15 -- ENV VAR QUICKREF} ENABLE\_VIDEO & false & Video streaming \\ ENABLE\_AUDIO & false & Audio streaming \\ ENABLE\_TIMESCALE\_LOGGING & false & Log telemetry to TimescaleDB (MacBook only) \\ - TIMESCALE\_TABLE & WFR26test\_base & Table name for telemetry writes \\ + TIMESCALE\_TABLE & wfr26base & Table name for telemetry writes \\ POSTGRES\_DSN & (local) & TimescaleDB connection string (MacBook: local; RPi: points to MacBook) \\ ENABLE\_WS\_RELAY & false & Start WS relay (ws\_relay.py) on port 9089 \\ RELAY\_UPSTREAM\_WS & ws://127.0.0.1:9080 & Upstream WS URL for relay \\ diff --git a/universal-telemetry-software/cloud-sync/Dockerfile b/universal-telemetry-software/cloud-sync/Dockerfile new file mode 100644 index 00000000..bdba8d9e --- /dev/null +++ b/universal-telemetry-software/cloud-sync/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.11-slim + +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 + +WORKDIR /app + +COPY requirements.txt /tmp/requirements.txt +RUN pip install --no-cache-dir -r /tmp/requirements.txt + +COPY config.py sync.py app.py ./ +COPY static/ ./static/ + +EXPOSE 8092 + +HEALTHCHECK --interval=30s --timeout=10s --start-period=15s --retries=3 \ + CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8092/api/status')" + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8092"] diff --git a/universal-telemetry-software/cloud-sync/app.py b/universal-telemetry-software/cloud-sync/app.py new file mode 100644 index 00000000..6d3e7f9e --- /dev/null +++ b/universal-telemetry-software/cloud-sync/app.py @@ -0,0 +1,215 @@ +import logging +import time +from datetime import datetime, timezone + +from fastapi import BackgroundTasks, FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import RedirectResponse +from fastapi.staticfiles import StaticFiles +from pydantic import BaseModel + +import config +from sync import SyncEngine + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s", +) +logger = logging.getLogger("cloud-sync") + +app = FastAPI(title="WFR Cloud Sync") +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) +app.mount("/static", StaticFiles(directory="static"), name="static") + +engine = SyncEngine() + +# ── Sync state (module-level, single process) ───────────────────────────────── + +_sync_state: dict = { + "running": False, + "rows_done": 0, + "rows_total": 0, + "last_sync_iso": None, # ISO timestamp of last completed sync + "last_sync_rows": None, # row count of last completed sync + "last_sync_elapsed": None, # seconds + "last_error": None, + # cached from last status call so /api/status is fast + "_cloud_cursor": None, + "_unsynced_count": None, + "_unsynced_ts": 0.0, # monotonic time of last unsynced_count fetch +} + +_UNSYNCED_CACHE_TTL = 30.0 # seconds + + +@app.get("/") +def root(): + return RedirectResponse(url="/static/index.html") + + +@app.get("/api/status") +def status(): + # Local count — always fresh (fast local query) + try: + local_count = engine.get_local_count() + except Exception as e: + local_count = None + logger.warning(f"get_local_count failed: {e}") + + # Unsynced count — cached with TTL to avoid hammering cloud on every poll + now = time.monotonic() + if now - _sync_state["_unsynced_ts"] > _UNSYNCED_CACHE_TTL and not _sync_state["running"]: + try: + cursor = engine.get_cloud_cursor() + _sync_state["_cloud_cursor"] = cursor.isoformat() if cursor else None + _sync_state["_unsynced_count"] = engine.get_unsynced_count(cursor) + _sync_state["_unsynced_ts"] = now + except Exception as e: + logger.warning(f"unsynced_count fetch failed: {e}") + + cloud_configured = bool(config.CLOUD_POSTGRES_DSN) + + return { + "local_count": local_count, + "local_table": config.LOCAL_TABLE, + "cloud_table": engine.cloud_table, + "cloud_configured": cloud_configured, + "cloud_cursor": _sync_state["_cloud_cursor"], + "unsynced_count": _sync_state["_unsynced_count"], + "last_sync_iso": _sync_state["last_sync_iso"], + "last_sync_rows": _sync_state["last_sync_rows"], + "last_sync_elapsed": _sync_state["last_sync_elapsed"], + "last_error": _sync_state["last_error"], + "sync_running": _sync_state["running"], + } + + +@app.post("/api/check-cloud") +def check_cloud(): + result = engine.check_cloud_connection() + return result + + +@app.post("/api/sync") +def trigger_sync(background_tasks: BackgroundTasks): + if _sync_state["running"]: + raise HTTPException(status_code=409, detail="Sync already in progress") + + if not config.CLOUD_POSTGRES_DSN: + raise HTTPException(status_code=400, detail="CLOUD_POSTGRES_DSN not configured") + + _sync_state["running"] = True + _sync_state["rows_done"] = 0 + _sync_state["rows_total"] = 0 + _sync_state["last_error"] = None + + background_tasks.add_task(_run_sync) + return {"status": "started"} + + +@app.get("/api/sync-status") +def sync_status(): + return { + "running": _sync_state["running"], + "rows_done": _sync_state["rows_done"], + "rows_total": _sync_state["rows_total"], + "last_sync_iso": _sync_state["last_sync_iso"], + "last_sync_rows": _sync_state["last_sync_rows"], + "last_sync_elapsed": _sync_state["last_sync_elapsed"], + "last_error": _sync_state["last_error"], + } + + +def _progress_cb(rows_done: int, rows_total: int) -> None: + _sync_state["rows_done"] = rows_done + _sync_state["rows_total"] = rows_total + + +class SelectTablePayload(BaseModel): + table: str + + +class CreateTablePayload(BaseModel): + table: str + + +@app.get("/api/local-tables") +def list_local_tables(): + """List existing local tables (tables matching ^wfr[0-9] on the local DB).""" + tables = engine.list_local_tables() + return {"tables": tables, "current": engine.local_table} + + +@app.post("/api/select-local-table") +def select_local_table(payload: SelectTablePayload): + """Switch the active local source table for the next sync.""" + if _sync_state["running"]: + raise HTTPException(status_code=409, detail="Cannot change table while sync is running") + name = payload.table.lower().strip() + if not name: + raise HTTPException(status_code=400, detail="Table name is required") + engine.local_table = name + # Invalidate unsynced cache + _sync_state["_unsynced_ts"] = 0.0 + _sync_state["_unsynced_count"] = None + _sync_state["_cloud_cursor"] = None + return {"selected": name} + + +@app.get("/api/cloud-tables") +def list_cloud_tables(): + """List existing cloud tables (tables matching ^wfr[0-9] on the cloud DB).""" + tables = engine.list_cloud_tables() + return {"tables": tables, "current": engine.cloud_table} + + +@app.post("/api/cloud-tables") +def create_cloud_table(payload: CreateTablePayload): + """Create a new cloud hypertable.""" + name = payload.table.lower().strip() + if not name: + raise HTTPException(status_code=400, detail="Table name is required") + try: + engine.create_cloud_table(name) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + # Invalidate unsynced cache + _sync_state["_unsynced_ts"] = 0.0 + return {"created": name} + + +@app.post("/api/select-table") +def select_table(payload: SelectTablePayload): + """Switch the active cloud table for the next sync.""" + if _sync_state["running"]: + raise HTTPException(status_code=409, detail="Cannot change table while sync is running") + name = payload.table.lower().strip() + if not name: + raise HTTPException(status_code=400, detail="Table name is required") + engine.cloud_table = name + # Invalidate unsynced cache so next /api/status recalculates + _sync_state["_unsynced_ts"] = 0.0 + _sync_state["_unsynced_count"] = None + _sync_state["_cloud_cursor"] = None + return {"selected": name} + + +def _run_sync() -> None: + try: + result = engine.sync(progress_cb=_progress_cb) + _sync_state["last_sync_iso"] = datetime.now(timezone.utc).isoformat() + _sync_state["last_sync_rows"] = result["rows_synced"] + _sync_state["last_sync_elapsed"] = result["elapsed_s"] + _sync_state["last_error"] = None + # Invalidate unsynced cache + _sync_state["_unsynced_ts"] = 0.0 + except Exception as e: + logger.error(f"Sync failed: {e}") + _sync_state["last_error"] = str(e) + finally: + _sync_state["running"] = False diff --git a/universal-telemetry-software/cloud-sync/config.py b/universal-telemetry-software/cloud-sync/config.py new file mode 100644 index 00000000..33240ef5 --- /dev/null +++ b/universal-telemetry-software/cloud-sync/config.py @@ -0,0 +1,12 @@ +import os + +LOCAL_POSTGRES_DSN = os.getenv( + "LOCAL_POSTGRES_DSN", + "postgresql://wfr:wfr_password@timescaledb:5432/wfr", +) +LOCAL_TABLE = os.getenv("LOCAL_TABLE", "wfr26base").lower() + +CLOUD_POSTGRES_DSN = os.getenv("CLOUD_POSTGRES_DSN", "") +CLOUD_TABLE = os.getenv("CLOUD_TABLE", "wfr26").lower() + +SYNC_BATCH_SIZE = int(os.getenv("SYNC_BATCH_SIZE", "5000")) diff --git a/universal-telemetry-software/cloud-sync/requirements.txt b/universal-telemetry-software/cloud-sync/requirements.txt new file mode 100644 index 00000000..3eb9b775 --- /dev/null +++ b/universal-telemetry-software/cloud-sync/requirements.txt @@ -0,0 +1,3 @@ +fastapi==0.115.4 +uvicorn[standard]==0.32.1 +psycopg2-binary>=2.9.9 diff --git a/universal-telemetry-software/cloud-sync/static/index.html b/universal-telemetry-software/cloud-sync/static/index.html new file mode 100644 index 00000000..3ce169a0 --- /dev/null +++ b/universal-telemetry-software/cloud-sync/static/index.html @@ -0,0 +1,656 @@ + + + + + + WFR Cloud Sync + + + + +
+
+

WFR Cloud Sync

+
local → cloud
+
+
+ + + +
+
+
Local Rows
+
+
+
+
+
Cloud Status
+
+ Unknown +
+
+
+
+
Last Sync
+
Never
+
+
+
+ +
+ + +
+ + +
+ + + +
+ +
+ + +
+ +
+
Syncing…
+
+
+
+
+ +
+
Activity Log
+
+
No activity yet.
+
+
+ + + + diff --git a/universal-telemetry-software/cloud-sync/sync.py b/universal-telemetry-software/cloud-sync/sync.py new file mode 100644 index 00000000..f66ada5a --- /dev/null +++ b/universal-telemetry-software/cloud-sync/sync.py @@ -0,0 +1,333 @@ +""" +SyncEngine — copy rows from local TimescaleDB to cloud TimescaleDB. + +Sync cursor is stateless: SELECT MAX(time) FROM cloud_table. +Write pattern mirrors timescale_bridge.py and file-uploader/helper.py: + - ON CONFLICT (time, message_name) DO UPDATE (idempotent) + - ALTER TABLE ADD COLUMN IF NOT EXISTS for dynamic signal columns + - Named psycopg2 cursor for server-side iteration (avoids loading all rows into RAM) +""" + +import logging +import time +from datetime import datetime +from typing import Callable, Optional, Set + +import psycopg2 +import psycopg2.extras + +import config + +logger = logging.getLogger("SyncEngine") + +# Fixed columns that are always present — not treated as signal columns +_FIXED_COLS = {"time", "message_name", "can_id"} + + +class SyncEngine: + def __init__(self): + self.local_dsn = config.LOCAL_POSTGRES_DSN + self.local_table = config.LOCAL_TABLE + self.cloud_dsn = config.CLOUD_POSTGRES_DSN + self.cloud_table = config.CLOUD_TABLE + self.batch_size = config.SYNC_BATCH_SIZE + + # ── Helpers ────────────────────────────────────────────────────────────── + + def _local_conn(self): + return psycopg2.connect(self.local_dsn) + + def _cloud_conn(self): + if not self.cloud_dsn: + raise ValueError("CLOUD_POSTGRES_DSN is not configured") + return psycopg2.connect(self.cloud_dsn, connect_timeout=10) + + # ── Status queries ──────────────────────────────────────────────────────── + + def get_local_count(self) -> int: + with self._local_conn() as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT COUNT(*) FROM {self.local_table}") + row = cur.fetchone() + return row[0] if row else 0 + + def get_cloud_cursor(self) -> Optional[datetime]: + """Return MAX(time) from cloud table, or None if empty / table missing.""" + if not self.cloud_dsn: + return None + try: + with self._cloud_conn() as conn: + with conn.cursor() as cur: + cur.execute(f"SELECT MAX(time) FROM {self.cloud_table}") + row = cur.fetchone() + return row[0] if row else None + except psycopg2.errors.UndefinedTable: + return None + except Exception: + return None + + def get_unsynced_count(self, cursor: Optional[datetime]) -> int: + with self._local_conn() as conn: + with conn.cursor() as cur: + if cursor is None: + cur.execute(f"SELECT COUNT(*) FROM {self.local_table}") + else: + cur.execute( + f"SELECT COUNT(*) FROM {self.local_table} WHERE time > %s", + (cursor,), + ) + row = cur.fetchone() + return row[0] if row else 0 + + def list_local_tables(self) -> list: + """List existing tables on the local DB matching ^wfr[0-9].""" + try: + with self._local_conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT table_name + FROM information_schema.tables + WHERE table_schema = 'public' + AND table_type = 'BASE TABLE' + AND table_name ~ '^wfr[0-9]' + ORDER BY table_name DESC + """) + return [r[0] for r in cur.fetchall()] + except Exception as e: + logger.warning(f"list_local_tables failed: {e}") + return [] + + def list_cloud_tables(self) -> list: + """List existing tables on the cloud DB matching ^wfr[0-9]. No DBC needed.""" + if not self.cloud_dsn: + return [] + try: + with self._cloud_conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT table_name + FROM information_schema.tables + WHERE table_schema = 'public' + AND table_type = 'BASE TABLE' + AND table_name ~ '^wfr[0-9]' + ORDER BY table_name DESC + """) + return [r[0] for r in cur.fetchall()] + except Exception as e: + logger.warning(f"list_cloud_tables failed: {e}") + return [] + + def create_cloud_table(self, table_name: str) -> None: + """Create a new cloud hypertable with the given name.""" + table_name = table_name.lower().strip() + if not table_name: + raise ValueError("Table name cannot be empty") + conn = self._cloud_conn() + try: + # Temporarily use a dedicated engine instance to avoid mutating self + tmp = SyncEngine() + tmp.cloud_table = table_name + tmp.ensure_cloud_table(conn) + finally: + conn.close() + + def check_cloud_connection(self) -> dict: + if not self.cloud_dsn: + return {"ok": False, "detail": "CLOUD_POSTGRES_DSN not configured", "latency_ms": None} + t0 = time.monotonic() + try: + conn = self._cloud_conn() + with conn.cursor() as cur: + cur.execute("SELECT 1") + conn.close() + latency_ms = round((time.monotonic() - t0) * 1000) + return {"ok": True, "detail": "Connected", "latency_ms": latency_ms} + except Exception as e: + latency_ms = round((time.monotonic() - t0) * 1000) + return {"ok": False, "detail": str(e), "latency_ms": latency_ms} + + # ── Cloud table setup ───────────────────────────────────────────────────── + + def ensure_cloud_table(self, cloud_conn) -> None: + """Create cloud hypertable if it doesn't exist. Mirrors CANTimescaleStreamer.ensure_season_table().""" + with cloud_conn.cursor() as cur: + cur.execute(f""" + CREATE TABLE IF NOT EXISTS {self.cloud_table} ( + time TIMESTAMPTZ NOT NULL, + message_name TEXT, + can_id INTEGER + ) + """) + cur.execute(""" + SELECT create_hypertable(%s, 'time', + chunk_time_interval => INTERVAL '1 day', + if_not_exists => TRUE) + """, (self.cloud_table,)) + try: + cur.execute(f""" + ALTER TABLE {self.cloud_table} SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'message_name', + timescaledb.compress_orderby = 'time DESC' + ) + """) + except Exception as exc: + logger.debug("Skipping compression setup for '%s': %s", self.cloud_table, exc) + try: + cur.execute(""" + SELECT add_compression_policy(%s, INTERVAL '2 days', if_not_exists => TRUE) + """, (self.cloud_table,)) + except Exception as exc: + logger.debug("Skipping compression policy setup for '%s': %s", self.cloud_table, exc) + cur.execute( + f"CREATE INDEX IF NOT EXISTS {self.cloud_table}_time_idx " + f"ON {self.cloud_table} (time DESC)" + ) + cur.execute( + f"CREATE UNIQUE INDEX IF NOT EXISTS {self.cloud_table}_dedup_idx " + f"ON {self.cloud_table} (time, message_name)" + ) + cloud_conn.commit() + logger.info(f"Cloud table '{self.cloud_table}' ready") + + # ── Dynamic column management ───────────────────────────────────────────── + + def _ensure_cloud_signal_columns( + self, + cloud_conn, + signal_names: Set[str], + known: Set[str], + ) -> None: + """Add missing signal columns to the cloud table. Mirrors timescale_bridge._ensure_signal_columns().""" + new_signals = signal_names - known + if not new_signals: + return + with cloud_conn.cursor() as cur: + for sig in sorted(new_signals): + cur.execute( + f'ALTER TABLE {self.cloud_table} ' + f'ADD COLUMN IF NOT EXISTS "{sig}" DOUBLE PRECISION' + ) + cloud_conn.commit() + known.update(new_signals) + + # ── Main sync ───────────────────────────────────────────────────────────── + + def sync(self, progress_cb: Optional[Callable[[int, int], None]] = None) -> dict: + """ + Copy all rows from local newer than cloud MAX(time) to the cloud table. + + progress_cb(rows_done, rows_total) is called after each batch. + Returns {"rows_synced": int, "batches": int, "elapsed_s": float}. + """ + t0 = time.monotonic() + rows_synced = 0 + batches = 0 + + local_conn = self._local_conn() + cloud_conn = self._cloud_conn() + known_signals: Set[str] = set() + + try: + self.ensure_cloud_table(cloud_conn) + + # Determine sync cursor + with cloud_conn.cursor() as cur: + cur.execute(f"SELECT MAX(time) FROM {self.cloud_table}") + cursor_row = cur.fetchone() + cursor = cursor_row[0] if cursor_row else None + + logger.info(f"Sync cursor: {cursor} (None = full sync)") + + # Count total rows to sync (for progress reporting) + with local_conn.cursor() as count_cur: + if cursor is None: + count_cur.execute(f"SELECT COUNT(*) FROM {self.local_table}") + else: + count_cur.execute( + f"SELECT COUNT(*) FROM {self.local_table} WHERE time > %s", + (cursor,), + ) + total_rows = count_cur.fetchone()[0] + + logger.info(f"Rows to sync: {total_rows}") + + if total_rows == 0: + return {"rows_synced": 0, "batches": 0, "elapsed_s": 0.0} + + # Use a named (server-side) cursor to avoid loading all rows into RAM. + # description is not populated until after the first fetchmany() on named cursors. + with local_conn.cursor("_cloud_sync") as read_cur: + if cursor is None: + read_cur.execute( + f"SELECT * FROM {self.local_table} ORDER BY time" + ) + else: + read_cur.execute( + f"SELECT * FROM {self.local_table} WHERE time > %s ORDER BY time", + (cursor,), + ) + + col_names = None + signal_cols = None + + rows = read_cur.fetchmany(self.batch_size) + while rows: + # description is available after the first fetch on named cursors + if col_names is None: + col_names = [d.name for d in read_cur.description] + signal_cols = [c for c in col_names if c not in _FIXED_COLS] + # Ensure all local signal columns exist in cloud upfront — + # batches may have nulls for columns that exist in the schema. + self._ensure_cloud_signal_columns(cloud_conn, set(signal_cols), known_signals) + + # Build INSERT for the fixed + all signal columns + all_cols = ["time", "message_name", "can_id"] + signal_cols + col_sql = ", ".join(f'"{c}"' for c in all_cols) + + if signal_cols: + update_sql = ", ".join(f'"{s}" = EXCLUDED."{s}"' for s in signal_cols) + update_sql += ', "can_id" = EXCLUDED."can_id"' + else: + update_sql = '"can_id" = EXCLUDED."can_id"' + + insert_sql = ( + f'INSERT INTO {self.cloud_table} ({col_sql}) VALUES %s ' + f'ON CONFLICT (time, message_name) DO UPDATE SET {update_sql}' + ) + + # Build value tuples + values = [] + for row in rows: + row_dict = dict(zip(col_names, row)) + tup = tuple(row_dict.get(c) for c in all_cols) + values.append(tup) + + with cloud_conn.cursor() as write_cur: + psycopg2.extras.execute_values( + write_cur, insert_sql, values, page_size=self.batch_size + ) + cloud_conn.commit() + + rows_synced += len(rows) + batches += 1 + + if progress_cb: + progress_cb(rows_synced, total_rows) + + logger.info(f"Batch {batches}: {rows_synced}/{total_rows} rows synced") + rows = read_cur.fetchmany(self.batch_size) + + finally: + try: + local_conn.close() + except Exception: + logger.debug("Ignoring error while closing local DB connection", exc_info=True) + try: + cloud_conn.close() + except Exception: + logger.debug("Ignoring error while closing cloud DB connection", exc_info=True) + + elapsed = time.monotonic() - t0 + logger.info(f"Sync complete: {rows_synced} rows in {elapsed:.1f}s ({batches} batches)") + return {"rows_synced": rows_synced, "batches": batches, "elapsed_s": round(elapsed, 2)} diff --git a/universal-telemetry-software/deploy/.env.macbook b/universal-telemetry-software/deploy/.env.macbook index 72b933a9..d1b9d436 100644 --- a/universal-telemetry-software/deploy/.env.macbook +++ b/universal-telemetry-software/deploy/.env.macbook @@ -2,6 +2,14 @@ # Usage: docker compose -f deploy/docker-compose.macbook-base.yml --env-file deploy/.env.macbook up -d REMOTE_IP=10.71.1.10 -TIMESCALE_TABLE=WFR26test -DBC_HOST_PATH=./example.dbc +TIMESCALE_TABLE=wfr26base +DBC_HOST_PATH=../../secret-dbc/WFR25.dbc +DBC_DISPLAY_NAME=WFR25.dbc GRAFANA_ADMIN_PASSWORD=admin + +# Cloud Sync dashboard — http://localhost:8092 +# Fill in your VPS DSN to enable syncing to the cloud TimescaleDB. +# Example: postgresql://wfr:password@your-vps-ip:5432/wfr +CLOUD_POSTGRES_DSN=postgresql://wfr:wfr_password@100.72.11.60:5432/wfr +CLOUD_TABLE=wfr26 +LOCAL_TABLE=wfr26base diff --git a/universal-telemetry-software/deploy/docker-compose.macbook-base.yml b/universal-telemetry-software/deploy/docker-compose.macbook-base.yml index 39c8b931..640ad865 100644 --- a/universal-telemetry-software/deploy/docker-compose.macbook-base.yml +++ b/universal-telemetry-software/deploy/docker-compose.macbook-base.yml @@ -1,17 +1,25 @@ # MacBook base station — full local stack with TimescaleDB persistence # Use this for development and testing with full telemetry recording. -# Run from universal-telemetry-software/: -# docker compose -f deploy/docker-compose.macbook-base.yml up -d --build +# Run from repo root (data-acquisition/): +# docker compose -f universal-telemetry-software/deploy/docker-compose.macbook-base.yml --env-file universal-telemetry-software/deploy/.env.macbook up -d --build # # Access points: # Pecan dashboard: http://localhost:3000 +# Status page: http://localhost:8080 # Grafana: http://localhost:8087 # TimescaleDB: postgresql://wfr:wfr_password@localhost:5432/wfr +# Cloud Sync: http://localhost:8092 networks: datalink: driver: bridge +volumes: + redis_data: + timescaledb_data: + raw_can_data: + grafana_storage: + services: telemetry: image: ghcr.io/western-formula-racing/daq-radio/universal-telemetry:latest @@ -35,10 +43,11 @@ services: - ENABLE_VIDEO=false - ENABLE_AUDIO=false - SET_TIME_ENABLED=false - - ENABLE_TIMESCALE_LOGGING=true + - ENABLE_TIMESCALE_LOGGING=${ENABLE_TIMESCALE_LOGGING:-false} - POSTGRES_DSN=postgresql://wfr:wfr_password@timescaledb:5432/wfr - - TIMESCALE_TABLE=${TIMESCALE_TABLE:-WFR26test}_base + - TIMESCALE_TABLE=${TIMESCALE_TABLE:-wfr26base} - DBC_FILE_PATH=/app/active.dbc + - DBC_DISPLAY_NAME=${DBC_DISPLAY_NAME:-example.dbc} volumes: - ${DBC_HOST_PATH:-./example.dbc}:/app/active.dbc:ro - raw_can_data:/app/raw_can_logs @@ -91,7 +100,7 @@ services: networks: - datalink - # ── TimescaleDB (local, for received telemetry hot-load) ───────────────── +# ── TimescaleDB (local, for received telemetry hot-load) ───────────────── timescaledb: image: timescale/timescaledb:latest-pg16 container_name: daq-timescaledb @@ -105,6 +114,8 @@ services: volumes: - timescaledb_data:/var/lib/postgresql/data - ./timescaledb_init_macbook.sql:/docker-entrypoint-initdb.d/001_init.sql:ro + - ./pg_hba.conf:/etc/postgresql/pg_hba.conf:ro + command: postgres -c hba_file=/etc/postgresql/pg_hba.conf networks: - datalink healthcheck: @@ -118,7 +129,55 @@ services: limits: memory: 8G -volumes: - redis_data: - timescaledb_data: - raw_can_data: + cloud-sync: + build: ../cloud-sync + container_name: daq-cloud-sync + restart: unless-stopped + ports: + - "8092:8092" + environment: + - LOCAL_POSTGRES_DSN=postgresql://wfr:wfr_password@timescaledb:5432/wfr + - LOCAL_TABLE=${LOCAL_TABLE:-${TIMESCALE_TABLE:-wfr26base}} + - CLOUD_POSTGRES_DSN=${CLOUD_POSTGRES_DSN:-} + - CLOUD_TABLE=${CLOUD_TABLE:-wfr26} + - SYNC_BATCH_SIZE=${SYNC_BATCH_SIZE:-5000} + depends_on: + timescaledb: + condition: service_healthy + networks: + - datalink + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" + + grafana: + image: grafana/grafana + container_name: daq-grafana + restart: unless-stopped + ports: + - "8087:3000" + environment: + GF_SECURITY_ADMIN_USER: admin + GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_ADMIN_PASSWORD:-admin} + GF_AUTH_ANONYMOUS_ENABLED: "true" + GF_AUTH_ANONYMOUS_ORG_ROLE: Viewer + GF_AUTH_ANONYMOUS_ALLOW_SIGN_UP: "false" + GF_INSTALL_PLUGINS: grafana-clock-panel + GF_PATHS_PROVISIONING: /etc/grafana/provisioning + GF_DASHBOARDS_MIN_REFRESH_INTERVAL: 100ms + POSTGRES_USER: wfr + POSTGRES_PASSWORD: wfr_password + volumes: + - grafana_storage:/var/lib/grafana + - ../../server/installer/grafana/provisioning:/etc/grafana/provisioning:ro + - ../../server/installer/grafana/dashboards:/var/lib/grafana/dashboards:ro + networks: + - datalink + depends_on: + timescaledb: + condition: service_healthy + +# Run from repo root (data-acquisition/): +# docker compose -f universal-telemetry-software/deploy/docker-compose.macbook-base.yml --env-file universal-telemetry-software/deploy/.env.macbook up -d diff --git a/universal-telemetry-software/deploy/offline/.gitignore b/universal-telemetry-software/deploy/offline/.gitignore new file mode 100644 index 00000000..74a4c43d --- /dev/null +++ b/universal-telemetry-software/deploy/offline/.gitignore @@ -0,0 +1,2 @@ +*.tar +*.tar.gz diff --git a/universal-telemetry-software/deploy/offline/README.md b/universal-telemetry-software/deploy/offline/README.md new file mode 100644 index 00000000..7f5e3d43 --- /dev/null +++ b/universal-telemetry-software/deploy/offline/README.md @@ -0,0 +1,38 @@ +# Offline Docker Images + +This folder contains a tarball of all Docker images needed to run the MacBook stack without internet. + +## Build the tarball (with internet, before going to the field) + +```bash +cd universal-telemetry-software/deploy +docker save \ + ghcr.io/western-formula-racing/daq-radio/universal-telemetry:latest \ + ghcr.io/western-formula-racing/daq-radio/pecan:latest \ + timescale/timescaledb:latest-pg16 \ + redis:8.2 \ + bluenviron/mediamtx:latest \ + deploy-cloud-sync:latest \ + grafana/grafana:latest \ + -o offline/wfr-docker-images.tar +``` + +## Load the tarball (on site, no internet) + +```bash +cd universal-telemetry-software/deploy +docker load -i offline/wfr-docker-images.tar +``` + +## Bring up the stack offline + +```bash +cd universal-telemetry-software/deploy +docker compose -f docker-compose.macbook-base.yml --env-file .env.macbook up -d +``` + +Then access at: +- Pecan: http://localhost:3000 +- Grafana: http://localhost:8087 (admin / admin) +- Status: http://localhost:8080 +- Cloud Sync: http://localhost:8092 diff --git a/universal-telemetry-software/deploy/pg_hba.conf b/universal-telemetry-software/deploy/pg_hba.conf new file mode 100644 index 00000000..7bf5b614 --- /dev/null +++ b/universal-telemetry-software/deploy/pg_hba.conf @@ -0,0 +1,10 @@ +# PostgreSQL Client Authentication Configuration +# local dev — all external connections use md5 (required for DBeaver TimescaleDB driver) + +local all all trust +host all all 127.0.0.1/32 trust +host all all ::1/128 trust +local replication all trust +host replication all 127.0.0.1/32 trust +host replication all ::1/128 trust +host all all all md5 diff --git a/universal-telemetry-software/deploy/timescaledb_init_macbook.sql b/universal-telemetry-software/deploy/timescaledb_init_macbook.sql index d5a11e08..b03e2d31 100644 --- a/universal-telemetry-software/deploy/timescaledb_init_macbook.sql +++ b/universal-telemetry-software/deploy/timescaledb_init_macbook.sql @@ -58,7 +58,7 @@ $$; -- ───────────────────────────────────────────────────────────── -- Season table — WFR26test (no preloaded data) -- ───────────────────────────────────────────────────────────── -SELECT create_season_table('WFR26test'); +SELECT create_season_table('wfr26base'); -- ───────────────────────────────────────────────────────────── -- Monitoring table diff --git a/universal-telemetry-software/src/data.py b/universal-telemetry-software/src/data.py index 18255918..f35cfcca 100644 --- a/universal-telemetry-software/src/data.py +++ b/universal-telemetry-software/src/data.py @@ -636,7 +636,7 @@ async def stats_publisher(): "ecu_synced": self._ecu_synced, "ecu_sync_source": self._sync_source, "timescale": timescale_status, - "dbc_file": os.getenv("DBC_FILE_PATH", "unknown"), + "dbc_file": os.getenv("DBC_DISPLAY_NAME") or os.path.basename(os.getenv("DBC_FILE_PATH", "unknown")), "car_time_synced": self._car_time_synced, "base_clock_bad": self._base_clock_bad, "last_udp_time": self.last_udp_time, diff --git a/universal-telemetry-software/status/index.html b/universal-telemetry-software/status/index.html index f29ad353..48983c32 100644 --- a/universal-telemetry-software/status/index.html +++ b/universal-telemetry-software/status/index.html @@ -539,6 +539,10 @@

[DAQ_BASE_STATION]

Write Errors -- +
+ DBC File + -- +
@@ -729,7 +733,13 @@

[DAQ_BASE_STATION]

if (stats.dbc_file) { const dbcWarning = document.getElementById('dbcWarningBanner'); - if (stats.dbc_file.includes('example.dbc')) { + const dbcFileName = document.getElementById('dbcFileName'); + const isExample = stats.dbc_file.includes('example.dbc'); + // Show just the filename, not the full container path + const shortName = stats.dbc_file.split('/').pop(); + dbcFileName.textContent = shortName; + dbcFileName.style.color = isExample ? '#f85149' : '#3fb950'; + if (isExample) { dbcWarning.style.display = 'block'; } else { dbcWarning.style.display = 'none';