Skip to content

Commit 1b5d737

Browse files
committed
dd hydration gate and tests for arrangement-size collection
* Gate the first snapshot on every compute object being hydrated, so early history rows don't record partial sizes. Sticky flag. * Extract pruner cutoff and hydration check into pure helpers, each covered by new unit tests. * New sqllogictest for the 10 MiB quantization formula. * New testdrive for live + history behavior, plus a 2s cadence default in the testdrive harness so it fits the timeout.
1 parent 9eeb694 commit 1b5d737

7 files changed

Lines changed: 416 additions & 27 deletions

File tree

src/adapter/src/coord.rs

Lines changed: 98 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1931,6 +1931,11 @@ pub struct Coordinator {
19311931
/// The interval at which to collect storage usage information.
19321932
storage_usage_collection_interval: Duration,
19331933

1934+
/// Set once all compute objects have been observed as hydrated, gating
1935+
/// the first write into `mz_object_arrangement_size_history`. Sticky:
1936+
/// later partial re-hydrations (e.g. replica restart) don't re-arm it.
1937+
arrangement_sizes_hydration_observed: bool,
1938+
19341939
/// Segment analytics client.
19351940
#[derivative(Debug = "ignore")]
19361941
segment_client: Option<mz_segment::Client>,
@@ -4274,27 +4279,11 @@ impl Coordinator {
42744279
differential_dataflow::consolidation::consolidate(&mut current_contents);
42754280

42764281
let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
4277-
let mut expired = Vec::new();
4278-
for (row, diff) in current_contents {
4279-
assert_eq!(
4280-
diff, 1,
4281-
"consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4282-
);
4283-
// Column 3 is `collection_timestamp`.
4284-
let collection_timestamp = row
4285-
.unpack()
4286-
.get(3)
4287-
.expect("definition of mz_object_arrangement_size_history changed")
4288-
.unwrap_timestamptz();
4289-
let collection_timestamp = collection_timestamp.timestamp_millis();
4290-
let collection_timestamp: u128 = collection_timestamp
4291-
.try_into()
4292-
.expect("all collections happen after Jan 1 1970");
4293-
if collection_timestamp < cutoff_ts {
4294-
let builtin_update = BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE);
4295-
expired.push(builtin_update);
4296-
}
4297-
}
4282+
let expired = arrangement_sizes_expired_retractions(
4283+
current_contents,
4284+
cutoff_ts,
4285+
item_id,
4286+
);
42984287

42994288
// TODO(arrangement-sizes): when the writeable-catalog-server
43004289
// plumbing in https://github.com/MaterializeInc/materialize/pull/35436
@@ -4322,6 +4311,40 @@ impl Coordinator {
43224311
}
43234312
}
43244313

