Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/catalog/src/durable/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use mz_ore::metric;
use mz_ore::metrics::{IntCounter, MetricsRegistry};
use mz_ore::stats::histogram_seconds_buckets;
use prometheus::{Counter, Histogram, IntGaugeVec};
use prometheus::{Counter, Histogram, IntGauge, IntGaugeVec};

#[derive(Debug, Clone)]
pub struct Metrics {
Expand All @@ -25,6 +25,8 @@ pub struct Metrics {
pub sync_latency_seconds: Counter,
pub collection_entries: IntGaugeVec,
pub allocate_id_seconds: Histogram,
pub snapshot_consolidations: IntCounter,
pub snapshot_max_entries: IntGauge,
}

impl Metrics {
Expand Down Expand Up @@ -69,6 +71,15 @@ impl Metrics {
help: "The time it takes to allocate IDs in the durable catalog.",
buckets: histogram_seconds_buckets(0.001, 32.0),
)),
snapshot_consolidations: registry.register(metric!(
name: "mz_catalog_snapshot_consolidations",
help: "Count of snapshot consolidation passes.",
)),
snapshot_max_entries: registry.register(metric!(
name: "mz_catalog_snapshot_max_entries",
help: "High-water mark of entries in the unconsolidated in-memory \
snapshot since process start.",
)),
}
}
}
63 changes: 56 additions & 7 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,20 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {

let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

// Track the snapshot size so we can consolidate when it doubles. This
// bounds memory usage during catch-up: without it, the snapshot grows
// with every retract+insert pair across timestamps. Ideally, we would
// consolidate exactly once after processing all timestamps, but instead we
// consolidate heuristically whenever the snapshot doubles in size, which
// bounds memory while keeping the total work amortized.
// This behavior is safe because nothing in the loop reads from self.snapshot:
// the update_applier maintains its own internal state (configs, settings,
// fence tokens) independently. Consolidating per-timestamp was O(K * N log N);
// consolidating once is O(N log N).

// Use a minimum threshold to avoid consolidating on every Progress
// event when the snapshot is small or empty (since 0 * 2 = 0).
let mut size_at_last_consolidation = max(self.snapshot.len(), 8);

while self.upper < target_upper {
let listen_events = self.listen.fetch_next().await;
for listen_event in listen_events {
Expand All @@ -586,6 +600,12 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
);
self.apply_updates(updates)?;
}
// Consolidate when the snapshot has doubled in size to
// bound memory.
if self.snapshot.len() >= size_at_last_consolidation * 2 {
self.consolidate();
size_at_last_consolidation = self.snapshot.len();
}
}
ListenEvent::Updates(batch_updates) => {
for update in batch_updates {
Expand All @@ -597,11 +617,35 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
}
}
assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
// Always consolidate at the end to ensure the snapshot is clean.
self.consolidate();
Ok(())
}

/// Applies a batch of updates to the in-memory state and then immediately
/// consolidates the snapshot.
///
/// This is the usual entry point when all of a caller's updates arrive as one
/// batch. Hot loops that feed updates across many timestamps (e.g.
/// `sync_inner`) should instead call `apply_updates` repeatedly and only
/// consolidate periodically (such as when the snapshot doubles in size), so
/// memory stays bounded without paying a full consolidation pass per
/// timestamp.
pub(crate) fn apply_updates_and_consolidate(
    &mut self,
    updates: impl IntoIterator<Item = StateUpdate<T>>,
) -> Result<(), FenceError> {
    // On success, fold the consolidation step into the same Result; on a
    // fence error we propagate without touching the snapshot further.
    self.apply_updates(updates).map(|()| self.consolidate())
}

/// Apply a batch of updates to the catalog state without consolidating.
///
/// Does NOT consolidate the snapshot afterward. If you are calling this once,
/// prefer `apply_updates_and_consolidate`. This method exists for loops that
/// call it many times — consolidating per call would be O(K * N log N) instead
/// of O(N log N). Callers should consolidate periodically (e.g., on snapshot
/// doubling) to bound memory.
#[mz_ore::instrument(level = "debug")]
pub(crate) fn apply_updates(
fn apply_updates(
&mut self,
updates: impl IntoIterator<Item = StateUpdate<T>>,
) -> Result<(), FenceError> {
Expand Down Expand Up @@ -639,18 +683,23 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
}
}

// Track the high-water mark of the unconsolidated snapshot size.
let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX);
if len > self.metrics.snapshot_max_entries.get() {
self.metrics.snapshot_max_entries.set(len);
}

errors.sort();
if let Some(err) = errors.into_iter().next() {
return Err(err);
}

self.consolidate();

Ok(())
}

