Skip to content

Commit e386173

Browse files
committed
scripts+migrate: populate denormalized commit_timestamp on the write path
Both query_measurements writers now stamp the denormalized commit_timestamp (migration 006, the read path's latest-per-series sort key) so new ingests never depend on the post-deploy re-backfill:

- post-ingest.py --postgres resolves it in the upsert itself via a scalar subquery against the commits row the same transaction upserted first, on both the INSERT and ON CONFLICT DO UPDATE paths.
- The migrate Rust loader runs the 006 backfill UPDATE after the per-table COPYs, inside the same all-or-nothing transaction (the v3 DuckDB source has no such column, so it cannot ride the COPY's 1:1 column mapping).

The loader's schema contract and the e2e container init now include migration 006.

Tests: a new pytest pins stamping on both upsert paths; the Postgres e2e rehearsal asserts zero NULL and zero drifted commit_timestamp rows after a load.

Signed-off-by: "Connor Tsui" <connor@spiraldb.com>
1 parent 15b778b commit e386173

4 files changed

Lines changed: 89 additions & 14 deletions

File tree

benchmarks-website/migrate/src/postgres.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,11 @@ impl std::fmt::Display for LoadSummary {
223223
/// TLS connection (the RDS CA for the prod load); when `None` the connection is
224224
/// plaintext (`NoTls`, the local rehearsal).
225225
///
226-
/// The target schema (`migrations/001_initial_schema.sql`) must already be
227-
/// applied: `load` only `COPY`s into the existing tables and never runs DDL.
226+
/// The target schema must already be applied -- `migrations/001_initial_schema.sql`
227+
/// plus `006_read_path_perf.sql`, whose denormalized
228+
/// `query_measurements.commit_timestamp` column the post-COPY denormalization
229+
/// UPDATE below writes: `load` only `COPY`s into the existing tables and never
230+
/// runs DDL.
228231
pub fn load(duckdb_path: &Path, dsn: &str, ca_cert: Option<&Path>) -> Result<LoadSummary> {
229232
let config = duckdb::Config::default()
230233
.access_mode(duckdb::AccessMode::ReadOnly)
@@ -250,6 +253,23 @@ pub fn load(duckdb_path: &Path, dsn: &str, ca_cert: Option<&Path>) -> Result<Loa
250253
per_table.push((spec.name, rows));
251254
}
252255

256+
// Populate the denormalized `query_measurements.commit_timestamp` (migration
257+
// 006, the read path's latest-per-series sort key). The v3 DuckDB source has
258+
// no such column, so it cannot ride the COPY's 1:1 column mapping; it is
259+
// derived here from the already-loaded `commits` dim (`TABLE_SPECS` loads
260+
// `commits` first), inside the same all-or-nothing transaction.
261+
let stamped = tx
262+
.execute(
263+
"UPDATE query_measurements q
264+
SET commit_timestamp = c.timestamp
265+
FROM commits c
266+
WHERE c.commit_sha = q.commit_sha
267+
AND q.commit_timestamp IS NULL",
268+
&[],
269+
)
270+
.context("denormalizing commit_timestamp onto query_measurements")?;
271+
info!(rows = stamped, "denormalized commit_timestamp");
272+
253273
tx.commit().context("committing the load transaction")?;
254274
Ok(LoadSummary { per_table })
255275
}

benchmarks-website/migrate/tests/postgres_e2e.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,15 @@ use vortex_bench_migrate::verify::run_postgres_value_verify;
3131
use vortex_bench_server::family;
3232
use vortex_bench_server::schema::COMMITS_DDL;
3333

34-
/// The authoritative Postgres schema, applied to the container at init.
35-
const SCHEMA_SQL: &str = include_str!("../../../migrations/001_initial_schema.sql");
34+
/// The authoritative Postgres schema, applied to the container at init: the base
35+
/// DDL plus the 006 read-path migration, whose denormalized
36+
/// `query_measurements.commit_timestamp` column the loader's post-COPY
37+
/// denormalization UPDATE requires (matching the prod target, where every
38+
/// migration is applied before a load).
39+
const SCHEMA_SQL: &str = concat!(
40+
include_str!("../../../migrations/001_initial_schema.sql"),
41+
include_str!("../../../migrations/006_read_path_perf.sql"),
42+
);
3643

3744
/// Per-table row counts the fixture loads. Drives the count assertions.
3845
const FIXTURE_COUNTS: &[(&str, u64)] = &[
@@ -161,6 +168,25 @@ fn rehearsal_load_then_verify_is_clean() -> Result<()> {
161168
"target count for {table}"
162169
);
163170
}
171+
172+
// The loader denormalized `commit_timestamp` onto every `query_measurements`
173+
// row (migration 006, the read path's latest-per-series sort key): no NULLs
174+
// remain and each value equals the joined `commits.timestamp`.
175+
let unstamped: i64 = client
176+
.query_one(
177+
"SELECT count(*) FROM query_measurements WHERE commit_timestamp IS NULL",
178+
&[],
179+
)?
180+
.get(0);
181+
assert_eq!(unstamped, 0, "rows missing denormalized commit_timestamp");
182+
let mismatched: i64 = client
183+
.query_one(
184+
"SELECT count(*) FROM query_measurements q JOIN commits c USING (commit_sha)
185+
WHERE q.commit_timestamp <> c.timestamp",
186+
&[],
187+
)?
188+
.get(0);
189+
assert_eq!(mismatched, 0, "denormalized commit_timestamp drifted");
164190
Ok(())
165191
}
166192

scripts/post-ingest.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -670,17 +670,19 @@ def _insert_query_measurement(conn, mid_mod, r: dict) -> bool:
670670
query_idx, storage, engine, format,
671671
value_ns, all_runtimes_ns,
672672
peak_physical, peak_virtual, physical_delta, virtual_delta,
673-
env_triple
674-
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::bigint[], %s, %s, %s, %s, %s)
673+
env_triple, commit_timestamp
674+
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::bigint[], %s, %s, %s, %s, %s,
675+
(SELECT timestamp FROM commits WHERE commit_sha = %s))
675676
ON CONFLICT (measurement_id) DO UPDATE SET
676-
commit_sha = excluded.commit_sha,
677-
value_ns = excluded.value_ns,
678-
all_runtimes_ns = excluded.all_runtimes_ns,
679-
peak_physical = excluded.peak_physical,
680-
peak_virtual = excluded.peak_virtual,
681-
physical_delta = excluded.physical_delta,
682-
virtual_delta = excluded.virtual_delta,
683-
env_triple = excluded.env_triple
677+
commit_sha = excluded.commit_sha,
678+
value_ns = excluded.value_ns,
679+
all_runtimes_ns = excluded.all_runtimes_ns,
680+
peak_physical = excluded.peak_physical,
681+
peak_virtual = excluded.peak_virtual,
682+
physical_delta = excluded.physical_delta,
683+
virtual_delta = excluded.virtual_delta,
684+
env_triple = excluded.env_triple,
685+
commit_timestamp = excluded.commit_timestamp
684686
RETURNING (xmax = 0) AS inserted
685687
""",
686688
(
@@ -700,6 +702,10 @@ def _insert_query_measurement(conn, mid_mod, r: dict) -> bool:
700702
r.get("physical_delta"),
701703
r.get("virtual_delta"),
702704
r.get("env_triple"),
705+
# The denormalized `commit_timestamp` (migration 006) is resolved from the
706+
# `commits` row this same transaction upserted first, so the read path's
707+
# latest-per-series summary never sees a NULL from this writer.
708+
r["commit_sha"],
703709
),
704710
)
705711

scripts/test_post_ingest_postgres.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,29 @@ def test_ingest_inserts_then_updates(schema_conn: psycopg.Connection) -> None:
303303
assert _count(schema_conn, table) == 1, f"{kind} -> {table} row count drifted"
304304

305305

306+
def test_query_measurement_insert_populates_commit_timestamp(schema_conn: psycopg.Connection) -> None:
307+
"""The denormalized `commit_timestamp` (migration 006, the read path's latest-per-series sort
308+
key) is stamped from the envelope's `commits` row on BOTH upsert paths, so rows written by this
309+
writer never depend on the post-deploy re-backfill."""
310+
commit = _sample_commit()
311+
records = [r for r in _sample_records() if r["kind"] == "query_measurement"]
312+
313+
post_ingest.ingest_postgres(schema_conn, commit, records)
314+
row = schema_conn.execute(
315+
"SELECT q.commit_timestamp, c.timestamp FROM query_measurements q"
316+
" JOIN commits c USING (commit_sha)"
317+
).fetchone()
318+
assert row[0] is not None
319+
assert row[0] == row[1]
320+
321+
# The update path re-stamps as well: scrub the column, re-ingest the same envelope (an
322+
# ON CONFLICT DO UPDATE), and the timestamp must come back.
323+
schema_conn.execute("UPDATE query_measurements SET commit_timestamp = NULL")
324+
post_ingest.ingest_postgres(schema_conn, commit, records)
325+
restamped = schema_conn.execute("SELECT commit_timestamp FROM query_measurements").fetchone()[0]
326+
assert restamped == row[1]
327+
328+
306329
# Per-kind mutation of every value/side-counter/env column each table's ON
307330
# CONFLICT DO UPDATE SET list owns (dim columns deliberately excluded). Mirrors
308331
# the SET lists in benchmarks-website/server/src/ingest.rs; a stale/incorrect SET

0 commit comments

Comments
 (0)