|
| 1 | +# Periodic materialization — Cloud Run Job + Cloud Scheduler |
| 2 | + |
| 3 | +Run `bqaa-materialize-window` on a cron, against your own |
| 4 | +BigQuery project, with one local command and one deploy command. |
| 5 | + |
| 6 | +The MAKO demo (`examples/migration_v5/`) ships the ontology, |
| 7 | +binding, and entity-table DDL. This directory wraps them in a |
| 8 | +hands-off scheduled deployment: a Cloud Run Job that fires every |
| 9 | +N hours via Cloud Scheduler, materializes the last N hours of |
| 10 | +events into your graph dataset, and emits a structured JSON |
| 11 | +report to Cloud Logging. |
| 12 | + |
| 13 | +## Production shape |
| 14 | + |
| 15 | +``` |
| 16 | +agent_events bqaa-materialize-window graph entity/ |
| 17 | +(your events DS) ────► (Cloud Run Job, every N hrs) ──► relationship tables |
| 18 | + │ (your graph DS) |
| 19 | + ▼ |
| 20 | + _bqaa_materialization_state |
| 21 | + (checkpoint / state table, |
| 22 | + co-located with the graph DS) |
| 23 | +``` |
| 24 | + |
| 25 | +Per run, the orchestrator: |
| 26 | + |
| 27 | +1. Reads the prior checkpoint from `_bqaa_materialization_state`. |
| 28 | +2. Scans events in `[checkpoint - overlap_minutes, run_started_at)`, |
| 29 | + capped at `lookback_hours` worth of history. |
| 30 | +3. Discovers terminal-event sessions (`event_type = |
| 31 | + 'AGENT_COMPLETED'`) and materializes them one at a time. |
| 32 | +4. Advances the checkpoint to the latest successful session's |
| 33 | + completion timestamp (never past a failure — partial failure |
| 34 | + leaves a tight high-water mark for the next run). |
| 35 | +5. Writes the JSON report to stdout for Cloud Logging. |
| 36 | + |
| 37 | +State-table semantics, overlap-windowed late-arrival handling, |
| 38 | +and idempotent retries are all in the SDK's design contract — see |
| 39 | +`src/bigquery_agent_analytics/materialize_window.py` for the |
| 40 | +full prose. |
| 41 | + |
| 42 | +## Prerequisites |
| 43 | + |
| 44 | +* GCP project with the BigQuery, Cloud Run, Cloud Scheduler, and |
| 45 | + Cloud Build APIs enabled. |
| 46 | +* **Events dataset** (`BQAA_EVENTS_DATASET_ID`) already exists |
| 47 | + with a populated `agent_events` table. The BQ AA plugin writes |
| 48 | + to this; if you've never run an agent against BQAA, seed one |
| 49 | + for this demo via `python examples/migration_v5/run_agent.py |
| 50 | + --project YOUR_PROJECT --dataset YOUR_EVENTS_DS --sessions 3`. |
| 51 | + This dataset is **read-only** for the periodic job — the |
| 52 | + job never writes here. |
| 53 | +* **Graph dataset** (`BQAA_GRAPH_DATASET_ID`) — `run_job.py` |
| 54 | + creates this on first invocation if missing (idempotent), so |
| 55 | + you don't have to pre-create it. The entity/relationship |
| 56 | + tables and the state/checkpoint table all live here. |
| 57 | +* `gcloud` authenticated with permissions to deploy Cloud Run |
| 58 | + Jobs, create scheduler triggers, and grant IAM bindings. |
| 59 | + |
| 60 | +## Local dry-run |
| 61 | + |
| 62 | +Run the job once on your laptop against a real BigQuery project — |
| 63 | +no Cloud Run required. Useful for shaking out the env-var setup |
| 64 | +before paying for a deploy: |
| 65 | + |
| 66 | +```bash |
| 67 | +# From the repo root, install the SDK in editable mode. The |
| 68 | +# example uses bigquery_agent_analytics.materialize_window |
| 69 | +# (added in PR #162); this isn't in the 0.3.0 PyPI release |
| 70 | +# yet, so install from local until 0.4.0 ships. |
| 71 | +pip install -e . |
| 72 | + |
| 73 | +# Then install the example's ancillary deps: |
| 74 | +pip install -r examples/migration_v5/periodic_materialization/requirements.txt |
| 75 | + |
| 76 | +BQAA_PROJECT_ID=your-project \ |
| 77 | +BQAA_EVENTS_DATASET_ID=your_events_dataset \ |
| 78 | +BQAA_GRAPH_DATASET_ID=your_graph_dataset \ |
| 79 | +BQAA_LOOKBACK_HOURS=6 \ |
| 80 | +BQAA_OVERLAP_MINUTES=15 \ |
| 81 | +python examples/migration_v5/periodic_materialization/run_job.py |
| 82 | +``` |
| 83 | + |
| 84 | +Output is a single JSON line on stdout (the materialize-window |
| 85 | +report) — pipe through `jq` for readability: |
| 86 | + |
| 87 | +```bash |
| 88 | +... python run_job.py | jq . |
| 89 | +``` |
| 90 | + |
| 91 | +Exit codes mirror the SDK CLI: |
| 92 | + |
| 93 | +* `0` — every discovered session materialized cleanly. |
| 94 | +* `1` — expected failure: at least one session failed, or |
| 95 | + binding-validate detected schema drift against live BigQuery. |
| 96 | +* `2` — unexpected internal error (config missing, code bug). |
| 97 | + |
| 98 | +## Deploy to Cloud Run + Cloud Scheduler |
| 99 | + |
| 100 | +One command: |
| 101 | + |
| 102 | +```bash |
| 103 | +./examples/migration_v5/periodic_materialization/deploy_cloud_run_job.sh \ |
| 104 | + --project your-project \ |
| 105 | + --region us-central1 \ |
| 106 | + --events-dataset your_events_dataset \ |
| 107 | + --graph-dataset your_graph_dataset \ |
| 108 | + --schedule "0 */6 * * *" \ |
| 109 | + --smoke |
| 110 | +``` |
| 111 | + |
| 112 | +`--smoke` (optional) runs the job once after deploy and tails |
| 113 | +the logs, so you find out *now* whether the deploy actually |
| 114 | +works — not when the first scheduled fire happens six hours |
| 115 | +later. |
| 116 | + |
| 117 | +The script: |
| 118 | + |
| 119 | +1. **Pre-creates the graph dataset** (`bq mk`, idempotent) so |
| 120 | + the runtime SA never needs `bigquery.datasets.create`. |
| 121 | +2. **Creates a service account** (`bqaa-periodic-sa@…`) if |
| 122 | + absent. This SA serves two roles: **runtime identity** for |
| 123 | + the Cloud Run Job (does the BigQuery work) and **scheduler |
| 124 | + caller** for the Cloud Scheduler HTTP trigger. For |
| 125 | + production, splitting these into separate SAs is reasonable |
| 126 | + hardening; the script's structure makes that a small edit. |
| 127 | +3. **Grants narrow IAM** to the SA: |
| 128 | + * Project-level `roles/bigquery.jobUser` — |
| 129 | + `bigquery.jobs.create` only. |
| 130 | + * Dataset-level `roles/bigquery.dataViewer` on |
| 131 | + **events** — read-only access. The events dataset stays |
| 132 | + effectively read-only per the contract above. |
| 133 | + * Dataset-level `roles/bigquery.dataEditor` on |
| 134 | + **graph** — read + write on entity tables, state table, |
| 135 | + DDL bootstrap. |
| 136 | +4. **Bundles the deploy** into a self-contained staging dir: |
| 137 | + `run_job.py`, demo artifacts, **the local SDK source** |
| 138 | + under `sdk_src/`. The deploy-time `requirements.txt` |
| 139 | + installs the SDK from `./sdk_src` (not PyPI) so the |
| 140 | + deployed image uses the same code as the local dry-run. |
| 141 | + This avoids depending on a PyPI release that may not yet |
| 142 | + contain `materialize_window` (added in PR #162). |
| 143 | +5. **Deploys the Cloud Run Job** via `gcloud run jobs deploy |
| 144 | + --source <staging>` (Buildpacks autodetects Python) with |
| 145 | + `--service-account` pointing at the SA. The job's runtime |
| 146 | + identity is the SA, **not** the Compute Engine default |
| 147 | + service account — important, since the default SA may lack |
| 148 | + the dataset-level perms above. |
| 149 | +6. **Grants `roles/run.invoker`** on the job to the same SA |
| 150 | + (the scheduler-caller side of the cross-product). |
| 151 | +7. **Creates / updates a Cloud Scheduler HTTP job** that POSTs |
| 152 | + to the Cloud Run Jobs `:run` endpoint with the SA's OAuth |
| 153 | + identity. |
| 154 | + |
| 155 | +## Inspecting results |
| 156 | + |
| 157 | +**The JSON report (Cloud Logging).** Every run emits a |
| 158 | +single-line JSON to stdout, picked up by Cloud Logging as a |
| 159 | +structured entry. Filter on `resource.labels.job_name=<job>`: |
| 160 | + |
| 161 | +```bash |
| 162 | +gcloud logging read \ |
| 163 | + "resource.type=cloud_run_job AND \ |
| 164 | + resource.labels.job_name=bqaa-periodic-materialization AND \ |
| 165 | + jsonPayload.message=\"materialization complete\"" \ |
| 166 | + --project your-project \ |
| 167 | + --limit 5 \ |
| 168 | + --format='value(jsonPayload)' |
| 169 | +``` |
| 170 | + |
| 171 | +Each entry includes: |
| 172 | + |
| 173 | +* `run_id`, `state_key`, `window_start`, `window_end`. |
| 174 | +* `sessions_discovered` / `sessions_materialized` / |
| 175 | + `sessions_failed`. |
| 176 | +* `rows_materialized` — per-entity row counts. |
| 177 | +* `table_statuses` — per-table cleanup/insert status. A |
| 178 | + `cleanup_status = "delete_failed"` entry means the BQ |
| 179 | + streaming buffer pinned a table within the ~90-min window — |
| 180 | + expected, not a code error. |
| 181 | +* `compiled_outcomes` — C2 (compiled-extractor) telemetry. |
| 182 | +* `failures` — list of failed sessions with error codes. |
| 183 | +* `ok` — overall success boolean. |
| 184 | + |
| 185 | +**The state table.** Co-located with the graph dataset (NOT |
| 186 | +the events dataset — the events dataset stays read-only per |
| 187 | +the contract above). A real BQ table at |
| 188 | +`<project>.<graph_dataset>._bqaa_materialization_state`, one |
| 189 | +append-only row per run. `run_job.py` passes |
| 190 | +`state_table="{project}.{graph_dataset}._bqaa_materialization_state"` |
| 191 | +explicitly to the orchestrator so the default-dataset fallback |
| 192 | +can never point it at the events dataset. Query it for the |
| 193 | +audit log: |
| 194 | + |
| 195 | +```sql |
| 196 | +SELECT |
| 197 | + run_started_at, |
| 198 | + scan_start, |
| 199 | + scan_end, |
| 200 | + last_completion_at AS checkpoint, |
| 201 | + sessions_discovered, |
| 202 | + sessions_materialized, |
| 203 | + sessions_failed, |
| 204 | + ok, |
| 205 | + error_detail |
| 206 | +FROM `your-project.your_graph_dataset._bqaa_materialization_state` |
| 207 | +ORDER BY run_started_at DESC |
| 208 | +LIMIT 20; |
| 209 | +``` |
| 210 | + |
| 211 | +The `state_key` column (sha256 of the config) lets you separate |
| 212 | +runs from different ontology/binding/predicate combinations — a |
| 213 | +predicate switch (e.g. swapping `--completion-event-type`) shows |
| 214 | +up as a new key with a fresh bootstrap, not an inherited |
| 215 | +checkpoint. |
| 216 | + |
| 217 | +## Configuration reference |
| 218 | + |
| 219 | +All configuration goes through env vars on the Cloud Run Job. |
| 220 | +The deploy script wires them via `--set-env-vars`; for local |
| 221 | +dry-run, set them in your shell. |
| 222 | + |
| 223 | +| Env var | Required | Default | Notes | |
| 224 | +|----------------------------|----------|---------|-------| |
| 225 | +| `BQAA_PROJECT_ID` | yes | — | GCP project. | |
| 226 | +| `BQAA_EVENTS_DATASET_ID` | yes | — | Dataset with `agent_events`. | |
| 227 | +| `BQAA_GRAPH_DATASET_ID` | yes | — | Target dataset for entity/relationship tables + the state table. | |
| 228 | +| `BQAA_LOCATION` | no | `US` | BigQuery location. Must match both datasets. | |
| 229 | +| `BQAA_LOOKBACK_HOURS` | no | `6` | Max history scanned per run. Hard upper bound on scan window. | |
| 230 | +| `BQAA_OVERLAP_MINUTES` | no | `15` | Re-scan window for late-arriving events. Bump (e.g. `60`) if ingestion can lag tens of minutes. | |
| 231 | +| `BQAA_MAX_SESSIONS` | no | unlimited | Per-run cost guardrail. | |
| 232 | + |
| 233 | +## Operational notes |
| 234 | + |
| 235 | +**State-table behavior.** Append-only; never truncate it. Each |
| 236 | +run inserts one row. The next run reads |
| 237 | +`MAX(last_completion_at) WHERE state_key = <current_config>` as |
| 238 | +its starting point. A heartbeat row (empty window) carries |
| 239 | +forward the prior checkpoint so the most recent row is self- |
| 240 | +documenting. |
| 241 | + |
| 242 | +**Overlap-windowed re-claim.** `BQAA_OVERLAP_MINUTES` re-scans |
| 243 | +events slightly older than the prior checkpoint. Default 15 min |
| 244 | +is fine for low-latency ingestion; bump higher for slower |
| 245 | +sources. The materializer is idempotent per session (delete-then- |
| 246 | +insert keyed on `session_id`), so re-scanning is safe. |
| 247 | + |
| 248 | +**Partial failures.** If session 3 of 5 raises during |
| 249 | +extraction, the orchestrator stops, advances the checkpoint to |
| 250 | +session 2's completion timestamp, writes a state row with |
| 251 | +`ok=False`, and exits non-zero. The next scheduled run picks up |
| 252 | +from session 2's timestamp and retries session 3 (idempotent |
| 253 | +because session-level delete-then-insert). |
| 254 | + |
| 255 | +**Streaming-buffer pinning.** When inserts land in the streaming |
| 256 | +buffer (default for `insert_rows_json`), BQ pins those rows for |
| 257 | +~30-90 min during which DML `DELETE` returns an error. The |
| 258 | +materializer surfaces this as `cleanup_status = "delete_failed"` |
| 259 | +in `table_statuses` — operator-visible, not silent. The session- |
| 260 | +level delete-then-insert pattern degrades gracefully: if delete |
| 261 | +failed, the insert still happens, producing duplicates that the |
| 262 | +*next* successful delete cleans up. |
| 263 | + |
| 264 | +**Idempotent retries.** Cloud Run Job retry policy: this script |
| 265 | +sets `--max-retries 1`. If a transient BQ error fails a run, |
| 266 | +Cloud Run retries once; the orchestrator's checkpoint plus |
| 267 | +session-level idempotency ensure no double-counting. For |
| 268 | +sustained failure (e.g., binding drift), the second retry will |
| 269 | +also fail and the scheduled fire will be reported as failed in |
| 270 | +Cloud Monitoring. Set up an alert on |
| 271 | +`logging.googleapis.com/log_entry_count` with severity `ERROR`. |
| 272 | + |
| 273 | +## Not in scope here |
| 274 | + |
| 275 | +* **Terraform / Pulumi.** A scripted deploy is easier to read |
| 276 | + and easier to copy than IaC. IaC can come once the command |
| 277 | + shape stabilizes. |
| 278 | +* **Compiled-bundle materialization.** This example uses the |
| 279 | + plain `from_ontology_binding` extraction path (Gemini-backed). |
| 280 | + For compiled extractors (`--bundles-root`), see |
| 281 | + `docs/extractor_compilation/` and PR #152. |
| 282 | +* **Backfill mode.** A separate `--backfill --from / --to` |
| 283 | + CLI mode is on the roadmap (per #161); for now, run the |
| 284 | + CLI manually with a wider `--lookback-hours` to catch up. |
| 285 | + |
| 286 | +## Troubleshooting |
| 287 | + |
| 288 | +**`required env var BQAA_PROJECT_ID is not set`** — the local |
| 289 | +dry-run path. Set the three required env vars in your shell. |
| 290 | + |
| 291 | +**`binding-validate failed before extraction`** — the schema |
| 292 | +drift contract from #161. Your binding references columns that |
| 293 | +don't exist in the live tables. Either fix the binding, fix the |
| 294 | +tables, or pass `--no-validate-binding` to bypass (not |
| 295 | +recommended in production). |
| 296 | + |
| 297 | +**`Permission denied: bigquery.datasets.create`** — the runtime |
| 298 | +SA lacks dataset-create permission. The deploy script grants |
| 299 | +project-level `roles/bigquery.user` which includes this; if you |
| 300 | +swapped in a custom SA, grant it manually or pre-create the |
| 301 | +graph dataset (`bq mk --location=$LOCATION |
| 302 | +$PROJECT:$GRAPH_DATASET`) and grant the SA dataEditor on it. |
| 303 | + |
| 304 | +**`insert_failed` across every table on the first run** — the |
| 305 | +entity tables don't exist yet. The wrapper bootstraps them via |
| 306 | +`CREATE TABLE IF NOT EXISTS`, but if the runtime SA lacks |
| 307 | +`bigquery.tables.create`, the bootstrap silently no-ops and |
| 308 | +inserts fail. The deploy script grants `roles/bigquery.user` + |
| 309 | +`roles/bigquery.dataEditor` to cover this. |
| 310 | + |
| 311 | +**Scheduler fires but the job doesn't run** — IAM. Confirm the |
| 312 | +scheduler's service account (`bqaa-periodic-sa@…`) has |
| 313 | +`roles/run.invoker` on the job. The deploy script grants this; |
| 314 | +if you renamed the SA or job, regrant manually. |
0 commit comments