#[mz_ore::instrument]
pub(crate) fn consolidate(&mut self) {
self.metrics.snapshot_consolidations.inc();
soft_assert_no_log!(
self.snapshot
.windows(2)
Expand Down Expand Up @@ -1042,7 +1091,7 @@ impl UnopenedPersistCatalogState {
let updates = snapshot
.into_iter()
.map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
handle.apply_updates(updates)?;
handle.apply_updates_and_consolidate(updates)?;
info!(
"startup: envd serve: catalog init: apply updates complete in {:?}",
apply_start.elapsed()
Expand Down Expand Up @@ -1238,7 +1287,7 @@ impl UnopenedPersistCatalogState {
let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
StateUpdate { kind, ts, diff }
});
catalog.apply_updates(updates)?;
catalog.apply_updates_and_consolidate(updates)?;

let catalog_content_version = catalog.catalog_content_version.to_string();
let txn = if is_initialized {
Expand Down Expand Up @@ -1284,7 +1333,7 @@ impl UnopenedPersistCatalogState {
let (txn_batch, _) = txn.into_parts();
// The upper here doesn't matter because we are only applying the updates in memory.
let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
catalog.apply_updates(updates)?;
catalog.apply_updates_and_consolidate(updates)?;
} else {
txn.commit_internal(commit_ts).await?;
}
Expand Down Expand Up @@ -1787,7 +1836,7 @@ impl DurableCatalogState for PersistCatalogState {
ts: commit_ts,
diff,
});
catalog.apply_updates(updates)?;
catalog.apply_updates_and_consolidate(updates)?;
catalog.upper = commit_ts.step_forward();
catalog.upper
}
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/durable/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ async fn run_versioned_upgrade<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateK
.into_iter()
.map(|(kind, diff)| StateUpdate { kind, ts, diff });
commit_ts = commit_ts.step_forward();
unopened_catalog_state.apply_updates(updates)?;
unopened_catalog_state.apply_updates_and_consolidate(updates)?;
}

// 5. Consolidate snapshot to remove old versions.
Expand Down
191 changes: 189 additions & 2 deletions src/catalog/tests/read-write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@
#![recursion_limit = "256"]

use std::collections::BTreeMap;
use std::sync::Arc;

use insta::assert_debug_snapshot;
use itertools::Itertools;
use mz_audit_log::{EventDetails, EventType, EventV1, IdNameV1, VersionedEvent};
use mz_catalog::durable::objects::serialization::proto;
use mz_catalog::durable::objects::{DurableType, IdAlloc};
use mz_catalog::durable::{
CatalogError, DurableCatalogError, FenceError, Item, TestCatalogStateBuilder,
USER_ITEM_ALLOC_KEY, test_bootstrap_args,
CatalogError, Database, DurableCatalogError, FenceError, Item, Metrics,
TestCatalogStateBuilder, USER_ITEM_ALLOC_KEY, test_bootstrap_args,
};
use mz_ore::assert_ok;
use mz_ore::collections::HashSet;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::PersistClient;
use mz_proto::RustType;
Expand Down Expand Up @@ -553,3 +555,188 @@ async fn test_persist_ddl_detection_with_batch_allocated_ids() {

Box::new(state).expire().await;
}

/// Regression test for incident-970: quadratic consolidation during catalog sync.
///
/// When a reader syncs through K timestamps, apply_updates() was calling
/// consolidate() on the entire snapshot for each timestamp, resulting in
/// O(K * N log N) work instead of O(N log N). This test verifies that syncing
/// through many timestamps only consolidates the snapshot a constant number of
/// times, not once per timestamp.
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn test_persist_sync_consolidation_not_quadratic() {
    let persist_client = PersistClient::new_for_tests().await;
    let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));
    let state_builder =
        TestCatalogStateBuilder::new(persist_client).with_default_deploy_generation();
    // Share metrics between writer and reader so we can observe consolidation counts.
    let state_builder = state_builder.with_metrics(Arc::clone(&metrics));

    // Open a writer catalog.
    let mut writer = state_builder
        .clone()
        .unwrap_build()
        .await
        .open(SYSTEM_TIME().into(), &test_bootstrap_args())
        .await
        .unwrap()
        .0;
    // Drain any pending updates so the writer starts from a caught-up state.
    let _ = writer.sync_to_current_updates().await.unwrap();

    // Open a read-only catalog, caught up to the current upper.
    let mut reader = state_builder
        .clone()
        .unwrap_build()
        .await
        .open_read_only(&test_bootstrap_args())
        .await
        .unwrap();
    let _ = reader.sync_to_current_updates().await.unwrap();

    // Writer creates many databases, each in its own transaction at a distinct
    // timestamp. This mirrors the incident scenario where DDL happened across
    // many timestamps while a read-only envd was restarting.
    let num_timestamps: u64 = 100;
    for i in 0..num_timestamps {
        let mut txn = writer.transaction().await.unwrap();
        txn.insert_user_database(
            &format!("db_{i}"),
            RoleId::User(1),
            Vec::new(),
            &HashSet::new(),
        )
        .unwrap();
        // We only need the commit to land at its own timestamp; the op
        // updates themselves are not inspected here.
        let _ = txn.get_and_commit_op_updates();
        let commit_ts = txn.upper();
        txn.commit(commit_ts).await.unwrap();
    }

    // Record the consolidation counter before the reader syncs.
    let consolidations_before = metrics.snapshot_consolidations.get();

    // Reader syncs through all timestamps. With the quadratic bug, this would
    // call consolidate() once per timestamp (num_timestamps times). With the
    // fix, it should consolidate only once after processing all timestamps.
    let updates = reader.sync_to_current_updates().await.unwrap();
    let consolidations_after = metrics.snapshot_consolidations.get();
    let consolidations_during_sync = consolidations_after - consolidations_before;

    // Verify correctness: reader received updates and can see all databases.
    assert!(
        !updates.is_empty(),
        "reader should have received updates from writer"
    );
    let snapshot = reader.snapshot().await.unwrap();
    for i in 0..num_timestamps {
        let db_name = format!("db_{i}");
        let found = snapshot.databases.values().any(|db| db.name == db_name);
        assert!(found, "database {db_name} not found in reader snapshot");
    }

    // The key assertion: consolidation should happen O(log N) times during
    // the sync (from the doubling strategy), NOT once per timestamp (which
    // would be num_timestamps = 100). We allow a generous bound here.
    assert!(
        consolidations_during_sync < 10,
        "sync through {num_timestamps} timestamps triggered {consolidations_during_sync} \
        snapshot consolidations, suggesting quadratic behavior (expected < 10)"
    );

    // Expire the handles so persist resources are released cleanly.
    Box::new(writer).expire().await;
    Box::new(reader).expire().await;
}

