Skip to content

Commit 31ebed7

Browse files
authored
[catalog] fix quadratic consolidation during catalog sync (#36217)
SQL-159 / incident-970 Problem: There was O(K * N log N) behavior in catalog sync where consolidate() was called on the entire snapshot for each of K timestamps. This was the root cause of incident-970: a read-only envd restarted due to DDL in the old env, and during the restart more DDL happened across many timestamps, causing the quadratic consolidation to take ~1000s. Solution: Now consolidate is called once after all timestamps are processed, making sync O(N log N). This is correct because the snapshot is append-only during the sync loop, and nothing else reads from it between timestamps. Therefore, deferring consolidation to the end produces the same final state. Also adds a snapshot_consolidations metric to track consolidation passes. Test: - Writer creates 100 databases across 100 distinct timestamps - Reader syncs through all timestamps - Asserts exactly 1 consolidation during sync (was 100 before the fix)
1 parent 85fe1c5 commit 31ebed7

4 files changed

Lines changed: 258 additions & 11 deletions

File tree

src/catalog/src/durable/metrics.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
use mz_ore::metric;
1313
use mz_ore::metrics::{IntCounter, MetricsRegistry};
1414
use mz_ore::stats::histogram_seconds_buckets;
15-
use prometheus::{Counter, Histogram, IntGaugeVec};
15+
use prometheus::{Counter, Histogram, IntGauge, IntGaugeVec};
1616

1717
#[derive(Debug, Clone)]
1818
pub struct Metrics {
@@ -25,6 +25,8 @@ pub struct Metrics {
2525
pub sync_latency_seconds: Counter,
2626
pub collection_entries: IntGaugeVec,
2727
pub allocate_id_seconds: Histogram,
28+
pub snapshot_consolidations: IntCounter,
29+
pub snapshot_max_entries: IntGauge,
2830
}
2931

3032
impl Metrics {
@@ -69,6 +71,15 @@ impl Metrics {
6971
help: "The time it takes to allocate IDs in the durable catalog.",
7072
buckets: histogram_seconds_buckets(0.001, 32.0),
7173
)),
74+
snapshot_consolidations: registry.register(metric!(
75+
name: "mz_catalog_snapshot_consolidations",
76+
help: "Count of snapshot consolidation passes.",
77+
)),
78+
snapshot_max_entries: registry.register(metric!(
79+
name: "mz_catalog_snapshot_max_entries",
80+
help: "High-water mark of entries in the unconsolidated in-memory \
81+
snapshot since process start.",
82+
)),
7283
}
7384
}
7485
}

src/catalog/src/durable/persist.rs

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,20 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
560560

561561
let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();
562562

563+
// Track the snapshot size so we can consolidate when it doubles. This
564+
// bounds memory usage during catch-up: without it, the snapshot grows
565+
// with every retract+insert pair across timestamps. Ideally, we would
566+
// consolidate once after all timestamps, not per-timestamp, but we
567+
// heuristically consolidate when the snapshot doubles in size to bound memory.
568+
// This behavior is safe because nothing in the loop reads from self.snapshot:
569+
// the update_applier maintains its own internal state (configs, settings,
570+
// fence tokens) independently. Consolidating per-timestamp was O(K * N log N);
571+
// consolidating once is O(N log N).
572+
573+
// Use a minimum threshold to avoid consolidating on every Progress
574+
// event when the snapshot is small or empty (since 0 * 2 = 0).
575+
let mut size_at_last_consolidation = max(self.snapshot.len(), 8);
576+
563577
while self.upper < target_upper {
564578
let listen_events = self.listen.fetch_next().await;
565579
for listen_event in listen_events {
@@ -586,6 +600,12 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
586600
);
587601
self.apply_updates(updates)?;
588602
}
603+
// Consolidate when the snapshot has doubled in size to
604+
// bound memory.
605+
if self.snapshot.len() >= size_at_last_consolidation * 2 {
606+
self.consolidate();
607+
size_at_last_consolidation = self.snapshot.len();
608+
}
589609
}
590610
ListenEvent::Updates(batch_updates) => {
591611
for update in batch_updates {
@@ -597,11 +617,35 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
597617
}
598618
}
599619
assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
620+
// Always consolidate at the end to ensure the snapshot is clean.
621+
self.consolidate();
622+
Ok(())
623+
}
624+
625+
/// Apply a batch of updates and then consolidate the snapshot. This is the
626+
/// typical entry point for callers that apply updates in a single batch.
627+
///
628+
/// For hot loops that apply updates across many timestamps (e.g., `sync_inner`),
629+
/// use `apply_updates` directly and call `consolidate()` periodically (e.g.,
630+
/// on snapshot doubling) to bound memory while staying amortized O(N log N).
631+
pub(crate) fn apply_updates_and_consolidate(
632+
&mut self,
633+
updates: impl IntoIterator<Item = StateUpdate<T>>,
634+
) -> Result<(), FenceError> {
635+
self.apply_updates(updates)?;
636+
self.consolidate();
600637
Ok(())
601638
}
602639

