Skip to content

catalog: unified object arrangement across clusters and replicas#36214

Open
leedqin wants to merge 4 commits intoMaterializeInc:mainfrom
leedqin:unified-object-arrangement
Open

catalog: unified object arrangement across clusters and replicas#36214
leedqin wants to merge 4 commits intoMaterializeInc:mainfrom
leedqin:unified-object-arrangement

Conversation

@leedqin
Copy link
Copy Markdown
Contributor

@leedqin leedqin commented Apr 22, 2026

Motivation

We need to query per object memory across clusters and replicas from the catalog for Maintained Objects UI. There is no performant way of doing that without setting up session variables for each query against introspection. This allows the console to get all the object arrangements in the current moment and get historical data as well. This would also help make some other introspection queries querying object arrangements more simple in other parts of the console.

Design doc: doc/developer/design/20260331_unified_object_arrangement_sizes.md.

Linear issue: https://linear.app/materializeinc/issue/CNS-42/per-object-memory-arrangement-for-maintained-objects-in-the-console

Description

Adds per-object arrangement size tracking, following the introspection
subscribe pattern (same as mz_compute_hydration_times and
mz_compute_error_counts_raw_unified): a live
mz_internal.mz_object_arrangement_sizes source populated by a
per-replica subscribe, and a mz_internal.mz_object_arrangement_size_history
table that accumulates hourly snapshots (7-day retention). Both are
queryable from mz_catalog_server with no session variables or cluster
targeting.

Catalog surface

  • New builtin source mz_internal.mz_object_arrangement_sizes (live) and builtin table mz_internal.mz_object_arrangement_size_history (with indexes on object_id and collection_timestamp).
  • Two dyncfgs: arrangement_size_collection_interval (1h default) and arrangement_size_retention_period (7d default).

Runtime

  • A per-replica introspection subscribe populates the live table, wired
    through the storage controller via a new IntrospectionType variant.
  • A coordinator task periodically snapshots the live table and appends
    rows to the history table. Fires at the configured interval with a
    per-environment offset so fleets of envs don't collect in lockstep.
  • A startup pruner retracts history rows older than the retention
    period. A migration guard prevents the history table from being
    truncated by schema migration, which would confuse the pruner.
  • A hydration gate holds off the first snapshot until all compute
    objects have hydrated at least once, so early rows can't capture
    half-built arrangements.
  • Two Prometheus metrics for observability:
    mz_arrangement_sizes_collection_time_seconds (histogram) and
    mz_arrangement_sizes_rows_written_total (counter).

Tests

  • Unit tests for the pruner cutoff logic and the hydration check (10 cases).
  • sqllogictest for the 10 MiB quantization formula (9 edge cases around the boundary).
  • testdrive covering live-table visibility, filter-only MV absence, history accumulation, and drop retraction.

Verification

Verified with 10 Rust unit tests (pruner cutoff + hydration check), a
sqllogictest covering the 10 MiB quantization boundary, and a testdrive
end-to-end (live + history + drop retraction). Also ran locally against
a Postgres source with two replicas and confirmed ~300 history rows
accumulate with stable sizes and advancing collection_timestamp.
Verified with 10 Rust unit tests (pruner cutoff + hydration check), a
sqllogictest covering the 10 MiB quantization boundary, and a testdrive
end-to-end (live + history + drop retraction). Also ran locally against
a Postgres source with two replicas and confirmed ~300 history rows
accumulate with stable sizes and advancing collection_timestamp.

Tips For reviewing:

I would recommend reviewing it per commit

@leedqin leedqin requested review from a team as code owners April 22, 2026 19:45
@leedqin leedqin requested review from DAlperin and SangJunBak April 22, 2026 19:45
@leedqin leedqin force-pushed the unified-object-arrangement branch from 788c684 to 1b5d737 Compare April 22, 2026 19:46
leedqin and others added 4 commits April 22, 2026 15:56
Implements mz_internal.mz_object_arrangement_sizes as a unified introspection
source that continuously reflects arrangement memory per compute object across
all replicas. Queryable from mz_catalog_server without session variables.