4314+
/// Returns retraction updates for rows in a consolidated
4315+
/// `mz_object_arrangement_size_history` snapshot whose `collection_timestamp`
4316+
/// (column 3) is strictly before `cutoff_ts`.
4317+
///
4318+
/// Panics if any input row has `diff != 1`: the caller must consolidate first,
4319+
/// and a consolidated history table should never contain retractions because
4320+
/// the only source of retractions is this function itself.
4321+
fn arrangement_sizes_expired_retractions(
4322+
rows: impl IntoIterator<Item = (mz_repr::Row, i64)>,
4323+
cutoff_ts: u128,
4324+
item_id: CatalogItemId,
4325+
) -> Vec<BuiltinTableUpdate> {
4326+
let mut expired = Vec::new();
4327+
for (row, diff) in rows {
4328+
assert_eq!(
4329+
diff, 1,
4330+
"consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
4331+
);
4332+
let collection_timestamp = row
4333+
.unpack()
4334+
.get(3)
4335+
.expect("definition of mz_object_arrangement_size_history changed")
4336+
.unwrap_timestamptz()
4337+
.timestamp_millis();
4338+
let collection_timestamp: u128 = collection_timestamp
4339+
.try_into()
4340+
.expect("all collections happen after Jan 1 1970");
4341+
if collection_timestamp < cutoff_ts {
4342+
expired.push(BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE));
4343+
}
4344+
}
4345+
expired
4346+
}
4347+
43254348
#[cfg(test)]
43264349
impl Coordinator {
43274350
#[allow(dead_code)]
@@ -4783,6 +4806,7 @@ pub fn serve(
47834806
cloud_resource_controller,
47844807
storage_usage_client,
47854808
storage_usage_collection_interval,
4809+
arrangement_sizes_hydration_observed: false,
47864810
segment_client,
47874811
metrics,
47884812
optimizer_metrics,
@@ -5248,3 +5272,56 @@ mod id_pool_tests {
52485272
pool.refill(10, 5);
52495273
}
52505274
}
5275+
5276+
#[cfg(test)]
5277+
mod arrangement_sizes_pruner_tests {
5278+
use mz_repr::catalog_item_id::CatalogItemId;
5279+
use mz_repr::{Datum, Row};
5280+
5281+
use super::arrangement_sizes_expired_retractions;
5282+
5283+
// Pack a row shaped like `mz_object_arrangement_size_history`: the pruner
5284+
// only cares about column 3 (`collection_timestamp`), but we stuff the
5285+
// other three columns with realistic values so shape changes would fail.
5286+
fn history_row(ts_ms: i64) -> Row {
5287+
let dt = mz_ore::now::to_datetime(ts_ms.try_into().expect("non-negative"));
5288+
Row::pack_slice(&[
5289+
Datum::String("r1"),
5290+
Datum::String("u1"),
5291+
Datum::Int64(123),
5292+
Datum::TimestampTz(dt.try_into().expect("fits in TimestampTz")),
5293+
])
5294+
}
5295+
5296+
fn item_id() -> CatalogItemId {
5297+
// Any CatalogItemId will do; tests don't dispatch on it.
5298+
CatalogItemId::User(42)
5299+
}
5300+
5301+
#[mz_ore::test]
5302+
fn empty_input_produces_no_retractions() {
5303+
let out = arrangement_sizes_expired_retractions(Vec::new(), 1_000, item_id());
5304+
assert!(out.is_empty());
5305+
}
5306+
5307+
#[mz_ore::test]
5308+
fn retracts_only_rows_strictly_before_cutoff() {
5309+
// Mixes both sides of the filter and includes a row at exactly
5310+
// the cutoff timestamp to pin down the strict-less-than boundary.
5311+
let rows = vec![
5312+
(history_row(100), 1),
5313+
(history_row(500), 1),
5314+
(history_row(1_000), 1), // at cutoff: kept (strict <)
5315+
(history_row(5_000), 1),
5316+
];
5317+
let out = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
5318+
assert_eq!(out.len(), 2);
5319+
}
5320+
5321+
#[mz_ore::test]
5322+
#[should_panic(expected = "consolidated contents should not contain retractions")]
5323+
fn retraction_in_input_panics() {
5324+
let rows = vec![(history_row(100), -1)];
5325+
let _ = arrangement_sizes_expired_retractions(rows, 1_000, item_id());
5326+
}
5327+
}

src/adapter/src/coord/introspection.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -566,11 +566,18 @@ const SUBSCRIBES: &[SubscribeSpec] = &[
566566
GROUP BY export_id, lir_id
567567
)",
568568
},
569-
// Per-object arrangement sizes via unified introspection subscribe.
570-
// Quantization strategy: objects <10MB show real size for precise monitoring;
571-
// objects ≥10MB round to nearest 10MB to suppress byte-level differential churn.
572-
// Uses mz_dataflow_addresses.address[1] to extract dataflow_id, avoiding an extra join.
573-
// Per-worker raw tables aggregate bytes across all workers for total replica memory per object.
569+
// Per-object arrangement sizes, one row per `(object_id, replica)`.
570+
//
571+
// `mz_arrangement_heap_size_raw` and `mz_arrangement_batcher_size_raw` are
572+
// differential logs where each `+1` row represents one byte of heap delta;
573+
// after consolidation, `COUNT(*)` is the current arrangement size in bytes.
574+
//
575+
// Objects smaller than 10 MiB report exact bytes; larger ones are rounded
576+
// to the nearest 10 MiB to suppress byte-level churn in the collection.
577+
//
578+
// `mz_dataflow_addresses.address[1]` is the root of each operator's address
579+
// tree, which equals the owning `dataflow_id` — so we can go addresses →
580+
// operator → dataflow without joining `mz_dataflow_operator_dataflows`.
574581
SubscribeSpec {
575582
introspection_type: IntrospectionType::ComputeObjectArrangementSizes,
576583
sql: "SUBSCRIBE (

src/adapter/src/coord/message_handler.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,19 @@ impl Coordinator {
411411
return;
412412
}
413413

414+
// Delay writes until every compute object has hydrated, so we
415+
// don't record partial sizes from still-building dataflows.
416+
// Sticky: later snapshots skip this check. Gate retries run at
417+
// the full collection interval, so the first snapshot can lag
418+
// hydration by up to one interval.
419+
if !self.arrangement_sizes_hydration_observed {
420+
if !self.check_arrangement_sizes_hydration().await {
421+
self.schedule_arrangement_sizes_collection().await;
422+
return;
423+
}
424+
self.arrangement_sizes_hydration_observed = true;
425+
}
426+
414427
let collection_timer = self
415428
.metrics
416429
.arrangement_sizes_collection_time_seconds
@@ -547,6 +560,32 @@ impl Coordinator {
547560
});
548561
}
549562

