Commit d78a860

fix(materialize_window): empty-extraction sessions are not ok (#167)
Closes the silent-failure mode #166's live deploy surfaced.

## The bug

When ``AI.GENERATE`` failed per-event (e.g., runtime SA missing ``roles/aiplatform.user``), the SDK swallowed the error and returned an empty graph for every session. The orchestrator's session loop then:

* Called ``materialize_with_status`` on the empty graph (didn't raise; the materializer just had nothing to insert).
* Marked the session as ``ok=True``.
* Reported ``sessions_materialized == sessions_discovered``, ``ok=true``, and an EMPTY ``rows_materialized`` dict.

Operators monitoring on ``jsonPayload.ok`` saw "all good" while the entity tables stayed empty. Hard-to-spot drift.

## The fix

After ``materialize_with_status`` succeeds, sum the per-table row counts. If the total is zero, treat as a per-session failure (matching exception semantics: break the loop, advance checkpoint only to the prior successful session):

* ``ok=False`` on the SessionResult.
* ``error_code="empty_extraction"``.
* ``error_detail`` documents the common causes: missing ``aiplatform.user`` IAM, transient AI.GENERATE rate limit, or no extractable ontology content in the session's events.

A legitimately empty session (no MAKO decision points) gets flagged too: better a false-positive that prompts the operator to look than a silent zero.

The empty-window heartbeat path (``sessions_discovered == 0``) is untouched: the empty-extraction guard is per-session and doesn't fire when no sessions were processed. ``ok=true`` remains the right shape for "the cron ran, nothing to do".

## Tests

Three new cases in ``TestEmptyExtractionNotOk``:

* ``test_all_sessions_zero_rows_reports_not_ok``: every session extracts to empty; first marked failed, loop breaks, ``ok=false``, ``sessions_materialized=0``, ``sessions_failed=1``, error_code matches.
* ``test_partial_extraction_partial_failure_conservative_checkpoint``: session 1 succeeds with real rows, session 2 empty, session 3 never reached; checkpoint advances ONLY to session 1's timestamp; materializer called exactly twice (BQ-quota safe).
* ``test_empty_window_remains_ok``: zero sessions discovered still produces ``ok=true``; the per-session guard doesn't flip the heartbeat case.

71/71 focused tests pass (68 prior + 3 new). Full suite 2930 pass.
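
The break-and-checkpoint behaviour described under "The fix" can be sketched in isolation. This is a minimal illustration under stated assumptions, not the orchestrator's code: `Session`, `run_window`, and the `materialize` callable are hypothetical names, and `materialize` stands in for ``materialize_with_status`` by returning a plain per-table row-count dict.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Session:
  """Hypothetical stand-in for a discovered session."""
  session_id: str
  completion_timestamp: int  # assumption: any orderable timestamp works


def run_window(
    sessions: list[Session],
    materialize: Callable[[Session], dict[str, int]],
) -> dict:
  """Processes sessions in order and stops at the first zero-row session."""
  checkpoint: Optional[int] = None
  materialized = 0
  failures = []
  for session in sessions:
    row_counts = materialize(session)
    if sum(row_counts.values()) == 0:
      # Zero rows is handled like an exception: record the failure and stop,
      # so the checkpoint never moves past a session that landed nothing.
      failures.append(
          {"session_id": session.session_id, "error_code": "empty_extraction"}
      )
      break
    materialized += 1
    checkpoint = session.completion_timestamp
  return {
      # An empty window produces no failures, so it still reports ok=True.
      "ok": not failures,
      "sessions_discovered": len(sessions),
      "sessions_materialized": materialized,
      "sessions_failed": len(failures),
      "checkpoint": checkpoint,
      "failures": failures,
  }
```

With three sessions where only the first yields rows, this sketch calls `materialize` twice, leaves the checkpoint at the first session's timestamp, and never reaches the third session, which is the shape the conservative-checkpoint test describes.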
1 parent d830398 commit d78a860

3 files changed

Lines changed: 430 additions & 24 deletions

examples/migration_v5/periodic_materialization/README.md

Lines changed: 32 additions & 17 deletions
@@ -397,23 +397,38 @@ ff1e956df8b8 2026-05-16 04:38:59 3 / 3 / 0 true
 
 (Row 1 is the deploy script's `--smoke` execution, which ran
 BEFORE the verification added `roles/aiplatform.user` to the
-deploy. AI.GENERATE failed for every session there, but the
-orchestrator still reported `ok=true` with empty
-`rows_materialized` — see the known issue below.)
-
-### Known issue surfaced by this verification
-
-The orchestrator currently reports `sessions_materialized ==
-sessions_discovered` and `ok=true` even when every per-event
-`AI.GENERATE` call failed. The `rows_materialized` dict is
-empty in that case, but `sessions_materialized` doesn't
-reflect the underlying failure. Operators monitoring on
-`jsonPayload.ok` would miss the silent extraction failure.
-
-Tracked for SDK follow-up — out of scope for this example PR.
-Workaround: alert on
-`jsonPayload.rows_materialized == {}` in Cloud Logging /
-Monitoring as a second-line check.
+deploy. AI.GENERATE failed for every session there. At the time
+of #166, the orchestrator reported `ok=true` with empty
+`rows_materialized` — the silent-failure mode that PR #167
+fixed. Today, the same situation would produce `ok=false` with
+`failures[0].error_code = "empty_extraction"`.)
+
+### Failure-mode surface (post-#167)
+
+The orchestrator distinguishes two zero-row session outcomes
+that look identical from `rows_materialized` alone:
+
+* **`empty_extraction`** — extraction (AI.GENERATE or compiled
+bundle) returned an empty graph; no inserts attempted.
+Diagnose by checking the runtime SA's `roles/aiplatform.user`
+grant, AI.GENERATE quotas, or whether the session's events
+legitimately had any MAKO content.
+
+* **`materialization_failed`** — extraction produced rows but
+every insert returned an error. The `failures[].error_detail`
+names the specific tables (e.g.,
+`DecisionExecution: rows_attempted=3, insert_status='insert_failed'`),
+and the aggregate `table_statuses` carries the per-table
+diagnostic at the top level of the report. Diagnose by
+checking the SA's dataset write perm on the graph dataset,
+schema drift the binding-validate pre-flight missed, or
+streaming-buffer pinning.
+
+In both cases: `ok=false`, CLI exit 1, the cron run shows up
+as a failed execution in Cloud Monitoring. Alert directly on
+`jsonPayload.ok=false` plus `jsonPayload.failures[].error_code`
+for the failure-mode breakdown — no second-line
+`rows_materialized == {}` check needed.
 
 ## Not in scope here
 
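
The two `error_code` values in the README section above lend themselves to a small triage helper. The following is a hedged sketch, not part of the SDK: `triage` and `TRIAGE_HINTS` are hypothetical names, the hint strings paraphrase the README, and the presence of a `session_id` key inside each `failures[]` entry is an assumption (the diff does not show that dict's full shape).

```python
# Hints paraphrased from the README text; the mapping itself is illustrative.
TRIAGE_HINTS = {
    "empty_extraction": (
        "check roles/aiplatform.user on the runtime SA, AI.GENERATE quotas, "
        "and whether the session's events had any MAKO content"
    ),
    "materialization_failed": (
        "check dataset write permission on the graph dataset, schema drift, "
        "and streaming-buffer pinning"
    ),
}


def triage(report: dict) -> list[str]:
  """Returns one human-readable line per failure in a run report."""
  if report.get("ok", False):
    return []
  lines = []
  for failure in report.get("failures", []):
    code = failure.get("error_code", "unknown")
    hint = TRIAGE_HINTS.get(code, "unrecognized error_code; read error_detail")
    # Assumption: each failure entry carries the session_id it belongs to.
    lines.append(f"{failure.get('session_id', '?')}: {code}: {hint}")
  return lines
```

A report with `ok=true` yields an empty list, so the helper stays silent for the empty-window heartbeat case.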

src/bigquery_agent_analytics/materialize_window.py

Lines changed: 94 additions & 7 deletions
@@ -921,6 +921,81 @@ def run_materialize_window(
             "insert_status": ts.insert_status,
             "idempotent": ts.idempotent,
         }
+      # Zero-rows guard. ``row_counts`` only includes tables
+      # whose insert SUCCEEDED (per ``ontology_materializer.py``
+      # — see the ``inserted_tables`` filter at the build site).
+      # So ``total_rows == 0`` collapses two distinct failure
+      # modes that operators must triage differently:
+      #
+      # * **empty_extraction** — the graph itself was empty.
+      # Extraction (AI.GENERATE or compiled bundle) returned
+      # no rows; nothing to insert. ``table_statuses`` will
+      # be empty or have ``rows_attempted == 0`` everywhere.
+      # Common cause: missing ``roles/aiplatform.user`` on
+      # the runtime SA (surfaced by #166 live deploy), or
+      # the session's events legitimately had no MAKO
+      # content.
+      #
+      # * **materialization_failed** — the graph HAD rows but
+      # every insert returned an error. ``table_statuses``
+      # will have entries with ``rows_attempted > 0`` and
+      # ``insert_status == "insert_failed"``. Common cause:
+      # dataset write-perm regression, streaming-buffer
+      # pinning that hits a delete-then-insert idempotency
+      # boundary, or a schema mismatch the binding-validate
+      # pre-flight didn't catch.
+      #
+      # Operators chasing the wrong failure mode is the failure
+      # mode this PR was meant to prevent. Classify here.
+      total_rows = sum(int(v) for v in (mat.row_counts or {}).values())
+      if total_rows == 0:
+        any_attempted = any(
+            int(ts.get("rows_attempted", 0)) > 0
+            for ts in table_statuses_dict.values()
+        )
+        any_insert_failed = any(
+            ts.get("insert_status") == "insert_failed"
+            for ts in table_statuses_dict.values()
+        )
+        if any_attempted and any_insert_failed:
+          error_code = "materialization_failed"
+          error_detail = (
+              "session extracted rows but every insert failed: "
+              + "; ".join(
+                  f"{name}: rows_attempted={ts.get('rows_attempted')}, "
+                  f"insert_status={ts.get('insert_status')!r}, "
+                  f"cleanup_status={ts.get('cleanup_status')!r}"
+                  for name, ts in sorted(table_statuses_dict.items())
+              )
+              + ". Common causes: dataset write-permission "
+              "regression on the runtime SA, schema mismatch the "
+              "binding-validate pre-flight didn't catch, or "
+              "streaming-buffer pinning blocking a delete-then-"
+              "insert cycle."
+          )
+        else:
+          error_code = "empty_extraction"
+          error_detail = (
+              "session materialized zero rows across every entity "
+              "table and no inserts were attempted; usually means "
+              "extraction (AI.GENERATE or compiled bundle) returned "
+              "an empty graph. Common causes: missing "
+              "roles/aiplatform.user on the runtime SA, transient "
+              "AI.GENERATE rate limit, or the session's events did "
+              "not contain any extractable ontology content."
+          )
+        session_results.append(
+            SessionResult(
+                session_id=session.session_id,
+                ok=False,
+                completion_timestamp=session.completion_timestamp,
+                rows_materialized=dict(mat.row_counts),
+                table_statuses=table_statuses_dict,
+                error_code=error_code,
+                error_detail=error_detail,
+            )
+        )
+        break
       session_results.append(
           SessionResult(
               session_id=session.session_id,
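
The classification added in the hunk above can be exercised on its own. This is a minimal sketch that lifts the same three checks into a standalone function; `classify_zero_rows` is a hypothetical name, and the per-table status dicts are assumed to carry only the `rows_attempted` and `insert_status` keys the diff reads.

```python
from typing import Any, Optional


def classify_zero_rows(
    row_counts: Optional[dict[str, int]],
    table_statuses: dict[str, dict[str, Any]],
) -> Optional[str]:
  """Returns an error code for a zero-row session, or None if rows landed."""
  total_rows = sum(int(v) for v in (row_counts or {}).values())
  if total_rows > 0:
    return None
  statuses = list(table_statuses.values())
  any_attempted = any(int(ts.get("rows_attempted", 0)) > 0 for ts in statuses)
  any_insert_failed = any(
      ts.get("insert_status") == "insert_failed" for ts in statuses
  )
  if any_attempted and any_insert_failed:
    # Rows existed but no insert succeeded: a write-side problem.
    return "materialization_failed"
  # Nothing was attempted: the extracted graph itself was empty.
  return "empty_extraction"
```

The two `any(...)` checks are evaluated independently, so they do not have to be satisfied by the same table; that matches the diff as written.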
@@ -1266,18 +1341,30 @@ def _build_result(
   # clean ``deleted``. Operators rely on the report as the
   # "did anything go wrong" signal — any delete failure must
   # bubble up.
+  # Aggregate ``table_statuses`` from EVERY session — including
+  # failed ones. The status surface ("did the delete succeed?
+  # did the insert succeed?") is operator-visible regardless of
+  # session-level success. Dropping failed sessions' statuses
+  # was the gap #167's reviewer flagged: when a session fails
+  # with ``materialization_failed``, the per-table diagnostic
+  # (rows_attempted=N, insert_status=insert_failed) belongs in
+  # the report so operators don't have to dig into log payloads
+  # to see which table broke.
+  #
+  # ``rows_materialized`` still aggregates only successful
+  # sessions — that's the "rows that actually landed" view.
   table_statuses_agg: dict[str, dict[str, Any]] = {}
   for r in session_results:
     if r.ok:
       for table, n in r.rows_materialized.items():
         rows_materialized[table] = rows_materialized.get(table, 0) + n
-      for table, ts in r.table_statuses.items():
-        if table in table_statuses_agg:
-          table_statuses_agg[table] = _merge_table_status(
-              table_statuses_agg[table], ts
-          )
-        else:
-          table_statuses_agg[table] = dict(ts)
+    for table, ts in r.table_statuses.items():
+      if table in table_statuses_agg:
+        table_statuses_agg[table] = _merge_table_status(
+            table_statuses_agg[table], ts
+        )
+      else:
+        table_statuses_agg[table] = dict(ts)
 
   failures = [
       {
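
The effect of moving the `table_statuses` loop out of the `if r.ok:` branch can be shown with a small standalone sketch. Everything here is illustrative: the `SessionResult` dataclass is a cut-down stand-in, `merge_table_status` is a hypothetical merge rule (the real `_merge_table_status` is not shown in this diff), and the `"inserted"` status value in the usage note is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SessionResult:
  """Cut-down stand-in carrying only the fields the aggregation reads."""
  ok: bool
  rows_materialized: dict[str, int] = field(default_factory=dict)
  table_statuses: dict[str, dict[str, Any]] = field(default_factory=dict)


def merge_table_status(a: dict[str, Any], b: dict[str, Any]) -> dict[str, Any]:
  """Hypothetical merge: sums rows_attempted and keeps any insert failure."""
  merged = dict(a)
  merged["rows_attempted"] = a.get("rows_attempted", 0) + b.get(
      "rows_attempted", 0
  )
  if b.get("insert_status") == "insert_failed":
    merged["insert_status"] = "insert_failed"
  return merged


def aggregate(results: list[SessionResult]):
  """Aggregates row counts from ok sessions and statuses from all sessions."""
  rows: dict[str, int] = {}
  statuses: dict[str, dict[str, Any]] = {}
  for r in results:
    if r.ok:
      for table, n in r.rows_materialized.items():
        rows[table] = rows.get(table, 0) + n
    # Outside the ``if r.ok`` branch: failed sessions contribute statuses too.
    for table, ts in r.table_statuses.items():
      if table in statuses:
        statuses[table] = merge_table_status(statuses[table], ts)
      else:
        statuses[table] = dict(ts)
  return rows, statuses
```

Given one successful session that inserted 3 `DecisionExecution` rows (status `"inserted"`) followed by a failed session that attempted 2, the row total stays at 3 while the aggregated status reports 5 rows attempted and `insert_failed`, so the failing table is visible at the top level of the report.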
