Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,12 @@ def get_variable_system_parameters(
VariableSystemParameter(
"persist_state_update_lease_timeout", "1s", ["0s", "1s", "10s"]
),
VariableSystemParameter(
"arrangement_size_collection_interval", "1h", ["1s", "10s", "1h"]
),
VariableSystemParameter(
"arrangement_size_retention_period", "7d", ["1m", "1h", "7d"]
),
VariableSystemParameter(
"persist_validate_part_bounds_on_read", "false", ["true", "false"]
),
Expand Down
10 changes: 10 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1533,6 +1533,16 @@ def __init__(
"'1s'",
"'10s'",
]
self.flags_with_values["arrangement_size_collection_interval"] = [
"'1s'",
"'10s'",
"'1h'",
]
self.flags_with_values["arrangement_size_retention_period"] = [
"'1m'",
"'1h'",
"'7d'",
]
# Note: it's not safe to re-enable this flag after writing with `persist_validate_part_bounds_on_write`,
# since those new-style parts may fail our old-style validation.
self.flags_with_values["persist_validate_part_bounds_on_read"] = ["FALSE"]
Expand Down
16 changes: 16 additions & 0 deletions src/adapter-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ pub const CONSOLE_OIDC_SCOPES: Config<&'static str> = Config::new(
"Space-separated OIDC scopes requested by the web console.",
);

/// Interval at which to collect per-object arrangement size snapshots for the history table.
pub const ARRANGEMENT_SIZE_COLLECTION_INTERVAL: Config<Duration> = Config::new(
"arrangement_size_collection_interval",
Duration::from_secs(60 * 60),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Duration::from_secs(60 * 60),
Duration::from_hours(1),

"Interval at which to collect and snapshot per-object arrangement sizes.",
);

/// How long to retain per-object arrangement size history.
pub const ARRANGEMENT_SIZE_RETENTION_PERIOD: Config<Duration> = Config::new(
"arrangement_size_retention_period",
Duration::from_secs(7 * 24 * 60 * 60),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Duration::from_secs(7 * 24 * 60 * 60),
Duration::from_hours(7 * 24),

"How long to retain per-object arrangement size history.",
);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a sentinel value (0s?) to disable the collection.

/// Adds the full set of all adapter `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
Expand Down Expand Up @@ -245,4 +259,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&USER_ID_POOL_BATCH_SIZE)
.add(&CONSOLE_OIDC_CLIENT_ID)
.add(&CONSOLE_OIDC_SCOPES)
.add(&ARRANGEMENT_SIZE_COLLECTION_INTERVAL)
.add(&ARRANGEMENT_SIZE_RETENTION_PERIOD)
}
11 changes: 10 additions & 1 deletion src/adapter/src/catalog/open/builtin_schema_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use futures::future::BoxFuture;
use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
use mz_catalog::builtin::{
BUILTIN_LOOKUP, Builtin, Fingerprint, MZ_CATALOG_RAW, MZ_CATALOG_RAW_DESCRIPTION,
MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION,
RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
};
use mz_catalog::config::BuiltinItemMigrationConfig;
use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
Expand Down Expand Up @@ -486,6 +487,14 @@ impl Migration {
"mz_storage_usage_by_shard cannot be migrated or else the table will be truncated"
);

// Same hazard as `mz_storage_usage_by_shard`: the startup pruner
// (`Coordinator::prune_arrangement_sizes_history_on_startup`) assumes it is
// the only source of retractions, so migration-driven truncation would break it.
assert_ne!(
&*MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY_DESCRIPTION, object,
"mz_object_arrangement_size_history cannot be migrated or else the table will be truncated"
);

