catalog: unified object arrangement across clusters and replicas#36214
leedqin wants to merge 4 commits into MaterializeInc:main
Conversation
Force-pushed from 788c684 to 1b5d737
Implements mz_internal.mz_object_arrangement_sizes as a unified introspection source that continuously reflects arrangement memory per compute object across all replicas, queryable from mz_catalog_server without session variables. The subscribe query uses mz_dataflow_addresses with address[1] extraction to map operators to dataflows, then joins with mz_compute_exports to get export IDs. Per-object memory is aggregated with hybrid quantization: exact bytes for objects under 10 MiB, to enable precise monitoring of small objects, and rounding to the nearest 10 MiB for larger objects, to suppress byte-level differential churn.

Includes:
- IntrospectionType::ComputeObjectArrangementSizes variant
- BuiltinSource with columns (replica_id, object_id, size)
- BuiltinIndex on replica_id for fast replica-scoped lookups
- Storage controller wiring for differential collection management
- SubscribeSpec with optimized query and OPTIONS hint

This is the live-table (differential collection) portion of the unified per-object arrangement sizes feature. The history table and periodic collection tasks follow in subsequent commits.

Refs: CNS-42
Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
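The hybrid quantization described above can be sketched as a small pure function. This is only a sketch: the function name is hypothetical, and the real implementation may live in the subscribe SQL rather than in Rust.

```rust
/// 10 MiB quantum used by the hybrid quantization described above.
const QUANTUM: u64 = 10 * 1024 * 1024;

/// Hypothetical helper: exact bytes below the quantum, nearest-multiple
/// rounding at or above it, to suppress byte-level differential churn.
fn quantize_arrangement_size(bytes: u64) -> u64 {
    if bytes < QUANTUM {
        bytes
    } else {
        // Round to the nearest multiple of QUANTUM.
        ((bytes + QUANTUM / 2) / QUANTUM) * QUANTUM
    }
}

fn main() {
    // Small objects report exact sizes.
    assert_eq!(quantize_arrangement_size(1_234_567), 1_234_567);
    // Larger objects snap to the nearest 10 MiB multiple.
    assert_eq!(quantize_arrangement_size(14 * 1024 * 1024), 10 * 1024 * 1024);
    assert_eq!(quantize_arrangement_size(15 * 1024 * 1024), 20 * 1024 * 1024);
}
```

The rounding (rather than flooring) keeps quantized values stable when a size hovers near a multiple of the quantum.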
Introduces a builtin history table that records periodic snapshots of mz_object_arrangement_sizes so users can see arrangement memory usage over time.

What's added:
* The history table and two indexes (on object_id and collection_timestamp).
* Dyncfgs for collection interval (1h) and retention (7d).
* A coordinator task that snapshots the live table every interval and appends rows, plus a startup pruner that drops rows past retention.
* Metrics for collection latency and rows written.

Notes for review:
* Collection timing is offset by organization_id so environments don't all collect at the same instant.
* A schema-migration guard prevents the history table from being truncated, which would break the pruner.
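A minimal sketch of the per-organization offset mentioned above (names are hypothetical; the real code presumably hashes the actual organization id type rather than a string):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;

/// Hypothetical helper: derive a stable offset into the collection
/// interval from the organization id, so environments in a fleet
/// don't all snapshot at the same instant.
fn collection_offset(organization_id: &str, interval: Duration) -> Duration {
    let mut hasher = DefaultHasher::new();
    organization_id.hash(&mut hasher);
    let interval_ms = interval.as_millis().max(1) as u64;
    Duration::from_millis(hasher.finish() % interval_ms)
}

fn main() {
    let interval = Duration::from_secs(60 * 60);
    let offset = collection_offset("org-1234", interval);
    // The offset always lands inside one interval, and is stable
    // for a given organization id.
    assert!(offset < interval);
    assert_eq!(offset, collection_offset("org-1234", interval));
}
```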
* Gate the first snapshot on every compute object being hydrated, so early history rows don't record partial sizes. The flag is sticky.
* Extract the pruner cutoff and hydration check into pure helpers, each covered by new unit tests.
* New sqllogictest for the 10 MiB quantization formula.
* New testdrive for live + history behavior, plus a 2s cadence default in the testdrive harness so it fits the timeout.
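The pruner-cutoff helper extracted above might look like this pure function (a sketch; the name and signature are assumptions):

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical pure helper: history rows with a collection timestamp
/// at or before the returned cutoff are dropped by the startup pruner.
fn prune_cutoff(now: SystemTime, retention: Duration) -> SystemTime {
    // Saturate at the epoch rather than panicking on extreme clock skew.
    now.checked_sub(retention).unwrap_or(SystemTime::UNIX_EPOCH)
}

fn main() {
    let now = SystemTime::UNIX_EPOCH + Duration::from_secs(1_000_000);
    let cutoff = prune_cutoff(now, Duration::from_secs(7 * 24 * 60 * 60));
    assert_eq!(
        cutoff,
        SystemTime::UNIX_EPOCH + Duration::from_secs(1_000_000 - 604_800)
    );
}
```

Keeping the cutoff computation pure makes the unit tests mentioned above trivial: no clock or catalog needed.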
Force-pushed from 1b5d737 to 05915ff
def- left a comment
After restarting Materialize, mz_object_arrangement_size_history can contain partial samples (collection_timestamps with rows for only a subset of the expected (object_id, replica_id) pairs) during the rehydration window. I'm not sure how bad that is?
With this test:
diff --git a/test/restart/mzcompose.py b/test/restart/mzcompose.py
index c8e4d4ab17..872619992d 100644
--- a/test/restart/mzcompose.py
+++ b/test/restart/mzcompose.py
@@ -976,6 +976,107 @@ def workflow_user_id_no_reuse_after_restart(c: Composition) -> None:
c.sql("DROP TABLE idreuse_t1")
+def workflow_arrangement_sizes_hydration_gate(c: Composition) -> None:
+ """Regression test: arrangement-sizes hydration gate must hold across restart.
+
+ The race between the introspection-subscribe install and the
+ arrangement-size collection timer is narrow, so we repeat the
+ kill/up/observe cycle several times to keep the test reliable.
+ """
+
+ num_replicas = 2
+ arranged_names = tuple(f"mv{i}" for i in range(1, 11)) + tuple(
+ f"idx{i}" for i in range(1, 11)
+ )
+ expected_count = len(arranged_names) * num_replicas
+ name_filter = "(" + ", ".join(f"'{n}'" for n in arranged_names) + ")"
+ restart_rounds = 5
+
+ c.down(destroy_volumes=True)
+
+ with c.override(
+ Materialized(
+ additional_system_parameter_defaults={
+ "arrangement_size_collection_interval": "500ms",
+ },
+ sanity_restart=False,
+ )
+ ):
+ c.up("materialized")
+
+ mv_ddls = "\n".join(
+ f"CREATE MATERIALIZED VIEW mv{i} IN CLUSTER test AS "
+ f"SELECT a % {i + 1} AS k, SUM(b) AS s FROM t GROUP BY a % {i + 1};"
+ f"CREATE INDEX idx{i} IN CLUSTER test ON mv{i} (k);"
+ for i in range(1, 11)
+ )
+ c.sql(
+ dedent(
+ f"""
+ CREATE CLUSTER test SIZE 'scale=1,workers=1', REPLICATION FACTOR {num_replicas};
+ CREATE TABLE t (a int, b int);
+ INSERT INTO t SELECT g, g % 100 FROM generate_series(1, 100000) g;
+ {mv_ddls}
+ """
+ )
+ )
+
+ def wait_for_full_sample() -> str:
+ deadline = time.time() + 120
+ while time.time() < deadline:
+ rows = c.sql_query(
+ f"""
+ SELECT max(collection_timestamp)::text FROM (
+ SELECT collection_timestamp
+ FROM mz_internal.mz_object_arrangement_size_history h
+ JOIN mz_objects o ON o.id = h.object_id
+ WHERE o.name IN {name_filter}
+ GROUP BY collection_timestamp
+ HAVING count(*) = {expected_count}
+ ) AS s
+ """
+ )
+ if rows and rows[0][0] is not None:
+ return rows[0][0]
+ time.sleep(0.5)
+ raise UIError(
+ f"timed out waiting for a sample with {expected_count} rows"
+ )
+
+ all_partials: list[tuple[int, str, int]] = []
+ for round_idx in range(restart_rounds):
+ pre_restart_ts = wait_for_full_sample()
+
+ c.kill("materialized")
+ c.up("materialized")
+
+ time.sleep(8)
+
+ samples = c.sql_query(
+ f"""
+ SELECT collection_timestamp::text, count(*)
+ FROM mz_internal.mz_object_arrangement_size_history h
+ JOIN mz_objects o ON o.id = h.object_id
+ WHERE o.name IN {name_filter}
+ AND collection_timestamp > '{pre_restart_ts}'::timestamptz
+ GROUP BY collection_timestamp
+ ORDER BY collection_timestamp
+ """
+ )
+ assert (
+ samples
+ ), f"round {round_idx}: no post-restart collection_timestamps found"
+
+ for ts, cnt in samples:
+ if cnt != expected_count:
+ all_partials.append((round_idx, ts, cnt))
+
+ assert not all_partials, (
+ f"{len(all_partials)} post-restart samples (across {restart_rounds} "
+ f"rounds) had row counts != {expected_count}: {all_partials}"
+ )
+
+
def workflow_default(c: Composition) -> None:
def process(name: str) -> None:
if name == "default":

We can run bin/mzcompose --find restart down && bin/mzcompose --find restart run arrangement-sizes-hydration-gate:
==> mzcompose: test case workflow-arrangement-sizes-hydration-gate failed: builtins.AssertionError: 5 post-restart samples (across 5 rounds) had row counts != 40: [(1, '2026-04-23 04:24:37.167+00', 20), (2, '2026-04-23 04:24:55.667+00', 19), (2, '2026-04-23 04:24:56.408+00', 38), (3, '2026-04-23 04:25:11.167+00', 3), (3, '2026-04-23 04:25:11.877+00', 7)]
Traceback (most recent call last):
File "/home/deen/git/materialize2/misc/python/materialize/mzcompose/composition.py", line 696, in test_case
yield
File "/home/deen/git/materialize2/misc/python/materialize/cli/mzcompose.py", line 857, in handle_composition
composition.workflow(
~~~~~~~~~~~~~~~~~~~~^
workflow_name, *args.unknown_subargs[1:], *extra_args
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
)
^
File "/home/deen/git/materialize2/misc/python/materialize/mzcompose/composition.py", line 584, in workflow
func(self)
~~~~^^^^^^
File "/home/deen/git/materialize2/test/restart/mzcompose.py", line 1074, in workflow_arrangement_sizes_hydration_gate
assert not all_partials, (
^^^^^^^^^^^^^^^^
AssertionError: 5 post-restart samples (across 5 rounds) had row counts != 40: [(1, '2026-04-23 04:24:37.167+00', 20), (2, '2026-04-23 04:24:55.667+00', 19), (2, '2026-04-23 04:24:56.408+00', 38), (3, '2026-04-23 04:25:11.167+00', 3), (3, '2026-04-23 04:25:11.877+00', 7)]
mzcompose: error: at least one test case failed
antiguru left a comment
Left some comments inline. I wonder about the <10 MiB / >10 MiB split. Small objects can have just as much churn as larger ones, why treat them differently? What is the problem we're trying to solve for?
SELECT operator_id FROM mz_introspection.mz_arrangement_heap_size_raw
UNION ALL
SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw
It's a bit unfortunate you'll need to go through the raw collections, but right now this seems to be the best approach. The views we define on top of the raw collections do more than what you need here, and certainly will be more expensive to maintain.
    SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw
) AS rs ON rs.operator_id = od.operator_id
GROUP BY ce.export_id
OPTIONS (AGGREGATE INPUT GROUP SIZE = 1000)
Counts don't need the aggregate input group size annotation (it's only for hierarchical reductions, not for simple ones).
introspection_type: IntrospectionType::ComputeObjectArrangementSizes,
sql: "SUBSCRIBE (
    SELECT
        ce.export_id AS object_id,
Note this bakes in an assumption that's not guaranteed to be true in the future, which is that one dataflow has one export. If it'd change, we'd double-count. Just calling it out. Once it happens, we'd need to re-think what the granularity for this information is instead.
// Objects smaller than 10 MiB report exact bytes; larger ones are rounded
// to the nearest 10 MiB to suppress byte-level churn in the collection.
This will have a lot of noise independently of the 10MiB gate. Running the subscribe on an otherwise empty cluster shows an update for at least one export every second. You could filter temporary exports t*, as they won't be reflected anywhere, but that doesn't solve the problem.
let interval_duration = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_COLLECTION_INTERVAL
    .get(self.catalog().system_config().dyncfgs());
Make sure the collection task reacts to changes to the interval (without going through an envd restart).
if diff != 1 {
    return None;
}
let datums = row.unpack();
Please use a DatumVecBorrow. Row::unpack needs to allocate.
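The pattern being suggested, in std-only form (DatumVec/DatumVecBorrow are Materialize types in mz_repr; this sketch only illustrates the reuse-a-scratch-buffer idea that avoids a fresh allocation per row):

```rust
/// Std-only analogue of the DatumVec pattern: one scratch buffer,
/// cleared and refilled per row, instead of allocating a new Vec
/// for every row the way an unpack-style call would.
struct ScratchVec(Vec<u32>);

impl ScratchVec {
    fn new() -> Self {
        ScratchVec(Vec::new())
    }

    /// Clear and refill the scratch buffer, returning a borrow of it.
    fn borrow_with(&mut self, row: &[u32]) -> &[u32] {
        self.0.clear();
        self.0.extend_from_slice(row);
        &self.0
    }
}

fn main() {
    let rows = vec![vec![1, 2, 3], vec![4, 5]];
    let mut scratch = ScratchVec::new();
    let mut sum = 0;
    for row in &rows {
        // The buffer's allocation is amortized over all rows.
        let datums = scratch.borrow_with(row);
        sum += datums.iter().sum::<u32>();
    }
    assert_eq!(sum, 15);
}
```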
// the full collection interval, so the first snapshot can lag
// hydration by up to one interval.
if !self.arrangement_sizes_hydration_observed {
    if !self.check_arrangement_sizes_hydration().await {
In the limit this'll mean we never write any data as there can always be an object somewhere that isn't hydrated. This checks for all objects, correct? Alternatively, just write down the size for objects that are hydrated.
Why gate on hydration status?
if *diff != 1 {
    continue;
}
let datums = row.unpack();
Use a DatumVecBorrow here too.
/// Interval at which to collect per-object arrangement size snapshots for the history table.
pub const ARRANGEMENT_SIZE_COLLECTION_INTERVAL: Config<Duration> = Config::new(
    "arrangement_size_collection_interval",
    Duration::from_secs(60 * 60),
Suggested change:
-    Duration::from_secs(60 * 60),
+    Duration::from_hours(1),
/// How long to retain per-object arrangement size history.
pub const ARRANGEMENT_SIZE_RETENTION_PERIOD: Config<Duration> = Config::new(
    "arrangement_size_retention_period",
    Duration::from_secs(7 * 24 * 60 * 60),
Suggested change:
-    Duration::from_secs(7 * 24 * 60 * 60),
+    Duration::from_hours(7 * 24),
    Duration::from_secs(7 * 24 * 60 * 60),
    "How long to retain per-object arrangement size history.",
);
Add a sentinel value (0s?) to disable the collection.
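One way to implement the suggested sentinel, as a sketch assuming Duration::ZERO is the disable value:

```rust
use std::time::Duration;

/// Hypothetical guard: treat a zero interval as "collection disabled"
/// when (re)arming the collection timer.
fn collection_enabled(interval: Duration) -> bool {
    !interval.is_zero()
}

fn main() {
    assert!(collection_enabled(Duration::from_secs(60 * 60)));
    // The sentinel value disables collection entirely.
    assert!(!collection_enabled(Duration::ZERO));
}
```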
Motivation
We need to query per-object memory across clusters and replicas from the catalog for the Maintained Objects UI. There is currently no performant way to do that without setting session variables for each query against introspection. This change lets the console fetch all object arrangement sizes at the current moment as well as historical data, and it would also simplify other console introspection queries over object arrangements.
Design doc: doc/developer/design/20260331_unified_object_arrangement_sizes.md.
Linear issue: https://linear.app/materializeinc/issue/CNS-42/per-object-memory-arrangement-for-maintained-objects-in-the-console
Description
Adds per-object arrangement size tracking, following the introspection subscribe pattern (same as mz_compute_hydration_times and mz_compute_error_counts_raw_unified): a live mz_internal.mz_object_arrangement_sizes source populated by a per-replica subscribe, and a mz_internal.mz_object_arrangement_size_history table that accumulates hourly snapshots (7-day retention). Both are queryable from mz_catalog_server with no session variables or cluster targeting.
Catalog surface
- mz_internal.mz_object_arrangement_sizes (live source) and builtin table mz_internal.mz_object_arrangement_size_history (with indexes on object_id and collection_timestamp).
- Dyncfgs arrangement_size_collection_interval (1h default) and arrangement_size_retention_period (7d default).

Runtime
- The live source is populated through the storage controller via a new IntrospectionType variant.
- A coordinator task snapshots the live source and appends rows to the history table. It fires at the configured interval with a per-environment offset so fleets of envs don't collect in lockstep.
- A startup pruner drops history rows older than the retention period. A migration guard prevents the history table from being truncated by schema migration, which would confuse the pruner.
- The first snapshot waits until all compute objects have hydrated at least once, so early rows can't capture half-built arrangements.
- Metrics: mz_arrangement_sizes_collection_time_seconds (histogram) and mz_arrangement_sizes_rows_written_total (counter).

Verification
Verified with 10 Rust unit tests (pruner cutoff + hydration check), a sqllogictest covering the 10 MiB quantization boundary, and a testdrive end-to-end (live + history + drop retraction). Also ran locally against a Postgres source with two replicas and confirmed ~300 history rows accumulate with stable sizes and an advancing collection_timestamp.

Tips for reviewing
I would recommend reviewing it per commit.