[catalog] fix quadratic consolidation during catalog sync#36217

Open
mtabebe wants to merge 1 commit into MaterializeInc:main from mtabebe:ma/ddl/sql-159-quadratic

Conversation

Contributor

@mtabebe mtabebe commented Apr 22, 2026

SQL-159 / incident-970

Problem:
There was O(K * N log N) behavior in catalog sync where consolidate() was called on the entire snapshot for each of K timestamps.

This was the root cause of incident-970: a read-only envd restarted due to DDL in the old env, and during the restart more DDL happened across many timestamps, causing the quadratic consolidation to take ~1000s.

Solution:
Now consolidate is called once after all timestamps are processed, making sync O(N log N).

This is correct because the snapshot is append-only during the sync loop, and nothing else reads from it between timestamps. Therefore, deferring consolidation to the end produces the same final state.

Also adds a snapshot_consolidations metric to track consolidation passes.
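To illustrate why the deferral is equivalent, here is a minimal standalone sketch using toy types (a snapshot as a vector of (key, diff) pairs); the real snapshot and consolidate() in mz-catalog differ, so the names and shapes here are illustrative only:

```rust
// Toy model: a snapshot is a vector of (key, diff) pairs, and
// consolidate() sorts it and merges duplicate keys, costing O(N log N).
fn consolidate(snapshot: &mut Vec<(String, i64)>) {
    snapshot.sort();
    let mut merged: Vec<(String, i64)> = Vec::with_capacity(snapshot.len());
    for (key, diff) in snapshot.drain(..) {
        if let Some((last_key, last_diff)) = merged.last_mut() {
            if *last_key == key {
                *last_diff += diff;
                continue;
            }
        }
        merged.push((key, diff));
    }
    // Drop fully-retracted entries.
    merged.retain(|(_, diff)| *diff != 0);
    *snapshot = merged;
}

fn sync(snapshot: &mut Vec<(String, i64)>, batches: Vec<Vec<(String, i64)>>) {
    for batch in batches {
        snapshot.extend(batch);
        // Before the fix, consolidate(snapshot) ran here, once per
        // timestamp: K timestamps * O(N log N) each.
    }
    // After the fix: one pass over the fully appended snapshot. Since the
    // loop only appends and never reads the snapshot, the final merged
    // state is identical either way.
    consolidate(snapshot);
}

fn main() {
    let mut snapshot = vec![("db_a".to_string(), 1)];
    // Each timestamp's batch retracts the old row and inserts the renamed one.
    let batches = vec![
        vec![("db_a".to_string(), -1), ("db_b".to_string(), 1)],
        vec![("db_b".to_string(), -1), ("db_c".to_string(), 1)],
    ];
    sync(&mut snapshot, batches);
    assert_eq!(snapshot, vec![("db_c".to_string(), 1)]);
    println!("{snapshot:?}");
}
```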

Test:

  • Writer creates 100 databases across 100 distinct timestamps
  • Reader syncs through all timestamps
  • Asserts exactly 1 consolidation during sync (was 100 before the fix)

@mtabebe mtabebe marked this pull request as ready for review April 23, 2026 00:02
@mtabebe mtabebe requested a review from a team as a code owner April 23, 2026 00:02
@mtabebe mtabebe requested review from aljoscha and ggevay April 23, 2026 00:02
Contributor

@def- def- left a comment


During a 0dt deployment, the read-only envd's memory usage grows proportionally to the number of DDL timestamps replayed during catch-up, not the size of the live catalog. I'm not sure if that's bad enough to warrant a change. Reproducer:

diff --git a/src/catalog/src/durable/metrics.rs b/src/catalog/src/durable/metrics.rs
index e58ada644d..0c9e862645 100644
--- a/src/catalog/src/durable/metrics.rs
+++ b/src/catalog/src/durable/metrics.rs
@@ -12,7 +12,7 @@
 use mz_ore::metric;
 use mz_ore::metrics::{IntCounter, MetricsRegistry};
 use mz_ore::stats::histogram_seconds_buckets;
-use prometheus::{Counter, Histogram, IntGaugeVec};
+use prometheus::{Counter, Histogram, IntGauge, IntGaugeVec};

 #[derive(Debug, Clone)]
 pub struct Metrics {
@@ -26,6 +26,7 @@ pub struct Metrics {
     pub collection_entries: IntGaugeVec,
     pub allocate_id_seconds: Histogram,
     pub snapshot_consolidations: IntCounter,
+    pub snapshot_max_entries: IntGauge,
 }

 impl Metrics {
@@ -74,6 +75,14 @@ impl Metrics {
                 name: "mz_catalog_snapshot_consolidations",
                 help: "Count of snapshot consolidation passes.",
             )),
+            snapshot_max_entries: registry.register(metric!(
+                name: "mz_catalog_snapshot_max_entries",
+                help: "High-water mark of entries in the unconsolidated in-memory \
+                       snapshot since process start. Reflects the transient memory \
+                       footprint of catalog sync/catch-up: it can exceed the live \
+                       catalog size when sync_inner replays many retract+insert \
+                       pairs across timestamps before consolidating.",
+            )),
         }
     }
 }
diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs
index 638e006aa0..c093b0b196 100644
--- a/src/catalog/src/durable/persist.rs
+++ b/src/catalog/src/durable/persist.rs
@@ -665,6 +665,16 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
             }
         }

+        // Track the high-water mark of the unconsolidated snapshot. During
+        // sync_inner this metric climbs with replayed history rather than
+        // live state, because consolidate() is deferred to the end of the
+        // whole sync. Tests use this to assert the memory footprint scales
+        // as expected under catch-up workloads.
+        let len = i64::try_from(self.snapshot.len()).unwrap_or(i64::MAX);
+        if len > self.metrics.snapshot_max_entries.get() {
+            self.metrics.snapshot_max_entries.set(len);
+        }
+
         errors.sort();
         if let Some(err) = errors.into_iter().next() {
             return Err(err);
diff --git a/src/catalog/tests/read-write.rs b/src/catalog/tests/read-write.rs
index 4a26c9ae59..b3b4d48265 100644
--- a/src/catalog/tests/read-write.rs
+++ b/src/catalog/tests/read-write.rs
@@ -18,8 +18,8 @@ use mz_audit_log::{EventDetails, EventType, EventV1, IdNameV1, VersionedEvent};
 use mz_catalog::durable::objects::serialization::proto;
 use mz_catalog::durable::objects::{DurableType, IdAlloc};
 use mz_catalog::durable::{
-    CatalogError, DurableCatalogError, FenceError, Item, Metrics, TestCatalogStateBuilder,
-    USER_ITEM_ALLOC_KEY, test_bootstrap_args,
+    CatalogError, Database, DurableCatalogError, FenceError, Item, Metrics,
+    TestCatalogStateBuilder, USER_ITEM_ALLOC_KEY, test_bootstrap_args,
 };
 use mz_ore::assert_ok;
 use mz_ore::collections::HashSet;
@@ -645,3 +645,83 @@ async fn test_persist_sync_consolidation_not_quadratic() {
     Box::new(writer).expire().await;
     Box::new(reader).expire().await;
 }
+
+/// Reader's unconsolidated snapshot during `sync_inner` must stay bounded by
+/// live state, not replayed history. See review of 7da7861c8b.
+#[mz_ore::test(tokio::test)]
+#[cfg_attr(miri, ignore)]
+async fn test_persist_sync_snapshot_stays_bounded_under_churn() {
+    let persist_client = PersistClient::new_for_tests().await;
+    let metrics = Arc::new(Metrics::new(&MetricsRegistry::new()));
+    let state_builder = TestCatalogStateBuilder::new(persist_client)
+        .with_default_deploy_generation()
+        .with_metrics(Arc::clone(&metrics));
+
+    let mut writer = state_builder
+        .clone()
+        .unwrap_build()
+        .await
+        .open(SYSTEM_TIME().into(), &test_bootstrap_args())
+        .await
+        .unwrap()
+        .0;
+    let _ = writer.sync_to_current_updates().await.unwrap();
+
+    let mut txn = writer.transaction().await.unwrap();
+    let (db_id, db_oid) = txn
+        .insert_user_database("churn_db", RoleId::User(1), Vec::new(), &HashSet::new())
+        .unwrap();
+    let _ = txn.get_and_commit_op_updates();
+    let commit_ts = txn.upper();
+    txn.commit(commit_ts).await.unwrap();
+
+    let mut reader = state_builder
+        .unwrap_build()
+        .await
+        .open_read_only(&test_bootstrap_args())
+        .await
+        .unwrap();
+    let _ = reader.sync_to_current_updates().await.unwrap();
+    let peak_before = metrics.snapshot_max_entries.get();
+
+    let num_renames: u64 = 200;
+    let mut db = Database {
+        id: db_id,
+        oid: db_oid,
+        name: "churn_db".to_string(),
+        owner_id: RoleId::User(1),
+        privileges: Vec::new(),
+    };
+    for i in 0..num_renames {
+        let mut txn = writer.transaction().await.unwrap();
+        db.name = format!("churn_db_{i}");
+        txn.update_database(db.id, db.clone()).unwrap();
+        let _ = txn.get_and_commit_op_updates();
+        let commit_ts = txn.upper();
+        txn.commit(commit_ts).await.unwrap();
+    }
+
+    let _ = reader.sync_to_current_updates().await.unwrap();
+
+    let snapshot = reader.snapshot().await.unwrap();
+    let churn_dbs: Vec<_> = snapshot
+        .databases
+        .values()
+        .filter(|d| d.name.starts_with("churn_db"))
+        .collect();
+    assert_eq!(churn_dbs.len(), 1, "{churn_dbs:#?}");
+    assert_eq!(churn_dbs[0].name, format!("churn_db_{}", num_renames - 1));
+
+    let peak_after = metrics.snapshot_max_entries.get();
+    let peak_delta = peak_after - peak_before;
+    let bounded = i64::try_from(num_renames / 4).expect("fits in i64");
+    assert!(
+        peak_delta < bounded,
+        "peak unconsolidated snapshot grew by {peak_delta} over {num_renames} \
+         renames (peak_before={peak_before}, peak_after={peak_after}); \
+         expected < {bounded}"
+    );
+
+    Box::new(writer).expire().await;
+    Box::new(reader).expire().await;
+}

Run with: cargo test -p mz-catalog --test read-write test_persist_sync_snapshot_stays_bounded_under_churn:

thread 'test_persist_sync_snapshot_stays_bounded_under_churn' (3903602) panicked at src/catalog/tests/read-write.rs:718:5:
peak unconsolidated snapshot grew by 387 over 200 renames (peak_before=56, peak_after=443); expected < 50

Contributor

@aljoscha aljoscha left a comment


Excellent! It's nice that you added the metric and the test.

snapshot consolidations, suggesting quadratic behavior"
);

Box::new(writer).expire().await;
Contributor


I assume rust async shenanigans?

Contributor

ggevay commented Apr 23, 2026

During a 0dt deployment, the read-only envd's memory usage grows proportionally to the number of DDL timestamps replayed during catch-up, not the size of the live catalog. I'm not sure if that's bad enough to warrant a change.

This seems ok to me. I think the absolute data sizes that are involved in each catalog object are pretty small. Also, we usually have plenty of free envd memory, including swap. E.g., envd swap usage was 0 bytes across the entire prod us-east-1 for the past several days. (Also, I wouldn't really expect users doing a hot loop that keeps renaming the same object again and again.)

Comment on lines +600 to +603
// Consolidate once after all timestamps, not per-timestamp. This is safe because
// nothing in the loop reads from self.snapshot: the update_applier maintains its
// own internal state (configs, settings, fence tokens) independently. Consolidating
// per-timestamp was O(K * N log N); consolidating once is O(N log N).
Member


Alternatively, consolidate whenever we double the size compared to the last consolidated state. This way you still get the amortized $$O(n \cdot \log n)$$ but don't risk OOMing.
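A minimal sketch of that doubling heuristic, again using toy (key, diff) pairs rather than Materialize's actual snapshot types: consolidate whenever the unconsolidated length exceeds twice the last consolidated length, which keeps peak memory bounded at roughly 2x the live state even under heavy churn.

```rust
// Toy consolidate: sort, merge duplicate keys, drop zero diffs.
fn consolidate(snapshot: &mut Vec<(u64, i64)>) {
    snapshot.sort();
    let mut merged: Vec<(u64, i64)> = Vec::with_capacity(snapshot.len());
    for (key, diff) in snapshot.drain(..) {
        if let Some((last_key, last_diff)) = merged.last_mut() {
            if *last_key == key {
                *last_diff += diff;
                continue;
            }
        }
        merged.push((key, diff));
    }
    merged.retain(|(_, diff)| *diff != 0);
    *snapshot = merged;
}

fn main() {
    let mut snapshot: Vec<(u64, i64)> = Vec::new();
    let mut consolidated_len = 0usize;
    let mut passes = 0u32;
    // 1000 timestamps of churn on a single key (alternating insert/retract,
    // like renaming the same object over and over).
    for i in 0..1000u64 {
        snapshot.push((7, if i % 2 == 0 { 1 } else { -1 }));
        // Consolidate only when the unconsolidated snapshot has doubled
        // relative to its size after the last consolidation.
        if snapshot.len() > 2 * consolidated_len.max(1) {
            consolidate(&mut snapshot);
            consolidated_len = snapshot.len();
            passes += 1;
        }
    }
    // With pure churn on one key, the peak unconsolidated size stays ~2x
    // the (tiny) live state, and each pass runs over a small vector, so
    // total work stays cheap instead of O(K * N log N).
    assert!(snapshot.len() <= 3);
    assert!(passes < 1000);
    println!("passes={passes}, final_len={}", snapshot.len());
}
```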