// `mz_catalog_raw` cannot be migrated because it contains the durable catalog and it
// wouldn't be very durable if we allowed it to be truncated.
assert_ne!(
Expand Down
145 changes: 145 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ pub enum Message {
StorageUsageFetch,
StorageUsageUpdate(ShardsUsageReferenced),
StorageUsagePrune(Vec<BuiltinTableUpdate>),
ArrangementSizesSchedule,
ArrangementSizesSnapshot,
ArrangementSizesPrune(Vec<BuiltinTableUpdate>),
/// Performs any cleanup and logging actions necessary for
/// finalizing a statement execution.
RetireExecute {
Expand Down Expand Up @@ -483,6 +486,9 @@ impl Message {
Message::StorageUsageFetch => "storage_usage_fetch",
Message::StorageUsageUpdate(_) => "storage_usage_update",
Message::StorageUsagePrune(_) => "storage_usage_prune",
Message::ArrangementSizesSchedule => "arrangement_sizes_schedule",
Message::ArrangementSizesSnapshot => "arrangement_sizes_snapshot",
Message::ArrangementSizesPrune(_) => "arrangement_sizes_prune",
Message::RetireExecute { .. } => "retire_execute",
Message::ExecuteSingleStatementTransaction { .. } => {
"execute_single_statement_transaction"
Expand Down Expand Up @@ -1925,6 +1931,11 @@ pub struct Coordinator {
/// The interval at which to collect storage usage information.
storage_usage_collection_interval: Duration,

/// Set once all compute objects have been observed as hydrated, gating
/// the first write into `mz_object_arrangement_size_history`. Sticky:
/// later partial re-hydrations (e.g. replica restart) don't re-arm it.
arrangement_sizes_hydration_observed: bool,

/// Segment analytics client.
#[derivative(Debug = "ignore")]
segment_client: Option<mz_segment::Client>,
Expand Down Expand Up @@ -3618,6 +3629,7 @@ impl Coordinator {
});

self.schedule_storage_usage_collection().await;
self.schedule_arrangement_sizes_collection().await;
self.spawn_privatelink_vpc_endpoints_watch_task();
self.spawn_statement_logging_task();
flags::tracing_config(self.catalog.system_config()).apply(&self.tracing_handle);
Expand Down Expand Up @@ -4234,6 +4246,49 @@ impl Coordinator {
});
}

/// Retracts `mz_object_arrangement_size_history` rows older than the
/// `arrangement_size_retention_period` dyncfg.
///
/// Must only run at startup: it reads at the oracle read timestamp and
/// writes retractions at the current write timestamp, which is only safe
/// when no other writes are in flight. See [the equivalent storage-usage
/// pruner](Self::prune_storage_usage_events_on_startup) for the same
/// reasoning.
async fn prune_arrangement_sizes_history_on_startup(&self) {
    // The catalog server is not writable in read-only mode.
    if self.controller.read_only() {
        return;
    }

    // Resolve the retention window and the builtin table's current global ID.
    let retention_period = mz_adapter_types::dyncfgs::ARRANGEMENT_SIZE_RETENTION_PERIOD
        .get(self.catalog().system_config().dyncfgs());
    let item_id = self
        .catalog()
        .resolve_builtin_table(&mz_catalog::builtin::MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY);
    let global_id = self.catalog.get_entry(&item_id).latest_global_id();
    // Capture the read timestamp and start the snapshot *before* spawning, so
    // the pruning decision is anchored to a single consistent point in time.
    let read_ts = self.get_local_read_ts().await;
    let current_contents_fut = self
        .controller
        .storage_collections
        .snapshot(global_id, read_ts);
    let internal_cmd_tx = self.internal_cmd_tx.clone();
    // Do the (potentially large) snapshot scan off the coordinator task.
    spawn(|| "arrangement_sizes_history_prune", async move {
        let mut current_contents = current_contents_fut
            .await
            .unwrap_or_terminate("cannot fail to fetch snapshot");
        // Consolidate so every surviving row has diff == 1; the retraction
        // helper asserts this invariant.
        differential_dataflow::consolidation::consolidate(&mut current_contents);

        // `read_ts` is in milliseconds, so subtract the retention period in
        // the same unit; saturate rather than underflow near the epoch.
        let cutoff_ts = u128::from(read_ts).saturating_sub(retention_period.as_millis());
        let expired =
            arrangement_sizes_expired_retractions(current_contents, cutoff_ts, item_id);

        // TODO(arrangement-sizes): when the writeable-catalog-server
        // plumbing in https://github.com/MaterializeInc/materialize/pull/35436
        // lands, retract directly on `mz_catalog_server`.
        // Send failure means the coordinator is shutting down; dropping the
        // retractions is fine in that case.
        let _ = internal_cmd_tx.send(Message::ArrangementSizesPrune(expired));
    });
}

fn current_credit_consumption_rate(&self) -> Numeric {
self.catalog()
.user_cluster_replicas()
Expand All @@ -4253,6 +4308,40 @@ impl Coordinator {
}
}

/// Returns retraction updates for rows in a consolidated
/// `mz_object_arrangement_size_history` snapshot whose `collection_timestamp`
/// (column 3) is strictly before `cutoff_ts`.
///
/// Panics if any input row has `diff != 1`: the caller must consolidate first,
/// and a consolidated history table should never contain retractions because
/// the only source of retractions is this function itself.
fn arrangement_sizes_expired_retractions(
    rows: impl IntoIterator<Item = (mz_repr::Row, i64)>,
    cutoff_ts: u128,
    item_id: CatalogItemId,
) -> Vec<BuiltinTableUpdate> {
    rows.into_iter()
        .filter_map(|(row, diff)| {
            assert_eq!(
                diff, 1,
                "consolidated contents should not contain retractions: ({row:#?}, {diff:#?})"
            );
            // Column 3 is `collection_timestamp`; convert to epoch millis.
            let collected_ms = row
                .unpack()
                .get(3)
                .expect("definition of mz_object_arrangement_size_history changed")
                .unwrap_timestamptz()
                .timestamp_millis();
            let collected_ms: u128 = collected_ms
                .try_into()
                .expect("all collections happen after Jan 1 1970");
            // Strictly-before cutoff → retract; otherwise keep the row.
            (collected_ms < cutoff_ts)
                .then(|| BuiltinTableUpdate::row(item_id, row, Diff::MINUS_ONE))
        })
        .collect()
}

#[cfg(test)]
impl Coordinator {
#[allow(dead_code)]
Expand Down Expand Up @@ -4714,6 +4803,7 @@ pub fn serve(
cloud_resource_controller,
storage_usage_client,
storage_usage_collection_interval,
arrangement_sizes_hydration_observed: false,
segment_client,
metrics,
optimizer_metrics,
Expand Down Expand Up @@ -4760,6 +4850,8 @@ pub fn serve(
.await;
}

coord.prune_arrangement_sizes_history_on_startup().await;

Ok(())
});
let ok = bootstrap.is_ok();
Expand Down Expand Up @@ -5177,3 +5269,56 @@ mod id_pool_tests {
pool.refill(10, 5);
}
}

#[cfg(test)]
mod arrangement_sizes_pruner_tests {
    use mz_repr::catalog_item_id::CatalogItemId;
    use mz_repr::{Datum, Row};

    use super::arrangement_sizes_expired_retractions;

    /// Builds a row shaped like `mz_object_arrangement_size_history`. The
    /// pruner only inspects column 3 (`collection_timestamp`); the remaining
    /// columns carry realistic values so a shape change would be caught.
    fn size_history_row(collected_at_ms: i64) -> Row {
        let when = mz_ore::now::to_datetime(collected_at_ms.try_into().expect("non-negative"));
        Row::pack_slice(&[
            Datum::String("r1"),
            Datum::String("u1"),
            Datum::Int64(123),
            Datum::TimestampTz(when.try_into().expect("fits in TimestampTz")),
        ])
    }

    /// Arbitrary item id; the function under test never inspects it.
    fn some_item_id() -> CatalogItemId {
        CatalogItemId::User(42)
    }

    #[mz_ore::test]
    fn empty_input_produces_no_retractions() {
        let retractions = arrangement_sizes_expired_retractions(Vec::new(), 1_000, some_item_id());
        assert!(retractions.is_empty());
    }

    #[mz_ore::test]
    fn retracts_only_rows_strictly_before_cutoff() {
        // Rows on both sides of the cutoff, plus one exactly at it, to pin
        // down the strict-less-than boundary.
        let snapshot: Vec<_> = [100, 500, 1_000, 5_000]
            .into_iter()
            .map(|ts| (size_history_row(ts), 1))
            .collect();
        let retractions = arrangement_sizes_expired_retractions(snapshot, 1_000, some_item_id());
        assert_eq!(retractions.len(), 2);
    }

    #[mz_ore::test]
    #[should_panic(expected = "consolidated contents should not contain retractions")]
    fn retraction_in_input_panics() {
        let snapshot = vec![(size_history_row(100), -1)];
        let _ = arrangement_sizes_expired_retractions(snapshot, 1_000, some_item_id());
    }
}
35 changes: 35 additions & 0 deletions src/adapter/src/coord/introspection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,4 +566,39 @@ const SUBSCRIBES: &[SubscribeSpec] = &[
GROUP BY export_id, lir_id
)",
},
// Per-object arrangement sizes, one row per `(object_id, replica)`.
//
// `mz_arrangement_heap_size_raw` and `mz_arrangement_batcher_size_raw` are
// differential logs where each `+1` row represents one byte of heap delta;
// after consolidation, `COUNT(*)` is the current arrangement size in bytes.
//
// Objects smaller than 10 MiB report exact bytes; larger ones are rounded
// to the nearest 10 MiB to suppress byte-level churn in the collection.
Comment on lines +575 to +576
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will have a lot of noise independently of the 10MiB gate. Running the subscribe on an otherwise empty cluster shows an update for at least one export every second. You could filter temporary exports t*, as they won't be reflected anywhere, but that doesn't solve the problem.

//
// `mz_dataflow_addresses.address[1]` is the root of each operator's address
// tree, which equals the owning `dataflow_id` — so we can go addresses →
// operator → dataflow without joining `mz_dataflow_operator_dataflows`.
SubscribeSpec {
introspection_type: IntrospectionType::ComputeObjectArrangementSizes,
sql: "SUBSCRIBE (
SELECT
ce.export_id AS object_id,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this bakes in an assumption that's not guaranteed to be true in the future, which is that one dataflow has one export. If it'd change, we'd double-count. Just calling it out. Once it happens, we'd need to re-think what the granularity for this information is instead.

CASE
WHEN COUNT(*) < 10485760 THEN COUNT(*)::int8
ELSE ((COUNT(*) + 5242880) / 10485760 * 10485760)::int8
END AS size
FROM mz_introspection.mz_compute_exports AS ce
JOIN (
SELECT addrs.address[1] AS dataflow_id, addrs.id AS operator_id
FROM mz_introspection.mz_dataflow_addresses addrs
) AS od ON od.dataflow_id = ce.dataflow_id
JOIN (
SELECT operator_id FROM mz_introspection.mz_arrangement_heap_size_raw
UNION ALL
SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw
Comment on lines +596 to +598
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit unfortunate you'll need to go through the raw collections, but right now this seems to be the best approach. The views we define on top of the raw collections do more than what you need here, and certainly will be more expensive to maintain.

) AS rs ON rs.operator_id = od.operator_id
GROUP BY ce.export_id
OPTIONS (AGGREGATE INPUT GROUP SIZE = 1000)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Count don't need the aggregate input group size annotation (it's only for hierarchical reductions, not for simple ones.)

)",
},
];
Loading
Loading