/// Verify that the reader's snapshot stays bounded during sync catch-up, even
/// when the writer churns the same object many times across timestamps. Without
/// the doubling consolidation in `sync_inner`, the snapshot would grow with
/// every retract+insert pair; with it, the snapshot stays within ~2x the live
/// catalog size.
///
/// NOTE(review): the scraped source had web-page review-comment text injected
/// between the final two statements; this body reconstructs the function with
/// that artifact removed. No code semantics were changed.
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn test_persist_sync_snapshot_stays_bounded_under_churn() {
    let persist_client = PersistClient::new_for_tests().await;
    let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));
    let state_builder = TestCatalogStateBuilder::new(persist_client)
        .with_default_deploy_generation()
        .with_metrics(Arc::clone(&metrics));

    // Open writer, create one database to churn.
    let mut writer = state_builder
        .clone()
        .unwrap_build()
        .await
        .open(SYSTEM_TIME().into(), &test_bootstrap_args())
        .await
        .unwrap()
        .0;
    let _ = writer.sync_to_current_updates().await.unwrap();

    let mut txn = writer.transaction().await.unwrap();
    let (db_id, db_oid) = txn
        .insert_user_database("churn_db", RoleId::User(1), Vec::new(), &HashSet::new())
        .unwrap();
    let _ = txn.get_and_commit_op_updates();
    let commit_ts = txn.upper();
    txn.commit(commit_ts).await.unwrap();

    // Open reader, sync to current state.
    let mut reader = state_builder
        .unwrap_build()
        .await
        .open_read_only(&test_bootstrap_args())
        .await
        .unwrap();
    let _ = reader.sync_to_current_updates().await.unwrap();
    let peak_before = metrics.snapshot_max_entries.get();

    // Rename the same database 200 times, each in its own transaction.
    let num_renames: u64 = 200;
    let mut db = Database {
        id: db_id,
        oid: db_oid,
        name: "churn_db".to_string(),
        owner_id: RoleId::User(1),
        privileges: Vec::new(),
    };
    for i in 0..num_renames {
        let mut txn = writer.transaction().await.unwrap();
        db.name = format!("churn_db_{i}");
        txn.update_database(db.id, db.clone()).unwrap();
        let _ = txn.get_and_commit_op_updates();
        let commit_ts = txn.upper();
        txn.commit(commit_ts).await.unwrap();
    }

    // Reader syncs through all 200 renames.
    let _ = reader.sync_to_current_updates().await.unwrap();

    // Verify correctness: only one database, with the final name.
    let snapshot = reader.snapshot().await.unwrap();
    let churn_dbs: Vec<_> = snapshot
        .databases
        .values()
        .filter(|d| d.name.starts_with("churn_db"))
        .collect();
    assert_eq!(churn_dbs.len(), 1, "{churn_dbs:#?}");
    assert_eq!(churn_dbs[0].name, format!("churn_db_{}", num_renames - 1));

    // The key assertion: the snapshot high-water mark should stay bounded,
    // not grow proportionally to num_renames. The doubling consolidation
    // keeps it within ~2x the live catalog size.
    let peak_after = metrics.snapshot_max_entries.get();
    let peak_delta = peak_after - peak_before;
    // With doubling consolidation, the snapshot stays within ~2x the live
    // catalog size. Without consolidation this would grow by ~387 for 200
    // renames; with it, the delta should be much smaller.
    let bounded = peak_before * 2;
    assert!(
        peak_delta < bounded,
        "peak unconsolidated snapshot grew by {peak_delta} over {num_renames} \
        renames (peak_before={peak_before}, peak_after={peak_after}); \
        expected < {bounded}"
    );

    // Expire both handles so persist resources are released cleanly.
    Box::new(writer).expire().await;
    Box::new(reader).expire().await;
}
Loading