The subscribe query uses mz_dataflow_addresses with address[1] extraction to map
operators to dataflows, then joins with mz_compute_exports to get export IDs.
Aggregate per-object memory with hybrid quantization: real precision for objects
<10MB to enable precise monitoring of small objects, rounded to nearest 10MB for
larger objects to suppress byte-level differential churn.

Includes:
- IntrospectionType::ComputeObjectArrangementSizes variant
- BuiltinSource with columns (replica_id, object_id, size)
- BuiltinIndex on replica_id for fast replica-scoped lookups
- Storage controller wiring for differential collection management
- SubscribeSpec with optimized query and OPTIONS hint

This is the live table (differential collection) portion of the unified per-object
arrangement sizes feature. History table and periodic collection tasks follow in
subsequent commits.

Refs: CNS-42

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
Introduces a builtin history table that records periodic snapshots of
mz_object_arrangement_sizes so users can see arrangement memory usage
over time.

What's added:
* The history table and two indexes (on object_id and
  collection_timestamp).
* Dyncfgs for collection interval (1h) and retention (7d).
* A coordinator task that snapshots the live table every interval
  and appends rows, plus a startup pruner that drops rows past
  retention.
* Metrics for collection latency and rows written.

Notes for review:
* Collection timing is offset by organization_id so environments
  don't all collect at the same instant.
* A schema-migration guard prevents the history table from being
  truncated, which would break the pruner.
* Gate the first snapshot on every compute object being hydrated, so
  early history rows don't record partial sizes. Sticky flag.
* Extract pruner cutoff and hydration check into pure helpers, each
  covered by new unit tests.
* New sqllogictest for the 10 MiB quantization formula.
* New testdrive for live + history behavior, plus a 2s cadence
  default in the testdrive harness so it fits the timeout.
@leedqin leedqin force-pushed the unified-object-arrangement branch from 1b5d737 to 05915ff Compare April 22, 2026 20:05
@leedqin leedqin requested a review from a team as a code owner April 22, 2026 20:05
@leedqin leedqin changed the title catalog: Unified object arrangements catalog: unified object arrangement across clusters and replicas Apr 22, 2026
Copy link
Copy Markdown
Contributor

@def- def- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After restarting Materialize, mz_object_arrangement_size_history can contain partial samples (collection_timestamps with rows for only a subset of the expected (object_id, replica_id) pairs) during the rehydration window. I'm not sure how bad that is?
With this test:

diff --git a/test/restart/mzcompose.py b/test/restart/mzcompose.py
index c8e4d4ab17..872619992d 100644
--- a/test/restart/mzcompose.py
+++ b/test/restart/mzcompose.py
@@ -976,6 +976,107 @@ def workflow_user_id_no_reuse_after_restart(c: Composition) -> None:
     c.sql("DROP TABLE idreuse_t1")


