Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ futures = "0.3.32"
futures-core = "0.3.32"
futures-task = "0.3.32"
futures-util = "0.3.32"
gcp_auth = "0.12.6"
gcp_auth = { version = "0.12.6", default-features = false, features = ["aws-lc-rs"] }
glob = "0.3.3"
globset = "0.4.18"
governor = "0.10.1"
Expand Down
1 change: 0 additions & 1 deletion ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,6 @@ steps:
plugins:
- ./ci/plugins/mzcompose:
composition: gcp
skip: "https://linear.app/materializeinc/issue/SS-279"

- group: "Platform checks"
key: platform-checks
Expand Down
69 changes: 61 additions & 8 deletions test/gcp/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""

import base64
import io
import json
import os
import random
Expand All @@ -28,6 +29,9 @@
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from typing import Any

import fastavro

from materialize.biglake import (
biglake_request,
Expand Down Expand Up @@ -69,6 +73,55 @@ def _require_env(name: str) -> str:
return value


def _read_gcs_avro(gs_url: str, token: str, project: str) -> list[Any]:
"""Download a ``gs://`` Avro file (Iceberg manifest list or manifest) and parse it.

We hit the GCS JSON API's media endpoint directly with the same bearer token and
`x-goog-user-project` header the catalog uses, so no extra GCS client is needed.
"""
if not gs_url.startswith("gs://"):
raise ValueError(f"expected a gs:// URL, got {gs_url}")
bucket, _, obj = gs_url[len("gs://") :].partition("/")
media_url = (
f"https://storage.googleapis.com/storage/v1/b/{urllib.parse.quote(bucket, safe='')}"
f"/o/{urllib.parse.quote(obj, safe='')}?alt=media"
)
req = urllib.request.Request(
media_url,
headers={
"Authorization": f"Bearer {token}",
"x-goog-user-project": project,
},
)
with urllib.request.urlopen(req) as resp:
return list(fastavro.reader(io.BytesIO(resp.read())))


def _live_row_count(snapshot: dict, token: str, project: str) -> int:
"""Count live rows in a snapshot by reading its manifest list.

The snapshot `summary.total-records` is unreliable here: the sink's 1s commit
interval emits a fresh (empty) snapshot every interval, and those empty commits
report `total-records: 0` even though the data files from the original commit are
still live. The manifest list always references the live data manifests with their
true per-file row counts, so summing those is what a reader actually sees in the
table, regardless of which snapshot first added the data.

NOTE: this counts live data-file rows (added + existing - deleted). It does not
subtract equality/position deletes, so it is only exact for insert-only tables.
The e2e test inserts three rows and never updates them.
"""
rows = 0
for manifest in _read_gcs_avro(snapshot["manifest-list"], token, project):
# content 0 = data manifest, 1 = delete manifest. We only count data rows.
if manifest.get("content", 0) != 0:
continue
rows += manifest.get("added_rows_count") or 0
rows += manifest.get("existing_rows_count") or 0
rows -= manifest.get("deleted_rows_count") or 0
return rows


def _verify_sink_committed(
token: str,
project: str,
Expand All @@ -77,10 +130,11 @@ def _verify_sink_committed(
table: str,
expected_rows: int,
) -> None:
"""Poll BigLake until the table's current snapshot has expected_rows.
"""Poll BigLake until the table holds expected_rows of live data.

Iceberg sinks commit asynchronously. Iceberg uses -1 to mean "no snapshot yet";
we poll until the snapshot lands or we time out.
we poll until data lands or we time out. Row count comes from the current
snapshot's manifest list (see `_live_row_count`), not the snapshot summary.

Unlike the AWS test, we verify only the row count, not the row values. DuckDB
is how the AWS test reads the iceberg table back, but DuckDB's iceberg
Expand Down Expand Up @@ -127,10 +181,9 @@ def _verify_sink_committed(
time.sleep(2)
continue

# Iceberg summary values are strings per the REST spec.
total_records = int(snapshot["summary"]["total-records"])
if total_records != expected_rows:
last_state = f"snapshot has {total_records} rows, want {expected_rows}"
live_rows = _live_row_count(snapshot, token, project)
if live_rows != expected_rows:
last_state = f"table has {live_rows} live rows, want {expected_rows}"
time.sleep(2)
continue

Expand Down Expand Up @@ -237,8 +290,8 @@ def workflow_default(c: Composition) -> None:
today = datetime.now(timezone.utc).strftime(NAMESPACE_DATE_FORMAT)
namespace = f"{NAMESPACE_PREFIX}_{today}_{seed:08x}"
table = "demo_table"
# The .td inserts these three rows; verification asserts the snapshot
# summary matches.
# The .td inserts these three rows; verification asserts the table's live
# row count matches.
expected_rows = 3

materialized = Materialized()
Expand Down
Loading