diff --git a/src/catalog/src/durable/metrics.rs b/src/catalog/src/durable/metrics.rs index c69c4cc6a30bf..fc3eedaa400eb 100644 --- a/src/catalog/src/durable/metrics.rs +++ b/src/catalog/src/durable/metrics.rs @@ -12,7 +12,7 @@ use mz_ore::metric; use mz_ore::metrics::{IntCounter, MetricsRegistry}; use mz_ore::stats::histogram_seconds_buckets; -use prometheus::{Counter, Histogram, IntGaugeVec}; +use prometheus::{Counter, Histogram, IntGauge, IntGaugeVec}; #[derive(Debug, Clone)] pub struct Metrics { @@ -25,6 +25,8 @@ pub struct Metrics { pub sync_latency_seconds: Counter, pub collection_entries: IntGaugeVec, pub allocate_id_seconds: Histogram, + pub snapshot_consolidations: IntCounter, + pub snapshot_max_entries: IntGauge, } impl Metrics { @@ -69,6 +71,15 @@ impl Metrics { help: "The time it takes to allocate IDs in the durable catalog.", buckets: histogram_seconds_buckets(0.001, 32.0), )), + snapshot_consolidations: registry.register(metric!( + name: "mz_catalog_snapshot_consolidations", + help: "Count of snapshot consolidation passes.", + )), + snapshot_max_entries: registry.register(metric!( + name: "mz_catalog_snapshot_max_entries", + help: "High-water mark of entries in the unconsolidated in-memory \ + snapshot since process start.", + )), } } } diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs index caceeaa3be47f..e4b674ccb46cb 100644 --- a/src/catalog/src/durable/persist.rs +++ b/src/catalog/src/durable/persist.rs @@ -560,6 +560,20 @@ impl> PersistHandle { let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new(); + // Track the snapshot size so we can consolidate when it doubles. This + // bounds memory usage during catch-up: without it, the snapshot grows + // with every retract+insert pair across timestamps. 
Ideally, we would + // consolidate only once after all timestamps rather than per-timestamp, but + // that would leave memory unbounded during the loop, so we instead consolidate + // heuristically whenever the snapshot doubles in size. + // This behavior is safe because nothing in the loop reads from self.snapshot: + // the update_applier maintains its own internal state (configs, settings, + // fence tokens) independently. Consolidating per-timestamp was O(K * N log N); + // consolidating once is O(N log N). + + // Use a minimum threshold to avoid consolidating on every Progress + // event when the snapshot is small or empty (since 0 * 2 = 0). + let mut size_at_last_consolidation = max(self.snapshot.len(), 8); + while self.upper < target_upper { let listen_events = self.listen.fetch_next().await; for listen_event in listen_events { @@ -586,6 +600,12 @@ impl> PersistHandle { ); self.apply_updates(updates)?; } + // Consolidate when the snapshot has doubled in size to + // bound memory. + if self.snapshot.len() >= size_at_last_consolidation * 2 { + self.consolidate(); + size_at_last_consolidation = self.snapshot.len(); + } } ListenEvent::Updates(batch_updates) => { for update in batch_updates { @@ -597,11 +617,35 @@ impl> PersistHandle { } } assert_eq!(updates, BTreeMap::new(), "all updates should be applied"); + // Always consolidate at the end to ensure the snapshot is clean. + self.consolidate(); + Ok(()) + } + + /// Apply a batch of updates and then consolidate the snapshot. This is the + /// typical entry point for callers that apply updates in a single batch. + /// + /// For hot loops that apply updates across many timestamps (e.g., `sync_inner`), + /// use `apply_updates` directly and call `consolidate()` periodically (e.g., + /// on snapshot doubling) to bound memory while staying amortized O(N log N). 
+ pub(crate) fn apply_updates_and_consolidate( + &mut self, + updates: impl IntoIterator>, + ) -> Result<(), FenceError> { + self.apply_updates(updates)?; + self.consolidate(); Ok(()) } + /// Apply a batch of updates to the catalog state without consolidating. + /// + /// Does NOT consolidate the snapshot afterward. If you are calling this once, + /// prefer `apply_updates_and_consolidate`. This method exists for loops that + /// call it many times — consolidating per call would be O(K * N log N) instead + /// of O(N log N). Callers should consolidate periodically (e.g., on snapshot + /// doubling) to bound memory. #[mz_ore::instrument(level = "debug")] - pub(crate) fn apply_updates( + fn apply_updates( &mut self, updates: impl IntoIterator>, ) -> Result<(), FenceError> { @@ -639,18 +683,23 @@ impl> PersistHandle { } } + // Track the high-water mark of the unconsolidated snapshot size. + let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX); + if len > self.metrics.snapshot_max_entries.get() { + self.metrics.snapshot_max_entries.set(len); + } + errors.sort(); if let Some(err) = errors.into_iter().next() { return Err(err); } - self.consolidate(); - Ok(()) } #[mz_ore::instrument] pub(crate) fn consolidate(&mut self) { + self.metrics.snapshot_consolidations.inc(); soft_assert_no_log!( self.snapshot .windows(2) @@ -1042,7 +1091,7 @@ impl UnopenedPersistCatalogState { let updates = snapshot .into_iter() .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff }); - handle.apply_updates(updates)?; + handle.apply_updates_and_consolidate(updates)?; info!( "startup: envd serve: catalog init: apply updates complete in {:?}", apply_start.elapsed() @@ -1238,7 +1287,7 @@ impl UnopenedPersistCatalogState { let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error"); StateUpdate { kind, ts, diff } }); - catalog.apply_updates(updates)?; + catalog.apply_updates_and_consolidate(updates)?; let catalog_content_version = 
catalog.catalog_content_version.to_string(); let txn = if is_initialized { @@ -1284,7 +1333,7 @@ impl UnopenedPersistCatalogState { let (txn_batch, _) = txn.into_parts(); // The upper here doesn't matter because we are only applying the updates in memory. let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper); - catalog.apply_updates(updates)?; + catalog.apply_updates_and_consolidate(updates)?; } else { txn.commit_internal(commit_ts).await?; } @@ -1787,7 +1836,7 @@ impl DurableCatalogState for PersistCatalogState { ts: commit_ts, diff, }); - catalog.apply_updates(updates)?; + catalog.apply_updates_and_consolidate(updates)?; catalog.upper = commit_ts.step_forward(); catalog.upper } diff --git a/src/catalog/src/durable/upgrade.rs b/src/catalog/src/durable/upgrade.rs index 18471d0710338..bf86fd8bfdb23 100644 --- a/src/catalog/src/durable/upgrade.rs +++ b/src/catalog/src/durable/upgrade.rs @@ -469,7 +469,7 @@ async fn run_versioned_upgrade = snapshot + .databases + .values() + .filter(|d| d.name.starts_with("churn_db")) + .collect(); + assert_eq!(churn_dbs.len(), 1, "{churn_dbs:#?}"); + assert_eq!(churn_dbs[0].name, format!("churn_db_{}", num_renames - 1)); + + // The key assertion: the snapshot high-water mark should stay bounded, + // not grow proportionally to num_renames. The doubling consolidation + // keeps it within ~2x the live catalog size. + let peak_after = metrics.snapshot_max_entries.get(); + let peak_delta = peak_after - peak_before; + // With doubling consolidation, the snapshot stays within ~2x the live + // catalog size. Without consolidation this would grow by ~387 for 200 + // renames; with it, the delta should be much smaller. + let bounded = peak_before * 2; + assert!( + peak_delta < bounded, + "peak unconsolidated snapshot grew by {peak_delta} over {num_renames} \ + renames (peak_before={peak_before}, peak_after={peak_after}); \ + expected < {bounded}" + ); + + Box::new(writer).expire().await; + Box::new(reader).expire().await; +}