640+
/// Apply a batch of updates to the catalog state without consolidating.
641+
///
642+
/// Does NOT consolidate the snapshot afterward. If you are calling this once,
643+
/// prefer `apply_updates_and_consolidate`. This method exists for loops that
644+
/// call it many times — consolidating per call would be O(K * N log N) instead
645+
/// of O(N log N). Callers should consolidate periodically (e.g., on snapshot
646+
/// doubling) to bound memory.
603647
#[mz_ore::instrument(level = "debug")]
604-
pub(crate) fn apply_updates(
648+
fn apply_updates(
605649
&mut self,
606650
updates: impl IntoIterator<Item = StateUpdate<T>>,
607651
) -> Result<(), FenceError> {
@@ -639,18 +683,23 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
639683
}
640684
}
641685

686+
// Track the high-water mark of the unconsolidated snapshot size.
687+
let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX);
688+
if len > self.metrics.snapshot_max_entries.get() {
689+
self.metrics.snapshot_max_entries.set(len);
690+
}
691+
642692
errors.sort();
643693
if let Some(err) = errors.into_iter().next() {
644694
return Err(err);
645695
}
646696

647-
self.consolidate();
648-
649697
Ok(())
650698
}
651699

652700
#[mz_ore::instrument]
653701
pub(crate) fn consolidate(&mut self) {
702+
self.metrics.snapshot_consolidations.inc();
654703
soft_assert_no_log!(
655704
self.snapshot
656705
.windows(2)
@@ -1042,7 +1091,7 @@ impl UnopenedPersistCatalogState {
10421091
let updates = snapshot
10431092
.into_iter()
10441093
.map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
1045-
handle.apply_updates(updates)?;
1094+
handle.apply_updates_and_consolidate(updates)?;
10461095
info!(
10471096
"startup: envd serve: catalog init: apply updates complete in {:?}",
10481097
apply_start.elapsed()
@@ -1238,7 +1287,7 @@ impl UnopenedPersistCatalogState {
12381287
let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
12391288
StateUpdate { kind, ts, diff }
12401289
});
1241-
catalog.apply_updates(updates)?;
1290+
catalog.apply_updates_and_consolidate(updates)?;
12421291

12431292
let catalog_content_version = catalog.catalog_content_version.to_string();
12441293
let txn = if is_initialized {
@@ -1284,7 +1333,7 @@ impl UnopenedPersistCatalogState {
12841333
let (txn_batch, _) = txn.into_parts();
12851334
// The upper here doesn't matter because we are only applying the updates in memory.
12861335
let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1287-
catalog.apply_updates(updates)?;
1336+
catalog.apply_updates_and_consolidate(updates)?;
12881337
} else {
12891338
txn.commit_internal(commit_ts).await?;
12901339
}
@@ -1787,7 +1836,7 @@ impl DurableCatalogState for PersistCatalogState {
17871836
ts: commit_ts,
17881837
diff,
17891838
});
1790-
catalog.apply_updates(updates)?;
1839+
catalog.apply_updates_and_consolidate(updates)?;
17911840
catalog.upper = commit_ts.step_forward();
17921841
catalog.upper
17931842
}

src/catalog/src/durable/upgrade.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ async fn run_versioned_upgrade<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateK
469469
.into_iter()
470470
.map(|(kind, diff)| StateUpdate { kind, ts, diff });
471471
commit_ts = commit_ts.step_forward();
472-
unopened_catalog_state.apply_updates(updates)?;
472+
unopened_catalog_state.apply_updates_and_consolidate(updates)?;
473473
}
474474

475475
// 5. Consolidate snapshot to remove old versions.

src/catalog/tests/read-write.rs

Lines changed: 189 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,20 @@
1010
#![recursion_limit = "256"]
1111

1212
use std::collections::BTreeMap;
13+
use std::sync::Arc;
1314

1415
use insta::assert_debug_snapshot;
1516
use itertools::Itertools;
1617
use mz_audit_log::{EventDetails, EventType, EventV1, IdNameV1, VersionedEvent};
1718
use mz_catalog::durable::objects::serialization::proto;
1819
use mz_catalog::durable::objects::{DurableType, IdAlloc};
1920
use mz_catalog::durable::{
20-
CatalogError, DurableCatalogError, FenceError, Item, TestCatalogStateBuilder,
21-
USER_ITEM_ALLOC_KEY, test_bootstrap_args,
21+
CatalogError, Database, DurableCatalogError, FenceError, Item, Metrics,
22+
TestCatalogStateBuilder, USER_ITEM_ALLOC_KEY, test_bootstrap_args,
2223
};
2324
use mz_ore::assert_ok;
2425
use mz_ore::collections::HashSet;
26+
use mz_ore::metrics::MetricsRegistry;
2527
use mz_ore::now::SYSTEM_TIME;
2628
use mz_persist_client::PersistClient;
2729
use mz_proto::RustType;
@@ -553,3 +555,188 @@ async fn test_persist_ddl_detection_with_batch_allocated_ids() {
553555

554556
Box::new(state).expire().await;
555557
}
558+
559+
/// Regression test for incident-970: quadratic consolidation during catalog sync.
560+
///
561+
/// When a reader syncs through K timestamps, apply_updates() was calling
562+
/// consolidate() on the entire snapshot for each timestamp, resulting in
563+
/// O(K * N log N) work instead of O(N log N). This test verifies that syncing
564+
/// through many timestamps only consolidates the snapshot a constant number of
565+
/// times, not once per timestamp.
566+
#[mz_ore::test(tokio::test)]
567+
#[cfg_attr(miri, ignore)]
568+
async fn test_persist_sync_consolidation_not_quadratic() {
569+
let persist_client = PersistClient::new_for_tests().await;
570+
let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));
571+
let state_builder =
572+
TestCatalogStateBuilder::new(persist_client).with_default_deploy_generation();
573+
// Share metrics between writer and reader so we can observe consolidation counts.
574+
let state_builder = state_builder.with_metrics(Arc::clone(&metrics));
575+
576+
// Open a writer catalog.
577+
let mut writer = state_builder
578+
.clone()
579+
.unwrap_build()
580+
.await
581+
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
582+
.await
583+
.unwrap()
584+
.0;
585+
let _ = writer.sync_to_current_updates().await.unwrap();
586+
587+
// Open a read-only catalog, caught up to the current upper.
588+
let mut reader = state_builder
589+
.clone()
590+
.unwrap_build()
591+
.await
592+
.open_read_only(&test_bootstrap_args())
593+
.await
594+
.unwrap();
595+
let _ = reader.sync_to_current_updates().await.unwrap();
596+
597+
// Writer creates many databases, each in its own transaction at a distinct
598+
// timestamp. This mirrors the incident scenario where DDL happened across
599+
// many timestamps while a read-only envd was restarting.
600+
let num_timestamps: u64 = 100;
601+
for i in 0..num_timestamps {
602+
let mut txn = writer.transaction().await.unwrap();
603+
txn.insert_user_database(
604+
&format!("db_{i}"),
605+
RoleId::User(1),
606+
Vec::new(),
607+
&HashSet::new(),
608+
)
609+
.unwrap();
610+
let _ = txn.get_and_commit_op_updates();
611+
let commit_ts = txn.upper();
612+
txn.commit(commit_ts).await.unwrap();
613+
}
614+
615+
// Record the consolidation counter before the reader syncs.
616+
let consolidations_before = metrics.snapshot_consolidations.get();
617+
618+
// Reader syncs through all timestamps. With the quadratic bug, this would
619+
// call consolidate() once per timestamp (num_timestamps times). With the
620+
// fix, it should consolidate only once after processing all timestamps.
621+
let updates = reader.sync_to_current_updates().await.unwrap();
622+
let consolidations_after = metrics.snapshot_consolidations.get();
623+
let consolidations_during_sync = consolidations_after - consolidations_before;
624+
625+
// Verify correctness: reader received updates and can see all databases.
626+
assert!(
627+
!updates.is_empty(),
628+
"reader should have received updates from writer"
629+
);
630+
let snapshot = reader.snapshot().await.unwrap();
631+
for i in 0..num_timestamps {
632+
let db_name = format!("db_{i}");
633+
let found = snapshot.databases.values().any(|db| db.name == db_name);
634+
assert!(found, "database {db_name} not found in reader snapshot");
635+
}
636+
637+
// The key assertion: consolidation should happen O(log N) times during
638+
// the sync (from the doubling strategy), NOT once per timestamp (which
639+
// would be num_timestamps = 100). We allow a generous bound here.
640+
assert!(
641+
consolidations_during_sync < 10,
642+
"sync through {num_timestamps} timestamps triggered {consolidations_during_sync} \
643+
snapshot consolidations, suggesting quadratic behavior (expected < 10)"
644+
);
645+
646+
Box::new(writer).expire().await;
647+
Box::new(reader).expire().await;
648+
}
649+
650+
/// Verify that the reader's snapshot stays bounded during sync catch-up, even
651+
/// when the writer churns the same object many times across timestamps. Without
652+
/// the doubling consolidation in `sync_inner`, the snapshot would grow with
653+
/// every retract+insert pair; with it, the snapshot stays within ~2x the live
654+
/// catalog size.
655+
#[mz_ore::test(tokio::test)]
656+
#[cfg_attr(miri, ignore)]
657+
async fn test_persist_sync_snapshot_stays_bounded_under_churn() {
658+
let persist_client = PersistClient::new_for_tests().await;
659+
let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));
660+
let state_builder = TestCatalogStateBuilder::new(persist_client)
661+
.with_default_deploy_generation()
662+
.with_metrics(Arc::clone(&metrics));
663+
664+
// Open writer, create one database to churn.
665+
let mut writer = state_builder
666+
.clone()
667+
.unwrap_build()
668+
.await
669+
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
670+
.await
671+
.unwrap()
672+
.0;
673+
let _ = writer.sync_to_current_updates().await.unwrap();
674+
675+
let mut txn = writer.transaction().await.unwrap();
676+
let (db_id, db_oid) = txn
677+
.insert_user_database("churn_db", RoleId::User(1), Vec::new(), &HashSet::new())
678+
.unwrap();
679+
let _ = txn.get_and_commit_op_updates();
680+
let commit_ts = txn.upper();
681+
txn.commit(commit_ts).await.unwrap();
682+
683+
// Open reader, sync to current state.
684+
let mut reader = state_builder
685+
.unwrap_build()
686+
.await
687+
.open_read_only(&test_bootstrap_args())
688+
.await
689+
.unwrap();
690+
let _ = reader.sync_to_current_updates().await.unwrap();
691+
let peak_before = metrics.snapshot_max_entries.get();
692+
693+
// Rename the same database 200 times, each in its own transaction.
694+
let num_renames: u64 = 200;
695+
let mut db = Database {
696+
id: db_id,
697+
oid: db_oid,
698+
name: "churn_db".to_string(),
699+
owner_id: RoleId::User(1),
700+
privileges: Vec::new(),
701+
};
702+
for i in 0..num_renames {
703+
let mut txn = writer.transaction().await.unwrap();
704+
db.name = format!("churn_db_{i}");
705+
txn.update_database(db.id, db.clone()).unwrap();
706+
let _ = txn.get_and_commit_op_updates();
707+
let commit_ts = txn.upper();
708+
txn.commit(commit_ts).await.unwrap();
709+
}
710+
711+
// Reader syncs through all 200 renames.
712+
let _ = reader.sync_to_current_updates().await.unwrap();
713+
714+
// Verify correctness: only one database, with the final name.
715+
let snapshot = reader.snapshot().await.unwrap();
716+
let churn_dbs: Vec<_> = snapshot
717+
.databases
718+
.values()
719+
.filter(|d| d.name.starts_with("churn_db"))
720+
.collect();
721+
assert_eq!(churn_dbs.len(), 1, "{churn_dbs:#?}");
722+
assert_eq!(churn_dbs[0].name, format!("churn_db_{}", num_renames - 1));
723+
724+
// The key assertion: the snapshot high-water mark should stay bounded,
725+
// not grow proportionally to num_renames. The doubling consolidation
726+
// keeps it within ~2x the live catalog size.
727+
let peak_after = metrics.snapshot_max_entries.get();
728+
let peak_delta = peak_after - peak_before;
729+
// With doubling consolidation, the snapshot stays within ~2x the live
730+
// catalog size. Without consolidation this would grow by ~387 for 200
731+
// renames; with it, the delta should be much smaller.
732+
let bounded = peak_before * 2;
733+
assert!(
734+
peak_delta < bounded,
735+
"peak unconsolidated snapshot grew by {peak_delta} over {num_renames} \
736+
renames (peak_before={peak_before}, peak_after={peak_after}); \
737+
expected < {bounded}"
738+
);
739+
740+
Box::new(writer).expire().await;
741+
Box::new(reader).expire().await;
742+
}

0 commit comments

Comments
 (0)