Status: Implemented (PR C2.c.3 of issue #75 Phase C / Milestone C2)
Parent epic: issue #75
Builds on: extractor_compilation_bundle_loader.md (PR C2.a), extractor_compilation_orchestrator_swap.md (PR C2.c.2)
Working plan: issue #96, Milestone C2 / PR C2.c.3
Compiled bundles live on the filesystem and are loaded by load_bundle / discover_bundles (C2.a). This module adds a publish/sync utility so bundles can flow between processes via a BigQuery table — useful for Cloud Run, Cloud Functions, ephemeral CI workers, or any environment where the filesystem isn't shared.
The mirror is a utility, not a runtime loader. The runtime path stays unchanged:
sync_bundles_from_bq → discover_bundles → from_bundles_root
Sync writes verified files to a local directory and lets C2.a's existing loader do the actual import. There is no "fetch-direct-from-BQ" loader — that would double the trust surface and diverge from the loader's audit fields.
from bigquery_agent_analytics.extractor_compilation import (
publish_bundles_to_bq,
sync_bundles_from_bq,
BigQueryBundleStore,
BUNDLE_MIRROR_TABLE_SCHEMA,
PublishResult,
SyncResult,
MirrorFailure,
BundleRow,
BundleStore,
)
from google.cloud import bigquery
# 1. Stand up the store (creates the table if missing).
client = bigquery.Client(project="my-project", location="US")
store = BigQueryBundleStore(
bq_client=client,
table_id="my-project.my_dataset.compiled_bundles",
)
store.ensure_table()
# 2. Publish local bundles to BigQuery.
publish: PublishResult = publish_bundles_to_bq(
bundle_root=pathlib.Path("/var/bqaa/bundles"),
store=store,
bundle_fingerprint_allowlist=None, # or a list of fingerprints
)
# 3. Elsewhere (different process / VM / Cloud Run instance):
sync: SyncResult = sync_bundles_from_bq(
store=store,
dest_dir=pathlib.Path("/tmp/synced-bundles"),
bundle_fingerprint_allowlist=None,
)
# 4. Wire the synced dir into the runtime via C2.a.
from bigquery_agent_analytics.ontology_graph import OntologyGraphManager
manager = OntologyGraphManager.from_bundles_root(
project_id="my-project",
dataset_id="my_dataset",
ontology=ontology,
binding=binding,
bundles_root=sync.dest_dir,
expected_fingerprint=fingerprint,
fallback_extractors=fallback_extractors,
)Publish:
- Walk
bundle_root; for each subdirectory, readmanifest.jsonand parse viaManifest.from_json. - Skip if
bundle_fingerprint_allowlistis set and the manifest's fingerprint isn't in it (lands inskipped_fingerprints). - Run
load_bundle(child, expected_fingerprint=manifest.fingerprint)as a pre-publish validation gate. Bundles that wouldn't load at runtime are NOT published; the mirror only distributes working bundles. Failures land infailureswith codebundle_load_failedand the underlying loader code indetail. - Emit two
BundleRows per bundle (manifest + module file) with denormalizedevent_types/module_filename/function_namefor query-side filtering. - Call
store.publish_rows(rows)once for the whole batch.BigQueryBundleStoreissuesDELETE FROM ... WHERE (fingerprint, path) IN (...)thenINSERT, so re-publishing the same fingerprint replaces the prior rows rather than accumulating.
Sync:
- Fetch rows via
store.fetch_rows(bundle_fingerprints=allowlist). - Shape-check each row (
malformed_rowif wrong types). - Group by fingerprint; per fingerprint:
- Reject if any row's
bundle_pathis unsafe (invalid_bundle_path: traversal, absolute path, NUL, backslash). - Reject if any
(fingerprint, bundle_path)pair appears twice (duplicate_row). - Require the
manifest.jsonrow (manifest_row_missing) and parse it (manifest_row_unreadable). - Require exactly the manifest's two files (
unexpected_fileif any extra;module_row_missingif the module row is absent). - Write the two files into
dest_dir/<fingerprint>/. - Run
load_bundle(dest_dir/<fingerprint>, expected_fingerprint=fp)as a post-sync validation gate. Tampered or incomplete bundles fail at sync (bundle_load_failed) and the partial directory is scrubbed.
- Reject if any row's
- Fingerprints in the allowlist that have no rows surface as
fingerprint_not_in_tablefailures — the operator knows the publish lag hasn't caught up.
BUNDLE_MIRROR_TABLE_SCHEMA (tuples of (name, type, mode)):
bundle_fingerprint STRING REQUIRED
bundle_path STRING REQUIRED -- "manifest.json" or the manifest's module_filename
file_content BYTES REQUIRED
event_types STRING REPEATED -- denorm from manifest, for query-side filter
module_filename STRING NULLABLE -- denorm from manifest
function_name STRING NULLABLE -- denorm from manifest
published_at TIMESTAMP REQUIRED
Logical primary key: (bundle_fingerprint, bundle_path). BigQuery doesn't enforce uniqueness; BigQueryBundleStore.publish_rows enforces it via DELETE+INSERT, and sync rejects duplicates fail-closed.
The denormalized fields exist for query convenience (SELECT DISTINCT bundle_fingerprint FROM mirror WHERE 'bka_decision' IN UNNEST(event_types)). They are NOT the source of truth at sync time — sync re-parses manifest.json from the row content. The denorm is for query speed; correctness comes from re-validating against load_bundle.
Callers can switch on MirrorFailure.code:
Publish-side:
bundle_root_missing—bundle_rootis not a directory.manifest_missing— bundle subdir has nomanifest.json.manifest_unreadable— manifest fails to parse or has wrong shape.bundle_load_failed— bundle wouldn't load viaload_bundlepre-publish.detailcarries the underlying loader code.duplicate_fingerprint— two or more subdirs ofbundle_rootdeclare the same manifest fingerprint. The mirror is keyed on(bundle_fingerprint, bundle_path); publishing both would land contents-of-the-loser in the table and corrupt the bundle identity. Fail-closed: every participating subdir gets a failure record and no rows are emitted for that fingerprint.
Sync-side:
fingerprint_not_in_table— allowlist named a fingerprint with no rows.manifest_row_missing— bundle has rows but nomanifest.jsonrow.manifest_row_unreadable— manifest row content isn't a validManifest. Also fires when the parsed manifest's shape would let a path-escape or write failure slip past_validate_bundle_path(module_filenamecontaining a path separator, NUL,./.., or non-string fields).invalid_bundle_path— traversal / absolute / NUL / backslash. Offender is never written to disk.unexpected_file— row whosebundle_pathisn'tmanifest.jsonnor the manifest'smodule_filename. Bundles are exactly two files; anything extra is rejected.module_row_missing— manifest is fine but no row for the module file.duplicate_row— two rows share the same(fingerprint, bundle_path).malformed_row— row fields have wrong types (e.g.file_contentnot bytes) or thebundle_fingerprintisn't a strict 64-char lowercase sha256 hex string. The fingerprint check is load-bearing: sync uses the fingerprint as a directory name (dest_dir/<fingerprint>/), so a tampered value like"../escape"would otherwise write outsidedest_dir.bundle_load_failed— sync wrote files to a staging directory butload_bundlerejected the reconstruction. The staging directory is removed and any pre-existingdest_dir/<fingerprint>/is left intact — a bad mirror row never destroys good local state.
Neither publish_bundles_to_bq nor sync_bundles_from_bq raises on per-bundle problems; failures accumulate. Store exceptions (BQ-side: network, auth, table missing) DO propagate — that's the right boundary for "fix the connection and retry."
Sync writes each fingerprint's two files to a side-by-side staging directory (dest_dir/.staging-<fingerprint>-<uuid>/) and runs load_bundle on the staged copy before touching the target. Only after load_bundle accepts the staged reconstruction does sync rmtree(dest_dir/<fingerprint>) and shutil.move(staging, target). A corrupt mirror row therefore cannot destroy a previously-good local bundle — the load-bundle gate is the safety boundary.
The replace itself is staged, not strictly atomic. Between rmtree(target) and move(staging, target) there is a brief window where the target is absent; a process crash inside that window leaves the bundle missing on disk (a re-sync recovers it). The load-bundle-failure case — the one the staged flow is designed to protect — is correctly atomic in the failure direction: load-bundle failure leaves the target untouched. Locked by test_sync_failure_preserves_existing_good_bundle.
BigQueryBundleStore.publish_rows upserts by (bundle_fingerprint, bundle_path). Re-publishing the same bundle replaces the prior rows rather than duplicating them — verified by test_republishing_same_bundle_does_not_accumulate_rows.
Important caveat: the DELETE + insert_rows_json upsert is not a single atomic transaction. If INSERT fails after DELETE (network, quota, schema drift), rows for the affected (fingerprint, bundle_path) pairs are missing from the table until the caller re-runs publish. The mirror is publish-side idempotent, so the recovery is to call publish_bundles_to_bq again — but operators should be aware that a transient INSERT failure leaves a recoverable, not silent, gap. A staging-table-plus-MERGE flow would close this gap and is deliberately deferred.
Cross-subdir duplicate fingerprints (two bundles claiming the same fingerprint) are caught before any DELETE runs via the duplicate_fingerprint publish-side check. BigQueryBundleStore.publish_rows also raises ValueError on duplicate (fingerprint, path) input pairs as defense in depth for direct callers of the store.
CI suite — tests/test_extractor_compilation_bq_bundle_mirror.py, 24 cases using an in-memory BundleStore substitute:
TestRoundTrip(2) — publish a local bundle, sync it back, verifyload_bundleaccepts the reconstruction. Plus a multi-bundle variant.TestAllowlist(3) — publish-side allowlist skips unlisted; sync-side allowlist skips unlisted; sync-side allowlist names a fingerprint with no rows →fingerprint_not_in_tablefailure.TestPathSafety(3) —../escape.py,/etc/passwd,..\windows-style-escape.pyall rejected withinvalid_bundle_path; no file written outsidedest_dir.TestMissingAndMalformedRows(5) — missing manifest row, malformed manifest content, unexpected extra file, wrong field type, duplicate rows.TestIdempotentRepublish(1) — two consecutive publishes of the same bundle leave exactly two rows in the store, not four.TestPublishFailures(4) — subdir without manifest; bundle that would failload_bundlepre-publish; missingbundle_root; two subdirs declaring the same fingerprint →duplicate_fingerprint, neither published.TestRoundTwoFindings(6) — manifest row withmodule_filenamecontaining a path separator →manifest_row_unreadable(noFileNotFoundError); existing good local bundle preserved across a corrupt re-sync (staging-then-validate);BigQueryBundleStore.publish_rowsraisesValueErroron duplicate input pairs without running DELETE or INSERT; tamperedbundle_fingerprint="../escape"rejected asmalformed_rowbefore any path is computed (no write outsidedest_dir); tampered manifestfingerprint="../escape"rejected at publish-side;BigQueryBundleStore.__init__raisesValueErroron malformedtable_id(backtick, semicolon, whitespace, wrong dot count, empty segment,--comment marker, trailing newline) so injection can't reach the SQL.
Live BQ suite — tests/test_extractor_compilation_bq_bundle_mirror_live.py, 1 case behind BQAA_RUN_LIVE_TESTS=1 + BQAA_RUN_LIVE_BQ_MIRROR_TESTS=1 + PROJECT_ID + DATASET_ID. Creates a temporary table, runs the publish+sync round-trip, asserts load_bundle accepts the reconstruction, deletes the table on the way out.
- GCS-backed signed-URL fetch for very large bundles. Bundles are tiny today (a few KB); a streaming path can land later if real bundles grow.
- Caching / TTL of synced bundles. Sync overwrites; the caller decides how often to sync.
- Garbage collection of stale fingerprints. The mirror's job is publish + fetch; lifecycle policy lives upstream.
- Multi-region replication. The mirror table is created in one BQ location.
extractor_compilation_bundle_loader.md—load_bundle/discover_bundles(C2.a). The mirror callsload_bundleas both a pre-publish gate and a post-sync gate, so the loader is the single source of truth for "is this bundle usable?"extractor_compilation_orchestrator_swap.md—OntologyGraphManager.from_bundles_root(C2.c.2). Once sync lands bundles on disk, this is the entry point that wires them into the runtime.extractor_compilation_runtime_registry.md—build_runtime_extractor_registry(C2.c.1). The registry adapter thatfrom_bundles_rootbuilds internally.