Skip to content

Commit 99403e6

Browse files
jubradclaude
andcommitted
catalog: sync builtin cluster replica sizes and add in-use check
During catalog open, sync the env-var cluster replica sizes into the durable catalog as builtin entries. This ensures sizes survive restarts and can be managed alongside user-defined sizes. Key changes: - `sync_builtin_cluster_replica_sizes()` reconciles env-var sizes with durable state: inserts new sizes, updates changed allocations, and removes sizes no longer in the env var. - `CatalogState.cluster_replica_sizes` is now initialized empty and populated from durable state updates (via apply_cluster_replica_size_update from Commit 1), rather than directly from config. - `is_cluster_replica_size_in_use()` helper scans all cluster replicas to check if a given size name is referenced. This will be used in the DDL drop path (Commit 3) to prevent deletion of in-use sizes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6ca5667 commit 99403e6

2 files changed

Lines changed: 69 additions & 2 deletions

File tree

src/adapter/src/catalog/open.rs

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use mz_catalog::builtin::{
2929
BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
3030
Fingerprint, MZ_CATALOG_RAW, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
3131
};
32-
use mz_catalog::config::StateConfig;
32+
use mz_catalog::config::{ClusterReplicaSizeMap, StateConfig};
3333
use mz_catalog::durable::objects::{
3434
SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
3535
};
@@ -176,7 +176,8 @@ impl Catalog {
176176
},
177177
helm_chart_version: config.helm_chart_version,
178178
},
179-
cluster_replica_sizes: config.cluster_replica_sizes,
179+
// Start empty; populated from durable state updates after builtin sync.
180+
cluster_replica_sizes: ClusterReplicaSizeMap(std::collections::BTreeMap::new()),
180181
availability_zones: config.availability_zones,
181182
egress_addresses: config.egress_addresses,
182183
aws_principal_context: config.aws_principal_context,
@@ -228,6 +229,7 @@ impl Catalog {
228229
config.boot_ts,
229230
)?;
230231
add_new_remove_old_builtin_roles_migration(&mut txn)?;
232+
sync_builtin_cluster_replica_sizes(&mut txn, &config.cluster_replica_sizes)?;
231233
remove_invalid_config_param_role_defaults_migration(&mut txn)?;
232234
remove_pending_cluster_replicas_migration(&mut txn, config.boot_ts)?;
233235

@@ -1118,6 +1120,57 @@ fn add_new_remove_old_builtin_roles_migration(
11181120
Ok(())
11191121
}
11201122

1123+
/// Syncs the builtin cluster replica sizes from the env-var configuration into
1124+
/// the durable catalog. Inserts new sizes, updates changed sizes, and removes
1125+
/// sizes that are no longer in the configuration.
1126+
fn sync_builtin_cluster_replica_sizes(
1127+
txn: &mut Transaction<'_>,
1128+
config_sizes: &ClusterReplicaSizeMap,
1129+
) -> Result<(), AdapterError> {
1130+
use std::collections::BTreeMap;
1131+
1132+
use mz_catalog::durable::objects::DurableType;
1133+
1134+
// Collect current durable builtin sizes.
1135+
let durable_sizes: BTreeMap<String, _> = txn
1136+
.get_cluster_replica_sizes()
1137+
.filter(|s| s.builtin)
1138+
.map(|s| (s.name.clone(), s))
1139+
.collect();
1140+
1141+
// Insert or update builtins from config.
1142+
for (name, allocation) in &config_sizes.0 {
1143+
if let Some(existing) = durable_sizes.get(name) {
1144+
// Check if the allocation changed by comparing the durable value
1145+
// representation (which is Eq-comparable).
1146+
let new_size = mz_catalog::durable::ClusterReplicaSize {
1147+
name: name.clone(),
1148+
allocation: allocation.clone(),
1149+
builtin: true,
1150+
};
1151+
let (_, new_value) = DurableType::into_key_value(new_size);
1152+
let (_, existing_value) = DurableType::into_key_value(existing.clone());
1153+
if new_value != existing_value {
1154+
// Retract and reinsert with updated allocation.
1155+
txn.remove_cluster_replica_size(name)?;
1156+
txn.insert_cluster_replica_size(name.clone(), allocation.clone(), true)?;
1157+
}
1158+
} else {
1159+
// New builtin size, insert it.
1160+
txn.insert_cluster_replica_size(name.clone(), allocation.clone(), true)?;
1161+
}
1162+
}
1163+
1164+
// Remove builtins that are no longer in config.
1165+
for (name, _) in &durable_sizes {
1166+
if !config_sizes.0.contains_key(name) {
1167+
txn.remove_cluster_replica_size(name)?;
1168+
}
1169+
}
1170+
1171+
Ok(())
1172+
}
1173+
11211174
fn add_new_remove_old_builtin_cluster_replicas_migration(
11221175
txn: &mut Transaction<'_>,
11231176
builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,

src/adapter/src/catalog/state.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2446,6 +2446,20 @@ impl CatalogState {
24462446
}
24472447
}
24482448

2449+
/// Returns true if the given cluster replica size name is in use by any
2450+
/// existing cluster replica.
2451+
pub fn is_cluster_replica_size_in_use(&self, size_name: &str) -> bool {
2452+
use mz_controller::clusters::ReplicaLocation;
2453+
self.clusters_by_id.values().any(|cluster| {
2454+
cluster.replicas().any(|replica| {
2455+
matches!(
2456+
&replica.config.location,
2457+
ReplicaLocation::Managed(loc) if loc.size == size_name
2458+
)
2459+
})
2460+
})
2461+
}
2462+
24492463
pub fn ensure_not_reserved_role(&self, role_id: &RoleId) -> Result<(), Error> {
24502464
if role_id.is_builtin() {
24512465
let role = self.get_role(role_id);

0 commit comments

Comments
 (0)