1919"""
2020
2121import base64
22+ import io
2223import json
2324import os
2425import random
2829import urllib .parse
2930import urllib .request
3031from datetime import datetime , timedelta , timezone
32+ from typing import Any
33+
34+ import fastavro
3135
3236from materialize .biglake import (
3337 biglake_request ,
@@ -69,6 +73,55 @@ def _require_env(name: str) -> str:
6973 return value
7074
7175
76+ def _read_gcs_avro (gs_url : str , token : str , project : str ) -> list [Any ]:
77+ """Download a ``gs://`` Avro file (Iceberg manifest list or manifest) and parse it.
78+
79+ We hit the GCS JSON API's media endpoint directly with the same bearer token and
80+ `x-goog-user-project` header the catalog uses, so no extra GCS client is needed.
81+ """
82+ if not gs_url .startswith ("gs://" ):
83+ raise ValueError (f"expected a gs:// URL, got { gs_url } " )
84+ bucket , _ , obj = gs_url [len ("gs://" ) :].partition ("/" )
85+ media_url = (
86+ f"https://storage.googleapis.com/storage/v1/b/{ urllib .parse .quote (bucket , safe = '' )} "
87+ f"/o/{ urllib .parse .quote (obj , safe = '' )} ?alt=media"
88+ )
89+ req = urllib .request .Request (
90+ media_url ,
91+ headers = {
92+ "Authorization" : f"Bearer { token } " ,
93+ "x-goog-user-project" : project ,
94+ },
95+ )
96+ with urllib .request .urlopen (req ) as resp :
97+ return list (fastavro .reader (io .BytesIO (resp .read ())))
98+
99+
100+ def _live_row_count (snapshot : dict , token : str , project : str ) -> int :
101+ """Count live rows in a snapshot by reading its manifest list.
102+
103+ The snapshot `summary.total-records` is unreliable here: the sink's 1s commit
104+ interval emits a fresh (empty) snapshot every interval, and those empty commits
105+ report `total-records: 0` even though the data files from the original commit are
106+ still live. The manifest list always references the live data manifests with their
107+ true per-file row counts, so summing those is what a reader actually sees in the
108+ table, regardless of which snapshot first added the data.
109+
110+ NOTE: this counts live data-file rows (added + existing - deleted). It does not
111+ subtract equality/position deletes, so it is only exact for insert-only tables.
112+ The e2e test inserts three rows and never updates them.
113+ """
114+ rows = 0
115+ for manifest in _read_gcs_avro (snapshot ["manifest-list" ], token , project ):
116+ # content 0 = data manifest, 1 = delete manifest. We only count data rows.
117+ if manifest .get ("content" , 0 ) != 0 :
118+ continue
119+ rows += manifest .get ("added_rows_count" ) or 0
120+ rows += manifest .get ("existing_rows_count" ) or 0
121+ rows -= manifest .get ("deleted_rows_count" ) or 0
122+ return rows
123+
124+
72125def _verify_sink_committed (
73126 token : str ,
74127 project : str ,
@@ -77,10 +130,11 @@ def _verify_sink_committed(
77130 table : str ,
78131 expected_rows : int ,
79132) -> None :
80- """Poll BigLake until the table's current snapshot has expected_rows .
133+ """Poll BigLake until the table holds expected_rows of live data .
81134
82135 Iceberg sinks commit asynchronously. Iceberg uses -1 to mean "no snapshot yet";
83- we poll until the snapshot lands or we time out.
136+ we poll until data lands or we time out. Row count comes from the current
137+ snapshot's manifest list (see `_live_row_count`), not the snapshot summary.
84138
85139 Unlike the AWS test, we verify only the row count, not the row values. DuckDB
86140 is how the AWS test reads the iceberg table back, but DuckDB's iceberg
@@ -127,10 +181,9 @@ def _verify_sink_committed(
127181 time .sleep (2 )
128182 continue
129183
130- # Iceberg summary values are strings per the REST spec.
131- total_records = int (snapshot ["summary" ]["total-records" ])
132- if total_records != expected_rows :
133- last_state = f"snapshot has { total_records } rows, want { expected_rows } "
184+ live_rows = _live_row_count (snapshot , token , project )
185+ if live_rows != expected_rows :
186+ last_state = f"table has { live_rows } live rows, want { expected_rows } "
134187 time .sleep (2 )
135188 continue
136189
@@ -237,8 +290,8 @@ def workflow_default(c: Composition) -> None:
237290 today = datetime .now (timezone .utc ).strftime (NAMESPACE_DATE_FORMAT )
238291 namespace = f"{ NAMESPACE_PREFIX } _{ today } _{ seed :08x} "
239292 table = "demo_table"
240- # The .td inserts these three rows; verification asserts the snapshot
241- # summary matches.
293+ # The .td inserts these three rows; verification asserts the table's live
294+ # row count matches.
242295 expected_rows = 3
243296
244297 materialized = Materialized ()
0 commit comments