+def workflow_arrangement_sizes_hydration_gate(c: Composition) -> None:
+    """Regression test: arrangement-sizes hydration gate must hold across restart.
+
+    The race between the introspection-subscribe install and the
+    arrangement-size collection timer is narrow, so we repeat the
+    kill/up/observe cycle several times to keep the test reliable.
+    """
+
+    num_replicas = 2
+    arranged_names = tuple(f"mv{i}" for i in range(1, 11)) + tuple(
+        f"idx{i}" for i in range(1, 11)
+    )
+    expected_count = len(arranged_names) * num_replicas
+    name_filter = "(" + ", ".join(f"'{n}'" for n in arranged_names) + ")"
+    restart_rounds = 5
+
+    c.down(destroy_volumes=True)
+
+    with c.override(
+        Materialized(
+            additional_system_parameter_defaults={
+                "arrangement_size_collection_interval": "500ms",
+            },
+            sanity_restart=False,
+        )
+    ):
+        c.up("materialized")
+
+        mv_ddls = "\n".join(
+            f"CREATE MATERIALIZED VIEW mv{i} IN CLUSTER test AS "
+            f"SELECT a % {i + 1} AS k, SUM(b) AS s FROM t GROUP BY a % {i + 1};"
+            f"CREATE INDEX idx{i} IN CLUSTER test ON mv{i} (k);"
+            for i in range(1, 11)
+        )
+        c.sql(
+            dedent(
+                f"""
+                CREATE CLUSTER test SIZE 'scale=1,workers=1', REPLICATION FACTOR {num_replicas};
+                CREATE TABLE t (a int, b int);
+                INSERT INTO t SELECT g, g % 100 FROM generate_series(1, 100000) g;
+                {mv_ddls}
+                """
+            )
+        )
+
+        def wait_for_full_sample() -> str:
+            deadline = time.time() + 120
+            while time.time() < deadline:
+                rows = c.sql_query(
+                    f"""
+                    SELECT max(collection_timestamp)::text FROM (
+                        SELECT collection_timestamp
+                          FROM mz_internal.mz_object_arrangement_size_history h
+                          JOIN mz_objects o ON o.id = h.object_id
+                         WHERE o.name IN {name_filter}
+                         GROUP BY collection_timestamp
+                        HAVING count(*) = {expected_count}
+                    ) AS s
+                    """
+                )
+                if rows and rows[0][0] is not None:
+                    return rows[0][0]
+                time.sleep(0.5)
+            raise UIError(
+                f"timed out waiting for a sample with {expected_count} rows"
+            )
+
+        all_partials: list[tuple[int, str, int]] = []
+        for round_idx in range(restart_rounds):
+            pre_
restart_ts = wait_for_full_sample()
+
+            c.kill("materialized")
+            c.up("materialized")
+
+            time.sleep(8)
+
+            samples = c.sql_query(
+                f"""
+                SELECT collection_timestamp::text, count(*)
+                  FROM mz_internal.mz_object_arrangement_size_history h
+                  JOIN mz_objects o ON o.id = h.object_id
+                 WHERE o.name IN {name_filter}
+                   AND collection_timestamp > '{pre_restart_ts}'::timestamptz
+                 GROUP BY collection_timestamp
+                 ORDER BY collection_timestamp
+                """
+            )
+            assert (
+                samples
+            ), f"round {round_idx}: no post-restart collection_timestamps found"
+
+            for ts, cnt in samples:
+                if cnt != expected_count:
+                    all_partials.append((round_idx, ts, cnt))
+
+        assert not all_partials, (
+            f"{len(all_partials)} post-restart samples (across {restart_rounds} "
+            f"rounds) had row counts != {expected_count}: {all_partials}"
+        )
+
+
 def workflow_default(c: Composition) -> None:
     def process(name: str) -> None:
         if name == "default":

We can run bin/mzcompose --find restart down && bin/mzcompose --find restart run arrangement-sizes-hydration-gate:

