diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index b2f00961e5ce1..37eab4cf79094 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -392,6 +392,12 @@ def get_variable_system_parameters( VariableSystemParameter( "persist_state_update_lease_timeout", "1s", ["0s", "1s", "10s"] ), + VariableSystemParameter( + "arrangement_size_collection_interval", "1h", ["1s", "10s", "1h"] + ), + VariableSystemParameter( + "arrangement_size_retention_period", "7d", ["1m", "1h", "7d"] + ), VariableSystemParameter( "persist_validate_part_bounds_on_read", "false", ["true", "false"] ), diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index f80884e9a1845..7c4e9471a1cf5 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1533,6 +1533,16 @@ def __init__( "'1s'", "'10s'", ] + self.flags_with_values["arrangement_size_collection_interval"] = [ + "'1s'", + "'10s'", + "'1h'", + ] + self.flags_with_values["arrangement_size_retention_period"] = [ + "'1m'", + "'1h'", + "'7d'", + ] # Note: it's not safe to re-enable this flag after writing with `persist_validate_part_bounds_on_write`, # since those new-style parts may fail our old-style validation. self.flags_with_values["persist_validate_part_bounds_on_read"] = ["FALSE"] diff --git a/src/adapter-types/src/dyncfgs.rs b/src/adapter-types/src/dyncfgs.rs index 4ed2e5a0f4cf4..0f2474d23513e 100644 --- a/src/adapter-types/src/dyncfgs.rs +++ b/src/adapter-types/src/dyncfgs.rs @@ -214,6 +214,20 @@ pub const CONSOLE_OIDC_SCOPES: Config<&'static str> = Config::new( "Space-separated OIDC scopes requested by the web console.", ); +/// Interval at which to collect per-object arrangement size snapshots for the history table. 
+pub const ARRANGEMENT_SIZE_COLLECTION_INTERVAL: Config = Config::new( + "arrangement_size_collection_interval", + Duration::from_secs(60 * 60), + "Interval at which to collect and snapshot per-object arrangement sizes.", +); + +/// How long to retain per-object arrangement size history. +pub const ARRANGEMENT_SIZE_RETENTION_PERIOD: Config = Config::new( + "arrangement_size_retention_period", + Duration::from_secs(7 * 24 * 60 * 60), + "How long to retain per-object arrangement size history.", +); + /// Adds the full set of all adapter `Config`s. pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs @@ -245,4 +259,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&USER_ID_POOL_BATCH_SIZE) .add(&CONSOLE_OIDC_CLIENT_ID) .add(&CONSOLE_OIDC_SCOPES) + .add(&ARRANGEMENT_SIZE_COLLECTION_INTERVAL) + .add(&ARRANGEMENT_SIZE_RETENTION_PERIOD) } diff --git a/src/adapter/src/catalog/open/builtin_schema_migration.rs b/src/adapter/src/catalog/open/builtin_schema_migration.rs index 1b8d32d902323..f7116174f9ab7 100644 --- a/src/adapter/src/catalog/open/builtin_schema_migration.rs +++ b/src/adapter/src/catalog/open/builtin_schema_migration.rs @@ -37,7 +37,8 @@ use futures::future::BoxFuture; use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO}; use mz_catalog::builtin::{ BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION, - MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL, + MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, + RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL, }; use mz_catalog::config::BuiltinItemMigrationConfig; use mz_catalog::durable::objects::SystemObjectUniqueIdentifier; @@ -486,6 +487,14 @@ impl Migration { "mz_storage_usage_by_shard cannot be migrated or else the table will be truncated" ); + // Same hazard as `mz_storage_usage_by_shard`: the startup pruner + // (`Coordinator::prune_arrangement_sizes_history_on_startup`) assumes it is + // the only source of 
retractions, so migration-driven truncation would break it. + assert_ne!( + &*MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, object, + "mz_object_arrangement_size_history cannot be migrated or else the table will be truncated" + ); + // `mz_catalog_raw` cannot be migrated because it contains the durable catalog and it // wouldn't be very durable if we allowed it to be truncated. assert_ne!( diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index c9c4c4c7cc409..4b3cebf4cadae 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -347,6 +347,9 @@ pub enum Message { StorageUsageFetch, StorageUsageUpdate(ShardsUsageReferenced), StorageUsagePrune(Vec), + ArrangementSizesSchedule, + ArrangementSizesSnapshot, + ArrangementSizesPrune(Vec), /// Performs any cleanup and logging actions necessary for /// finalizing a statement execution. RetireExecute { @@ -483,6 +486,9 @@ impl Message { Message::StorageUsageFetch => "storage_usage_fetch", Message::StorageUsageUpdate(_) => "storage_usage_update", Message::StorageUsagePrune(_) => "storage_usage_prune", + Message::ArrangementSizesSchedule => "arrangement_sizes_schedule", + Message::ArrangementSizesSnapshot => "arrangement_sizes_snapshot", + Message::ArrangementSizesPrune(_) => "arrangement_sizes_prune", Message::RetireExecute { .. } => "retire_execute", Message::ExecuteSingleStatementTransaction { .. } => { "execute_single_statement_transaction" @@ -1925,6 +1931,11 @@ pub struct Coordinator { /// The interval at which to collect storage usage information. storage_usage_collection_interval: Duration, + /// Set once all compute objects have been observed as hydrated, gating + /// the first write into `mz_object_arrangement_size_history`. Sticky: + /// later partial re-hydrations (e.g. replica restart) don't re-arm it. + arrangement_sizes_hydration_observed: bool, + /// Segment analytics client. 
#[derivative(Debug = "ignore")] segment_client: Option, @@ -3618,6 +3629,7 @@ impl Coordinator { }); self.schedule_storage_usage_collection().await; + self.schedule_arrangement_sizes_collection().await; self.spawn_privatelink_vpc_endpoints_watch_task(); self.spawn_statement_logging_task(); flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle); @@ -4234,6 +4246,49 @@ impl Coordinator { }); } + /// Retracts `mz_object_arrangement_size_history` rows older than the + /// `arrangement_size_retention_period` dyncfg. + /// + /// Must only run at startup: it reads at the oracle read timestamp and + /// writes retractions at the current write timestamp, which is only safe + /// when no other writes are in flight. See [the equivalent storage-usage + /// pruner](Self::prune_storage_usage_events_on_startup) for the same + /// reasoning. + async fn prune_arrangement_sizes_history_on_startup(&self) { + // The catalog server is not writable in read-only mode. + if self.controller.read_only() { + return; + } + + let retention_period = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_RETENTION_PERIOD + .get(self.catalog().system_config().dyncfgs()); + let item_id = self + .catalog() + .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY); + let global_id = self.catalog.get_entry(&item_id).latest_global_id(); + let read_ts = self.get_local_read_ts().await; + let current_contents_fut = self + .controller + .storage_collections + .snapshot(global_id, read_ts); + let internal_cmd_tx = self.internal_cmd_tx.clone(); + spawn(|| "arrangement_sizes_history_prune", async move { + let mut current_contents = current_contents_fut + .await + .unwrap_or_terminate("cannot fail to fetch snapshot"); + differential_dataflow::consolidation::consolidate(&mut current_contents); + + let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis()); + let expired = + arrangement_sizes_expired_retractions(current_contents, cutoff_ts, item_id); 
+ + // TODO(arrangement-sizes): when the writeable-catalog-server + // plumbing in https://github.com/MaterializeInc/materialize/pull/35436 + // lands, retract directly on `mz_catalog_server`. + let _ = internal_cmd_tx.send(Message::ArrangementSizesPrune(expired)); + }); + } + fn current_credit_consumption_rate(&self) -> Numeric { self.catalog() .user_cluster_replicas() @@ -4253,6 +4308,40 @@ impl Coordinator { } } +/// Returns retraction updates for rows in a consolidated +/// `mz_object_arrangement_size_history` snapshot whose `collection_timestamp` +/// (column 3) is strictly before `cutoff_ts`. +/// +/// Panics if any input row has `diff != 1`: the caller must consolidate first, +/// and a consolidated history table should never contain retractions because +/// the only source of retractions is this function itself. +fn arrangement_sizes_expired_retractions( + rows: impl IntoIterator, + cutoff_ts: u128, + item_id: CatalogItemId, +) -> Vec { + let mut expired = Vec::new(); + for (row, diff) in rows { + assert_eq!( + diff, 1, + "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})" + ); + let collection_timestamp = row + .unpack() + .get(3) + .expect("definition of mz_object_arrangement_size_history changed") + .unwrap_timestamptz() + .timestamp_millis(); + let collection_timestamp: u128 = collection_timestamp + .try_into() + .expect("all collections happen after Jan 1 1970"); + if collection_timestamp < cutoff_ts { + expired.push(BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE)); + } + } + expired +} + #[cfg(test)] impl Coordinator { #[allow(dead_code)] @@ -4714,6 +4803,7 @@ pub fn serve( cloud_resource_controller, storage_usage_client, storage_usage_collection_interval, + arrangement_sizes_hydration_observed: false, segment_client, metrics, optimizer_metrics, @@ -4760,6 +4850,8 @@ pub fn serve( .await; } + coord.prune_arrangement_sizes_history_on_startup().await; + Ok(()) }); let ok = bootstrap.is_ok(); @@ -5177,3 +5269,56 @@ mod 
id_pool_tests { pool.refill(10, 5); } } + +#[cfg(test)] +mod arrangement_sizes_pruner_tests { + use mz_repr::catalog_item_id::CatalogItemId; + use mz_repr::{Datum, Row}; + + use super::arrangement_sizes_expired_retractions; + + // Pack a row shaped like `mz_object_arrangement_size_history`: the pruner + // only cares about column 3 (`collection_timestamp`), but we stuff the + // other three columns with realistic values so shape changes would fail. + fn history_row(ts_ms: i64) -> Row { + let dt = mz_ore::now::to_datetime(ts_ms.try_into().expect("non-negative")); + Row::pack_slice(&[ + Datum::String("r1"), + Datum::String("u1"), + Datum::Int64(123), + Datum::TimestampTz(dt.try_into().expect("fits in TimestampTz")), + ]) + } + + fn item_id() -> CatalogItemId { + // Any CatalogItemId will do; tests don't dispatch on it. + CatalogItemId::User(42) + } + + #[mz_ore::test] + fn empty_input_produces_no_retractions() { + let out = arrangement_sizes_expired_retractions(Vec::new(), 1_000, item_id()); + assert!(out.is_empty()); + } + + #[mz_ore::test] + fn retracts_only_rows_strictly_before_cutoff() { + // Mixes both sides of the filter and includes a row at exactly + // the cutoff timestamp to pin down the strict-less-than boundary. 
+ let rows = vec![ + (history_row(100), 1), + (history_row(500), 1), + (history_row(1_000), 1), // at cutoff: kept (strict <) + (history_row(5_000), 1), + ]; + let out = arrangement_sizes_expired_retractions(rows, 1_000, item_id()); + assert_eq!(out.len(), 2); + } + + #[mz_ore::test] + #[should_panic(expected = "consolidated contents should not contain retractions")] + fn retraction_in_input_panics() { + let rows = vec![(history_row(100), -1)]; + let _ = arrangement_sizes_expired_retractions(rows, 1_000, item_id()); + } +} diff --git a/src/adapter/src/coord/introspection.rs b/src/adapter/src/coord/introspection.rs index 1d9a5f638529b..5b87d8a3d078b 100644 --- a/src/adapter/src/coord/introspection.rs +++ b/src/adapter/src/coord/introspection.rs @@ -566,4 +566,39 @@ const SUBSCRIBES: &[SubscribeSpec] = &[ GROUP BY export_id, lir_id )", }, + // Per-object arrangement sizes, one row per `(object_id, replica)`. + // + // `mz_arrangement_heap_size_raw` and `mz_arrangement_batcher_size_raw` are + // differential logs where each `+1` row represents one byte of heap delta; + // after consolidation, `COUNT(*)` is the current arrangement size in bytes. + // + // Objects smaller than 10 MiB report exact bytes; larger ones are rounded + // to the nearest 10 MiB to suppress byte-level churn in the collection. + // + // `mz_dataflow_addresses.address[1]` is the root of each operator's address + // tree, which equals the owning `dataflow_id` — so we can go addresses → + // operator → dataflow without joining `mz_dataflow_operator_dataflows`. 
+ SubscribeSpec { + introspection_type: IntrospectionType::ComputeObjectArrangementSizes, + sql: "SUBSCRIBE ( + SELECT + ce.export_id AS object_id, + CASE + WHEN COUNT(*) < 10485760 THEN COUNT(*)::int8 + ELSE ((COUNT(*) + 5242880) / 10485760 * 10485760)::int8 + END AS size + FROM mz_introspection.mz_compute_exports AS ce + JOIN ( + SELECT addrs.address[1] AS dataflow_id, addrs.id AS operator_id + FROM mz_introspection.mz_dataflow_addresses addrs + ) AS od ON od.dataflow_id = ce.dataflow_id + JOIN ( + SELECT operator_id FROM mz_introspection.mz_arrangement_heap_size_raw + UNION ALL + SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw + ) AS rs ON rs.operator_id = od.operator_id + GROUP BY ce.export_id + OPTIONS (AGGREGATE INPUT GROUP SIZE = 1000) + )", + }, ]; diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs index 1f76f452b605b..e541353f9d58e 100644 --- a/src/adapter/src/coord/message_handler.rs +++ b/src/adapter/src/coord/message_handler.rs @@ -18,6 +18,7 @@ use maplit::btreemap; use mz_catalog::memory::objects::ClusterReplicaProcessStatus; use mz_controller::ControllerResponse; use mz_controller::clusters::{ClusterEvent, ClusterStatus}; +use mz_ore::cast::CastFrom; use mz_ore::instrument; use mz_ore::now::EpochMillis; use mz_ore::option::OptionExt; @@ -130,6 +131,17 @@ impl Coordinator { Message::StorageUsagePrune(expired) => { self.storage_usage_prune(expired).boxed_local().await; } + Message::ArrangementSizesSchedule => { + self.schedule_arrangement_sizes_collection() + .boxed_local() + .await; + } + Message::ArrangementSizesSnapshot => { + self.arrangement_sizes_snapshot().boxed_local().await; + } + Message::ArrangementSizesPrune(expired) => { + self.arrangement_sizes_prune(expired).boxed_local().await; + } Message::RetireExecute { otel_ctx, data, @@ -340,6 +352,238 @@ impl Coordinator { }); } + /// Schedules the next per-object arrangement sizes snapshot. 
+ /// + /// Re-reads the interval dyncfg on every call so operators can retune + /// cadence without a restart. Aligns to an `organization_id`-seeded offset + /// within the interval so collections stay consistent across restarts and + /// don't synchronize across environments. + pub async fn schedule_arrangement_sizes_collection(&self) { + let interval_duration = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_COLLECTION_INTERVAL + .get(self.catalog().system_config().dyncfgs()); + + const SEED_LEN: usize = 32; + let mut seed = [0; SEED_LEN]; + for (i, byte) in self + .catalog() + .state() + .config() + .environment_id + .organization_id() + .as_bytes() + .into_iter() + .take(SEED_LEN) + .enumerate() + { + seed[i] = *byte; + } + let interval_ms: EpochMillis = EpochMillis::try_from(interval_duration.as_millis()) + .expect("arrangement_size_collection_interval must fit into u64"); + // `rand::random_range` panics on an empty range. + let interval_ms = interval_ms.max(1); + let offset = rngs::SmallRng::from_seed(seed).random_range(0..interval_ms); + let now_ts: EpochMillis = self.peek_local_write_ts().await.into(); + + let previous_collection_ts = (now_ts - (now_ts % interval_ms)) + offset; + let next_collection_ts = if previous_collection_ts > now_ts { + previous_collection_ts + } else { + previous_collection_ts + interval_ms + }; + let sleep_for = Duration::from_millis(next_collection_ts - now_ts); + + let internal_cmd_tx = self.internal_cmd_tx.clone(); + task::spawn(|| "arrangement_sizes_collection", async move { + tokio::time::sleep(sleep_for).await; + // Best-effort: if the coordinator is shutting down, just drop. + let _ = internal_cmd_tx.send(Message::ArrangementSizesSnapshot); + }); + } + + /// Snapshots the current contents of `mz_object_arrangement_sizes` and + /// appends them to `mz_object_arrangement_size_history`, tagged with a + /// shared `collection_timestamp`. Reschedules on completion. 
+ #[mz_ore::instrument(level = "debug")] + async fn arrangement_sizes_snapshot(&mut self) { + // The catalog server is not writable in read-only mode. + if self.controller.read_only() { + self.schedule_arrangement_sizes_collection().await; + return; + } + + // Delay writes until every compute object has hydrated, so we + // don't record partial sizes from still-building dataflows. + // Sticky: later snapshots skip this check. Gate retries run at + // the full collection interval, so the first snapshot can lag + // hydration by up to one interval. + if !self.arrangement_sizes_hydration_observed { + if !self.check_arrangement_sizes_hydration().await { + self.schedule_arrangement_sizes_collection().await; + return; + } + self.arrangement_sizes_hydration_observed = true; + } + + let collection_timer = self + .metrics + .arrangement_sizes_collection_time_seconds + .start_timer(); + + let live_item_id = self.catalog().resolve_builtin_storage_collection( + &mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED, + ); + let live_global_id = self.catalog.get_entry(&live_item_id).latest_global_id(); + let history_item_id = self + .catalog() + .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY); + + let read_ts = self.get_local_read_ts().await; + let snapshot = match self + .controller + .storage_collections + .snapshot(live_global_id, read_ts) + .await + { + Ok(s) => s, + Err(e) => { + tracing::warn!("arrangement sizes snapshot failed: {e:?}"); + drop(collection_timer); + self.schedule_arrangement_sizes_collection().await; + return; + } + }; + + // `collection_ts` is stamped after the snapshot so it's always >= the + // state the rows describe, and monotone across restarts. The snapshot + // read and this stamp aren't atomic, but the resulting skew is bounded + // by snapshot latency and negligible at this cadence. 
+ let collection_ts: EpochMillis = self.get_local_write_ts().await.timestamp.into(); + let collection_datum = Datum::TimestampTz( + mz_ore::now::to_datetime(collection_ts) + .try_into() + .expect("collection_timestamp must fit into TimestampTz"), + ); + + let mut consolidated = snapshot; + differential_dataflow::consolidation::consolidate(&mut consolidated); + + // Column positions in `mz_object_arrangement_sizes`. + const LIVE_COL_REPLICA_ID: usize = 0; + const LIVE_COL_OBJECT_ID: usize = 1; + const LIVE_COL_SIZE: usize = 2; + const LIVE_COL_COUNT: usize = 3; + + let mut skipped_malformed: u64 = 0; + let mut skipped_null_size: u64 = 0; + let updates: Vec = consolidated + .into_iter() + .filter_map(|(row, diff)| { + if diff != 1 { + return None; + } + let datums = row.unpack(); + // Surface schema drift via a warn log below rather than silently + // skipping entire snapshots. + if datums.len() != LIVE_COL_COUNT { + skipped_malformed += 1; + return None; + } + let replica_id = datums[LIVE_COL_REPLICA_ID].unwrap_str(); + let object_id = datums[LIVE_COL_OBJECT_ID].unwrap_str(); + let size_datum = datums[LIVE_COL_SIZE]; + // The history table's `size` is non-null; fabricating zero would + // be misleading, so drop. + if size_datum.is_null() { + skipped_null_size += 1; + return None; + } + let size = size_datum.unwrap_int64(); + let new_row = Row::pack_slice(&[ + Datum::String(replica_id), + Datum::String(object_id), + Datum::Int64(size), + collection_datum, + ]); + Some(BuiltinTableUpdate::row(history_item_id, new_row, Diff::ONE)) + }) + .collect(); + if skipped_malformed > 0 { + warn!( + "mz_object_arrangement_sizes schema drift: skipped {skipped_malformed} rows \ + with unexpected arity" + ); + } + if skipped_null_size > 0 { + tracing::debug!("skipped {skipped_null_size} live rows with null size"); + } + + let row_count = updates.len(); + // Captures snapshot + row construction. 
The async table-apply below + // is captured separately by `mz_append_table_duration_seconds`. + collection_timer.observe_duration(); + + if !updates.is_empty() { + self.metrics + .arrangement_sizes_rows_written + .inc_by(u64::cast_from(row_count)); + // TODO(arrangement-sizes): when the writeable-catalog-server plumbing + // in https://github.com/MaterializeInc/materialize/pull/35436 lands, + // append directly on `mz_catalog_server` instead of going through + // the environmentd builtin-table-update path. + let (fut, _) = self.builtin_table_update().execute(updates).await; + let internal_cmd_tx = self.internal_cmd_tx.clone(); + let task_span = + info_span!(parent: None, "coord::arrangement_sizes_snapshot::table_updates"); + OpenTelemetryContext::obtain().attach_as_parent_to(&task_span); + task::spawn(|| "arrangement_sizes_snapshot_apply", async move { + fut.instrument(task_span).await; + if let Err(e) = internal_cmd_tx.send(Message::ArrangementSizesSchedule) { + warn!("internal_cmd_rx dropped before we could send: {e:?}"); + } + }); + } else { + self.schedule_arrangement_sizes_collection().await; + } + + tracing::debug!( + "appended {row_count} rows to mz_object_arrangement_size_history at ts {collection_ts}" + ); + } + + #[mz_ore::instrument(level = "debug")] + async fn arrangement_sizes_prune(&mut self, expired: Vec) { + let (fut, _) = self.builtin_table_update().execute(expired).await; + task::spawn(|| "arrangement_sizes_pruning_apply", async move { + fut.await; + }); + } + + /// Returns `true` when every row in `mz_compute_hydration_times` has a + /// non-null `time_ns` (i.e. every compute object on every replica has + /// finished its initial hydration). An empty collection also returns + /// `true`. On snapshot failure, returns `false` so the caller retries. 
+ async fn check_arrangement_sizes_hydration(&self) -> bool { + let item_id = self + .catalog() + .resolve_builtin_storage_collection(&mz_catalog::builtin::MZ_COMPUTE_HYDRATION_TIMES); + let global_id = self.catalog.get_entry(&item_id).latest_global_id(); + let read_ts = self.get_local_read_ts().await; + let mut snapshot = match self + .controller + .storage_collections + .snapshot(global_id, read_ts) + .await + { + Ok(s) => s, + Err(e) => { + tracing::warn!("arrangement-sizes hydration gate snapshot failed: {e:?}"); + return false; + } + }; + differential_dataflow::consolidation::consolidate(&mut snapshot); + arrangement_sizes_all_hydrated(&snapshot) + } + #[mz_ore::instrument(level = "debug")] async fn message_command(&mut self, cmd: Command) { self.handle_command(cmd).await; @@ -821,3 +1065,82 @@ impl Coordinator { } } } + +/// Returns `true` when every `+1` row in a consolidated snapshot of +/// `mz_compute_hydration_times` has a non-null `time_ns` (column 2), i.e. +/// every compute object on every replica has finished hydrating at least +/// once. Empty input returns `true`. Unexpected row shape returns `false` +/// so the caller re-polls rather than crashing on schema drift. +fn arrangement_sizes_all_hydrated(snapshot: &[(mz_repr::Row, i64)]) -> bool { + const HYDRATION_COL_TIME_NS: usize = 2; + for (row, diff) in snapshot { + if *diff != 1 { + continue; + } + let datums = row.unpack(); + let Some(time_ns) = datums.get(HYDRATION_COL_TIME_NS) else { + return false; + }; + if time_ns.is_null() { + return false; + } + } + true +} + +#[cfg(test)] +mod arrangement_sizes_hydration_tests { + use mz_repr::{Datum, Row}; + + use super::arrangement_sizes_all_hydrated; + + // Columns: (replica_id, object_id, time_ns). `time_ns` is the value + // under test; the other columns are packed with realistic defaults so + // shape drift would surface. 
+ fn hydration_row(time_ns: Option) -> Row { + Row::pack_slice(&[ + Datum::String("r1"), + Datum::String("u1"), + match time_ns { + Some(ns) => Datum::UInt64(ns), + None => Datum::Null, + }, + ]) + } + + #[mz_ore::test] + fn empty_snapshot_is_hydrated() { + assert!(arrangement_sizes_all_hydrated(&[])); + } + + #[mz_ore::test] + fn all_non_null_is_hydrated() { + let rows = vec![(hydration_row(Some(100)), 1), (hydration_row(Some(200)), 1)]; + assert!(arrangement_sizes_all_hydrated(&rows)); + } + + #[mz_ore::test] + fn any_null_blocks_hydration() { + let rows = vec![ + (hydration_row(Some(100)), 1), + (hydration_row(None), 1), + (hydration_row(Some(300)), 1), + ]; + assert!(!arrangement_sizes_all_hydrated(&rows)); + } + + #[mz_ore::test] + fn retractions_are_ignored() { + // A -1 row represents stale state that consolidation would remove. + // We skip it so a retracted null doesn't veto hydration. + let rows = vec![(hydration_row(None), -1), (hydration_row(Some(100)), 1)]; + assert!(arrangement_sizes_all_hydrated(&rows)); + } + + #[mz_ore::test] + fn malformed_row_blocks_hydration() { + let malformed = Row::pack_slice(&[Datum::String("r1"), Datum::String("u1")]); + let rows = vec![(malformed, 1)]; + assert!(!arrangement_sizes_all_hydrated(&rows)); + } +} diff --git a/src/adapter/src/metrics.rs b/src/adapter/src/metrics.rs index c6ea0fa94e39d..2333fe4543dfd 100644 --- a/src/adapter/src/metrics.rs +++ b/src/adapter/src/metrics.rs @@ -27,6 +27,8 @@ pub struct Metrics { pub timestamp_difference_for_strict_serializable_ms: HistogramVec, pub commands: IntCounterVec, pub storage_usage_collection_time_seconds: Histogram, + pub arrangement_sizes_collection_time_seconds: Histogram, + pub arrangement_sizes_rows_written: IntCounter, pub subscribe_outputs: IntCounterVec, pub canceled_peeks: IntCounter, pub linearize_message_seconds: HistogramVec, @@ -105,6 +107,15 @@ impl Metrics { help: "The number of seconds the coord spends collecting usage metrics from storage.", buckets: 
histogram_seconds_buckets(0.000_128, 8.0) )), + arrangement_sizes_collection_time_seconds: registry.register(metric!( + name: "mz_arrangement_sizes_collection_time_seconds", + help: "Seconds to read mz_object_arrangement_sizes and prepare history-table updates for one snapshot.", + buckets: histogram_seconds_buckets(0.000_128, 8.0) + )), + arrangement_sizes_rows_written: registry.register(metric!( + name: "mz_arrangement_sizes_rows_written_total", + help: "Total rows appended to mz_object_arrangement_size_history since process start.", + )), subscribe_outputs: registry.register(metric!( name: "mz_subscribe_outputs", help: "The total number of different subscribe outputs used", diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index c959eebb41fca..fed12e31d9f99 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -8641,6 +8641,119 @@ pub static MZ_COMPUTE_HYDRATION_TIMES_IND: LazyLock = is_retained_metrics_object: true, }); +pub static MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED: LazyLock = LazyLock::new(|| { + BuiltinSource { + name: "mz_object_arrangement_sizes", + schema: MZ_INTERNAL_SCHEMA, + oid: oid::SOURCE_MZ_OBJECT_ARRANGEMENT_SIZES_OID, + desc: RelationDesc::builder() + .with_column("replica_id", SqlScalarType::String.nullable(false)) + .with_column("object_id", SqlScalarType::String.nullable(false)) + .with_column("size", SqlScalarType::Int64.nullable(true)) + .finish(), + data_source: IntrospectionType::ComputeObjectArrangementSizes.into(), + column_comments: BTreeMap::from_iter([ + ( + "replica_id", + "The ID of the cluster replica. Corresponds to `mz_cluster_replicas.id`.", + ), + ( + "object_id", + "The ID of the compute object (index or materialized view). Corresponds to `mz_objects.id`.", + ), + ( + "size", + "The total arrangement heap and batcher size in bytes for this object on this replica. 
\ + Objects smaller than 10 MiB are reported at their exact size; objects 10 MiB or larger \ + are rounded to the nearest 10 MiB boundary to reduce per-byte churn in the differential \ + collection.", + ), + ]), + is_retained_metrics_object: true, + access: vec![PUBLIC_SELECT], + } +}); + +pub static MZ_OBJECT_ARRANGEMENT_SIZES_IND: LazyLock = + LazyLock::new(|| BuiltinIndex { + name: "mz_object_arrangement_sizes_ind", + schema: MZ_INTERNAL_SCHEMA, + oid: oid::INDEX_MZ_OBJECT_ARRANGEMENT_SIZES_IND_OID, + sql: "IN CLUSTER mz_catalog_server + ON mz_internal.mz_object_arrangement_sizes (replica_id)", + is_retained_metrics_object: true, + }); + +/// Identifies [`MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY`] for the schema-migration +/// guard in `builtin_schema_migration.rs`, which forbids migrating this table +/// because its startup pruner assumes it is the only source of retractions. +pub static MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION: LazyLock = + LazyLock::new(|| SystemObjectDescription { + schema_name: MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY.schema.to_string(), + object_type: CatalogItemType::Table, + object_name: MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY.name.to_string(), + }); + +pub static MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY: LazyLock = LazyLock::new(|| { + BuiltinTable { + name: "mz_object_arrangement_size_history", + schema: MZ_INTERNAL_SCHEMA, + oid: oid::TABLE_MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_OID, + desc: RelationDesc::builder() + .with_column("replica_id", SqlScalarType::String.nullable(false)) + .with_column("object_id", SqlScalarType::String.nullable(false)) + .with_column("size", SqlScalarType::Int64.nullable(false)) + .with_column( + "collection_timestamp", + SqlScalarType::TimestampTz { precision: None }.nullable(false), + ) + .finish(), + column_comments: BTreeMap::from_iter([ + ( + "replica_id", + "The ID of the cluster replica. Corresponds to `mz_cluster_replicas.id`.", + ), + ( + "object_id", + "The ID of the compute object (index or materialized view). 
Corresponds to `mz_objects.id`.", + ), + ( + "size", + "The total arrangement heap and batcher size in bytes for this object on this replica \ + at `collection_timestamp`. Objects smaller than 10 MiB are reported at their exact size; \ + objects 10 MiB or larger are rounded to the nearest 10 MiB boundary to reduce per-byte \ + churn in the underlying differential collection.", + ), + ( + "collection_timestamp", + "The timestamp when this snapshot was collected.", + ), + ]), + is_retained_metrics_object: true, + access: vec![PUBLIC_SELECT], + } +}); + +pub static MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_OBJECT_IND: LazyLock = + LazyLock::new(|| BuiltinIndex { + name: "mz_object_arrangement_size_history_object_ind", + schema: MZ_INTERNAL_SCHEMA, + oid: oid::INDEX_MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_OBJECT_IND_OID, + sql: "IN CLUSTER mz_catalog_server + ON mz_internal.mz_object_arrangement_size_history (object_id)", + is_retained_metrics_object: true, + }); + +pub static MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_TS_IND: LazyLock = + LazyLock::new(|| BuiltinIndex { + name: "mz_object_arrangement_size_history_ts_ind", + schema: MZ_INTERNAL_SCHEMA, + oid: oid::INDEX_MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_TS_IND_OID, + sql: "IN CLUSTER mz_catalog_server + ON mz_internal.mz_object_arrangement_size_history (collection_timestamp)", + is_retained_metrics_object: true, + }); + pub static MZ_COMPUTE_HYDRATION_STATUSES: LazyLock = LazyLock::new(|| BuiltinView { name: "mz_compute_hydration_statuses", schema: MZ_INTERNAL_SCHEMA, @@ -14635,6 +14748,11 @@ pub static BUILTINS_STATIC: LazyLock>> = LazyLock::ne Builtin::View(&MZ_COMPUTE_ERROR_COUNTS), Builtin::Source(&MZ_COMPUTE_ERROR_COUNTS_RAW_UNIFIED), Builtin::Source(&MZ_COMPUTE_HYDRATION_TIMES), + Builtin::Source(&MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED), + Builtin::Index(&MZ_OBJECT_ARRANGEMENT_SIZES_IND), + Builtin::Table(&MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY), + Builtin::Index(&MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_OBJECT_IND), + 
Builtin::Index(&MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_TS_IND), Builtin::Log(&MZ_COMPUTE_LIR_MAPPING_PER_WORKER), Builtin::View(&MZ_LIR_MAPPING), Builtin::Source(&MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES), diff --git a/src/pgrepr-consts/src/oid.rs b/src/pgrepr-consts/src/oid.rs index bd2261cf1d45c..a0088f0f8f572 100644 --- a/src/pgrepr-consts/src/oid.rs +++ b/src/pgrepr-consts/src/oid.rs @@ -792,3 +792,8 @@ pub const VIEW_MZ_MCP_DATA_PRODUCT_DETAILS_OID: u32 = 17071; pub const VIEW_MZ_BUILTIN_MATERIALIZED_VIEWS_OID: u32 = 17072; pub const FUNC_PARSE_CATALOG_CREATE_SQL_OID: u32 = 17073; pub const FUNC_REDACT_SQL_OID: u32 = 17074; +pub const SOURCE_MZ_OBJECT_ARRANGEMENT_SIZES_OID: u32 = 17075; +pub const INDEX_MZ_OBJECT_ARRANGEMENT_SIZES_IND_OID: u32 = 17076; +pub const TABLE_MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_OID: u32 = 17077; +pub const INDEX_MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_OBJECT_IND_OID: u32 = 17078; +pub const INDEX_MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_TS_IND_OID: u32 = 17079; diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index 5d565e037e23e..f299346ce642c 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -108,6 +108,7 @@ pub enum IntrospectionType { ComputeMaterializedViewRefreshes, ComputeErrorCounts, ComputeHydrationTimes, + ComputeObjectArrangementSizes, // Written by the Adapter for tracking AWS PrivateLink Connection Status History PrivatelinkConnectionStatusHistory, diff --git a/src/storage-controller/src/collection_mgmt.rs b/src/storage-controller/src/collection_mgmt.rs index dc852c8ddb396..cb067744e9c06 100644 --- a/src/storage-controller/src/collection_mgmt.rs +++ b/src/storage-controller/src/collection_mgmt.rs @@ -678,7 +678,8 @@ where | IntrospectionType::ComputeOperatorHydrationStatus | IntrospectionType::ComputeMaterializedViewRefreshes | IntrospectionType::ComputeErrorCounts - | IntrospectionType::ComputeHydrationTimes => { + | 
IntrospectionType::ComputeHydrationTimes + | IntrospectionType::ComputeObjectArrangementSizes => { // Differential collections start with an empty // desired state. No need to manually reset. } @@ -1096,7 +1097,8 @@ impl AppendOnlyWriteTask { | Some(introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus) | Some(introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes) | Some(introspection_type @ IntrospectionType::ComputeErrorCounts) - | Some(introspection_type @ IntrospectionType::ComputeHydrationTimes) => { + | Some(introspection_type @ IntrospectionType::ComputeHydrationTimes) + | Some(introspection_type @ IntrospectionType::ComputeObjectArrangementSizes) => { unreachable!("not append-only collection: {introspection_type:?}") } }; @@ -1273,7 +1275,8 @@ impl AppendOnlyWriteTask { | introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus | introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes | introspection_type @ IntrospectionType::ComputeErrorCounts - | introspection_type @ IntrospectionType::ComputeHydrationTimes => { + | introspection_type @ IntrospectionType::ComputeHydrationTimes + | introspection_type @ IntrospectionType::ComputeObjectArrangementSizes => { unreachable!("not append-only collection: {introspection_type:?}") } }; diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index cfee2041538dd..60fd4da7aff32 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -3771,7 +3771,10 @@ impl From<&IntrospectionType> for CollectionManagerKind { | IntrospectionType::ComputeOperatorHydrationStatus | IntrospectionType::ComputeMaterializedViewRefreshes | IntrospectionType::ComputeErrorCounts - | IntrospectionType::ComputeHydrationTimes => CollectionManagerKind::Differential, + | IntrospectionType::ComputeHydrationTimes + | IntrospectionType::ComputeObjectArrangementSizes => { + CollectionManagerKind::Differential + } 
IntrospectionType::SourceStatusHistory | IntrospectionType::SinkStatusHistory diff --git a/test/sqllogictest/arrangement_sizes_quantization.slt b/test/sqllogictest/arrangement_sizes_quantization.slt new file mode 100644 index 0000000000000..d41c61b55f1e3 --- /dev/null +++ b/test/sqllogictest/arrangement_sizes_quantization.slt @@ -0,0 +1,53 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Exercises the `size` quantization formula used by the introspection subscribe +# that populates `mz_internal.mz_object_arrangement_sizes`. The formula lives in +# `src/adapter/src/coord/introspection.rs`. +# +# Semantics: +# - byte counts below 10 MiB (10485760) are reported exactly +# - byte counts >= 10 MiB round to the nearest 10 MiB boundary using +# `((n + 5 MiB) / 10 MiB) * 10 MiB` + +mode cockroach + +# Applies the quantization formula to a spread of raw byte counts. 
+statement ok +CREATE VIEW arrangement_size_quantize AS +SELECT + n, + CASE + WHEN n < 10485760 THEN n::int8 + ELSE ((n + 5242880) / 10485760 * 10485760)::int8 + END AS size +FROM (VALUES + (0), -- zero bytes + (1), -- tiny arrangement + (10485759), -- one byte below the threshold: reported exactly + (10485760), -- exactly 10 MiB: already on a boundary + (10485761), -- one byte past threshold: rounds down to 10 MiB + (15728640), -- 15 MiB (boundary midpoint): rounds up to 20 MiB + (15728639), -- one byte below midpoint: rounds down to 10 MiB + (20971520), -- 20 MiB: already on a boundary + (104857600) -- 100 MiB: already on a boundary +) AS v(n); + +query II +SELECT n, size FROM arrangement_size_quantize ORDER BY n +---- +0 0 +1 1 +10485759 10485759 +10485760 10485760 +10485761 10485760 +15728639 10485760 +15728640 20971520 +20971520 20971520 +104857600 104857600 diff --git a/test/testdrive/arrangement-sizes.td b/test/testdrive/arrangement-sizes.td new file mode 100644 index 0000000000000..c04b5e55081b2 --- /dev/null +++ b/test/testdrive/arrangement-sizes.td @@ -0,0 +1,119 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# Test the live `mz_object_arrangement_sizes` introspection source and the +# periodically-snapshotted `mz_object_arrangement_size_history` table. +# +# Relies on testdrive's retry behavior since both relations are populated +# asynchronously. 
+ +$ set-sql-timeout duration=90s +$ set-arg-default default-replica-size=scale=1,workers=1 + +# `arrangement_size_collection_interval` is set to 2s at bootstrap by the +# testdrive mzcompose harness so the first snapshot arrives within this +# test's timeout rather than at the 1h production default. + +# Schema check. +> SELECT c.name, c.type + FROM mz_columns c + JOIN mz_tables t ON t.id = c.id + JOIN mz_schemas s ON s.id = t.schema_id + WHERE s.name = 'mz_internal' + AND t.name = 'mz_object_arrangement_size_history' + ORDER BY c.position +replica_id text +object_id text +size bigint +collection_timestamp "timestamp with time zone" + +# Set up a cluster with two replicas and some objects. +> CREATE CLUSTER test SIZE '${arg.default-replica-size}', REPLICATION FACTOR 2 +> SET cluster = test + +> CREATE TABLE t (a int, b int) +> INSERT INTO t SELECT g, g % 10 FROM generate_series(1, 1000) g + +> CREATE MATERIALIZED VIEW mv_agg AS + SELECT b, COUNT(*) AS cnt FROM t GROUP BY b +> CREATE INDEX idx_mv_agg ON mv_agg (b) +> CREATE INDEX idx_t ON t (a) +# Filter-only MV: no arrangement, should not appear in the live table. +> CREATE MATERIALIZED VIEW mv_filter AS SELECT * FROM t WHERE a > 0 + +# Live table: each arranged object shows up on both replicas with size > 0. +> SELECT o.name, r.name, s.size > 0 + FROM mz_internal.mz_object_arrangement_sizes s + JOIN mz_objects o ON o.id = s.object_id + JOIN mz_cluster_replicas r ON r.id = s.replica_id + WHERE o.name IN ('mv_agg', 'idx_mv_agg', 'idx_t') + ORDER BY o.name, r.name +idx_mv_agg r1 true +idx_mv_agg r2 true +idx_t r1 true +idx_t r2 true +mv_agg r1 true +mv_agg r2 true + +# Filter-only MV eventually drops out of the live table. During initial +# install the persist-sink pipeline can briefly register transient state +# in the raw heap/batcher tables; testdrive's automatic retry covers this. 
+> SELECT count(*) + FROM mz_internal.mz_object_arrangement_sizes s + JOIN mz_objects o ON o.id = s.object_id + WHERE o.name = 'mv_filter' +0 + +# History table: at least one snapshot row per (object_id, replica_id) pair +# for each arranged object, with a recent collection_timestamp. +> SELECT o.name, r.name, count(*) > 0, bool_and(h.size > 0) + FROM mz_internal.mz_object_arrangement_size_history h + JOIN mz_objects o ON o.id = h.object_id + JOIN mz_cluster_replicas r ON r.id = h.replica_id + WHERE o.name IN ('mv_agg', 'idx_mv_agg', 'idx_t') + GROUP BY o.name, r.name + ORDER BY o.name, r.name +idx_mv_agg r1 true true +idx_mv_agg r2 true true +idx_t r1 true true +idx_t r2 true true +mv_agg r1 true true +mv_agg r2 true true + +> SELECT bool_and(h.collection_timestamp IS NOT NULL + AND h.collection_timestamp > now() - INTERVAL '60 seconds') + FROM mz_internal.mz_object_arrangement_size_history h + JOIN mz_objects o ON o.id = h.object_id + WHERE o.name IN ('mv_agg', 'idx_mv_agg', 'idx_t') +true + +# Drop test: live table retracts the dropped MV; existing history rows +# remain but no new rows should be appended for it. 
+$ set-from-sql var=dropped-id +SELECT id FROM mz_objects WHERE name = 'mv_agg' + +> DROP MATERIALIZED VIEW mv_agg CASCADE + +> SELECT count(*) + FROM mz_internal.mz_object_arrangement_sizes + WHERE object_id = '${dropped-id}' +0 + +$ set-from-sql var=last-history-ts +SELECT COALESCE(max(collection_timestamp)::text, '1970-01-01') +FROM mz_internal.mz_object_arrangement_size_history +WHERE object_id = '${dropped-id}' + +> SELECT count(*) + FROM mz_internal.mz_object_arrangement_size_history + WHERE object_id = '${dropped-id}' + AND collection_timestamp > '${last-history-ts}'::timestamptz +0 + +> DROP CLUSTER test CASCADE diff --git a/test/testdrive/mzcompose.py b/test/testdrive/mzcompose.py index d047d01b0fee8..a60a8bb8f30af 100644 --- a/test/testdrive/mzcompose.py +++ b/test/testdrive/mzcompose.py @@ -136,7 +136,13 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: else: dependencies += ["zookeeper", "kafka", "schema-registry"] - additional_system_parameter_defaults = {"default_cluster_replication_factor": "1"} + additional_system_parameter_defaults = { + "default_cluster_replication_factor": "1", + # Shorten arrangement-sizes cadence so `arrangement-sizes.td` can + # verify snapshots land within the testdrive timeout rather than + # the 1h default. Harmless for every other testdrive file. + "arrangement_size_collection_interval": "2s", + } for val in args.system_param or []: x = val[0].split("=", maxsplit=1) assert len(x) == 2, f"--system-param '{val}' should be the format <key>=<value>"