563+
/// Returns `true` when every row in `mz_compute_hydration_times` has a
564+
/// non-null `time_ns` (i.e. every compute object on every replica has
565+
/// finished its initial hydration). An empty collection also returns
566+
/// `true`. On snapshot failure, returns `false` so the caller retries.
567+
async fn check_arrangement_sizes_hydration(&self) -> bool {
568+
let item_id = self
569+
.catalog()
570+
.resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_COMPUTE_HYDRATION_TIMES);
571+
let global_id = self.catalog.get_entry(&item_id).latest_global_id();
572+
let read_ts = self.get_local_read_ts().await;
573+
let mut snapshot = match self
574+
.controller
575+
.storage_collections
576+
.snapshot(global_id, read_ts)
577+
.await
578+
{
579+
Ok(s) => s,
580+
Err(e) => {
581+
tracing::warn!("arrangement-sizes hydration gate snapshot failed: {e:?}");
582+
return false;
583+
}
584+
};
585+
differential_dataflow::consolidation::consolidate(&mut snapshot);
586+
arrangement_sizes_all_hydrated(&snapshot)
587+
}
588+
550589
#[mz_ore::instrument(level = "debug")]
551590
async fn message_command(&mut self, cmd: Command) {
552591
self.handle_command(cmd).await;
@@ -1028,3 +1067,88 @@ impl Coordinator {
10281067
}
10291068
}
10301069
}
1070+
1071+
/// Returns `true` when every `+1` row in a consolidated snapshot of
1072+
/// `mz_compute_hydration_times` has a non-null `time_ns` (column 2), i.e.
1073+
/// every compute object on every replica has finished hydrating at least
1074+
/// once. Empty input returns `true`. Unexpected row shape returns `false`
1075+
/// so the caller re-polls rather than crashing on schema drift.
1076+
fn arrangement_sizes_all_hydrated(snapshot: &[(mz_repr::Row, i64)]) -> bool {
1077+
const HYDRATION_COL_TIME_NS: usize = 2;
1078+
for (row, diff) in snapshot {
1079+
if *diff != 1 {
1080+
continue;
1081+
}
1082+
let datums = row.unpack();
1083+
let Some(time_ns) = datums.get(HYDRATION_COL_TIME_NS) else {
1084+
return false;
1085+
};
1086+
if time_ns.is_null() {
1087+
return false;
1088+
}
1089+
}
1090+
true
1091+
}
1092+
1093+
#[cfg(test)]
1094+
mod arrangement_sizes_hydration_tests {
1095+
use mz_repr::{Datum, Row};
1096+
1097+
use super::arrangement_sizes_all_hydrated;
1098+
1099+
// Columns: (replica_id, object_id, time_ns). `time_ns` is the value
1100+
// under test; the other columns are packed with realistic defaults so
1101+
// shape drift would surface.
1102+
fn hydration_row(time_ns: Option<u64>) -> Row {
1103+
Row::pack_slice(&[
1104+
Datum::String("r1"),
1105+
Datum::String("u1"),
1106+
match time_ns {
1107+
Some(ns) => Datum::UInt64(ns),
1108+
None => Datum::Null,
1109+
},
1110+
])
1111+
}
1112+
1113+
#[mz_ore::test]
1114+
fn empty_snapshot_is_hydrated() {
1115+
assert!(arrangement_sizes_all_hydrated(&[]));
1116+
}
1117+
1118+
#[mz_ore::test]
1119+
fn all_non_null_is_hydrated() {
1120+
let rows = vec![
1121+
(hydration_row(Some(100)), 1),
1122+
(hydration_row(Some(200)), 1),
1123+
];
1124+
assert!(arrangement_sizes_all_hydrated(&rows));
1125+
}
1126+
1127+
#[mz_ore::test]
1128+
fn any_null_blocks_hydration() {
1129+
let rows = vec![
1130+
(hydration_row(Some(100)), 1),
1131+
(hydration_row(None), 1),
1132+
(hydration_row(Some(300)), 1),
1133+
];
1134+
assert!(!arrangement_sizes_all_hydrated(&rows));
1135+
}
1136+
1137+
#[mz_ore::test]
1138+
fn retractions_are_ignored() {
1139+
// A -1 row represents stale state that consolidation would remove.
1140+
// We skip it so a retracted null doesn't veto hydration.
1141+
let rows = vec![
1142+
(hydration_row(None), -1),
1143+
(hydration_row(Some(100)), 1),
1144+
];
1145+
assert!(arrangement_sizes_all_hydrated(&rows));
1146+
}
1147+
1148+
#[mz_ore::test]
1149+
fn malformed_row_blocks_hydration() {
1150+
let malformed = Row::pack_slice(&[Datum::String("r1"), Datum::String("u1")]);
1151+
let rows = vec![(malformed, 1)];
1152+
assert!(!arrangement_sizes_all_hydrated(&rows));
1153+
}
1154+
}

src/catalog/src/builtin.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8684,6 +8684,9 @@ pub static MZ_OBJECT_ARRANGEMENT_SIZES_IND: LazyLock<BuiltinIndex> =
86848684
is_retained_metrics_object: true,
86858685
});
86868686

8687+
/// Identifies [`MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY`] for the schema-migration
8688+
/// guard in `builtin_schema_migration.rs`, which forbids migrating this table
8689+
/// because its startup pruner assumes it is the only source of retractions.
86878690
pub static MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION: LazyLock<SystemObjectDescription> =
86888691
LazyLock::new(|| SystemObjectDescription {
86898692
schema_name: MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY.schema.to_string(),
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
# Exercises the `size` quantization formula used by the introspection subscribe
11+
# that populates `mz_internal.mz_object_arrangement_sizes`. The formula lives in
12+
# `src/adapter/src/coord/introspection.rs`.
13+
#
14+
# Semantics:
15+
# - byte counts below 10 MiB (10485760) are reported exactly
16+
# - byte counts >= 10 MiB round to the nearest 10 MiB boundary using
17+
# `((n + 5 MiB) / 10 MiB) * 10 MiB`
18+
19+
mode cockroach
20+
21+
# Applies the quantization formula to a spread of raw byte counts.
22+
statement ok
23+
CREATE VIEW arrangement_size_quantize AS
24+
SELECT
25+
n,
26+
CASE
27+
WHEN n < 10485760 THEN n::int8
28+
ELSE ((n + 5242880) / 10485760 * 10485760)::int8
29+
END AS size
30+
FROM (VALUES
31+
(0), -- zero bytes
32+
(1), -- tiny arrangement
33+
(10485759), -- one byte below the threshold: reported exactly
34+
(10485760), -- exactly 10 MiB: already on a boundary
35+
(10485761), -- one byte past threshold: rounds down to 10 MiB
36+
(15728640), -- 15 MiB (boundary midpoint): rounds up to 20 MiB
37+
(15728639), -- one byte below midpoint: rounds down to 10 MiB
38+
(20971520), -- 20 MiB: already on a boundary
39+
(104857600) -- 100 MiB: already on a boundary
40+
) AS v(n);
41+
42+
query II
43+
SELECT n, size FROM arrangement_size_quantize ORDER BY n
44+
----
45+
0 0
46+
1 1
47+
10485759 10485759
48+
10485760 10485760
49+
10485761 10485760
50+
15728639 10485760
51+
15728640 20971520
52+
20971520 20971520
53+
104857600 104857600

0 commit comments

Comments
 (0)