==> mzcompose: test case workflow-arrangement-sizes-hydration-gate failed: builtins.AssertionError: 5 post-restart samples (across 5 rounds) had row counts != 40: [(1, '2026-04-23 04:24:37.167+00', 20), (2, '2026-04-23 04:24:55.667+00', 19), (2, '2026-04-23 04:24:56.408+00', 38), (3, '2026-04-23 04:25:11.167+00', 3), (3, '2026-04-23 04:25:11.877+00', 7)]
Traceback (most recent call last):
  File "/home/deen/git/materialize2/misc/python/materialize/mzcompose/composition.py", line 696, in test_case
    yield
  File "/home/deen/git/materialize2/misc/python/materialize/cli/mzcompose.py", line 857, in handle_composition
    composition.workflow(
    ~~~~~~~~~~~~~~~~~~~~^
        workflow_name, *args.unknown_subargs[1:], *extra_args
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/home/deen/git/materialize2/misc/python/materialize/mzcompose/composition.py", line 584, in workflow
    func(self)
    ~~~~^^^^^^
  File "/home/deen/git/materialize2/test/restart/mzcompose.py", line 1074, in workflow_arrangement_sizes_hydration_gate
    assert not all_partials, (
           ^^^^^^^^^^^^^^^^
AssertionError: 5 post-restart samples (across 5 rounds) had row counts != 40: [(1, '2026-04-23 04:24:37.167+00', 20), (2, '2026-04-23 04:24:55.667+00', 19), (2, '2026-04-23 04:24:56.408+00', 38), (3, '2026-04-23 04:25:11.167+00', 3), (3, '2026-04-23 04:25:11.877+00', 7)]
mzcompose: error: at least one test case failed

Copy link
Copy Markdown
Member

@antiguru antiguru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments inline. I wonder about the < 10Mib/> 10MiB split. Small objects can have just as much churn as larger ones, why treat them differently? What is the problem we're trying to solve for?

Comment on lines +596 to +598
SELECT operator_id FROM mz_introspection.mz_arrangement_heap_size_raw
UNION ALL
SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit unfortunate you'll need to go through the raw collections, but right now this seems to be the best approach. The views we define on top of the raw collections do more than what you need here, and certainly will be more expensive to maintain.

SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw
) AS rs ON rs.operator_id = od.operator_id
GROUP BY ce.export_id
OPTIONS (AGGREGATE INPUT GROUP SIZE = 1000)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Count don't need the aggregate input group size annotation (it's only for hierarchical reductions, not for simple ones.)

introspection_type: IntrospectionType::ComputeObjectArrangementSizes,
sql: "SUBSCRIBE (
SELECT
ce.export_id AS object_id,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this bakes in an assumption that's not guaranteed to be true in the future, which is that one dataflow has one export. If it'd change, we'd double-count. Just calling it out. Once it happens, we'd need to re-think what the granularity for this information is instead.

Comment on lines +575 to +576
// Objects smaller than 10 MiB report exact bytes; larger ones are rounded
// to the nearest 10 MiB to suppress byte-level churn in the collection.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will have a lot of noise independently of the 10MiB gate. Running the subscribe on an otherwise empty cluster shows an update for at least one export every second. You could filter temporary exports t*, as they won't be reflected anywhere, but that doesn't solve the problem.

Comment on lines +362 to +363
let interval_duration = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_COLLECTION_INTERVAL
.get(self.catalog().system_config().dyncfgs());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure the collection task reacts to change to the interval (without going through an envd restart)

if diff != 1 {
return None;
}
let datums = row.unpack();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use a DatumVecBorrow. Row::unpack needs to allocate.

// the full collection interval, so the first snapshot can lag
// hydration by up to one interval.
if !self.arrangement_sizes_hydration_observed {
if !self.check_arrangement_sizes_hydration().await {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the limit this'll mean we never write any data as there can always be an object somewhere that isn't hydrated. This checks for all objects, correct? Alternatively, just write down the size for objects that are hydrated.

Why gate on hydration status?

if *diff != 1 {
continue;
}
let datums = row.unpack();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a DatumVecBorrow here too.

/// Interval at which to collect per-object arrangement size snapshots for the history table.
pub const ARRANGEMENT_SIZE_COLLECTION_INTERVAL: Config<Duration> = Config::new(
"arrangement_size_collection_interval",
Duration::from_secs(60 * 60),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Duration::from_secs(60 * 60),
Duration::from_hours(1),

/// How long to retain per-object arrangement size history.
pub const ARRANGEMENT_SIZE_RETENTION_PERIOD: Config<Duration> = Config::new(
"arrangement_size_retention_period",
Duration::from_secs(7 * 24 * 60 * 60),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Duration::from_secs(7 * 24 * 60 * 60),
Duration::from_hours(7 * 24),

Duration::from_secs(7 * 24 * 60 * 60),
"How long to retain per-object arrangement size history.",
);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a sentinel value (0s?) to disable the collection.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants