Skip to content

Commit 9c743dc

Browse files
jubradclaude
andcommitted
catalog: fix review issues for cluster replica sizes
Address blocking code review feedback: 1. **Structured errors**: Replace `AdapterError::Unstructured(anyhow!(...))` with proper error variants: - `ErrorKind::ClusterReplicaSizeInUse` for in-use drop attempts - `ErrorKind::ReadOnlyClusterReplicaSize` for builtin drop attempts 2. **Audit log entries**: Add audit log events for CREATE and DROP CLUSTER REPLICA SIZE operations using `EventType::Create`/`Drop` with `ObjectType::ClusterReplicaSize`. Add the new variant to the audit log `ObjectType` enum and proto serialization. 3. **In-use safety on builtin sync**: When a size is removed from the env var but may still be referenced by existing replicas, mark it as `disabled: true` instead of deleting. This prevents panics in `concretize_replica_location` which expects all referenced sizes to exist in the catalog. 4. **Doc comment on PartialEq**: Document that the manual PartialEq impl on ClusterReplicaSize ignores the allocation field, and explain why and how to compare allocations correctly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cb2ed86 commit 9c743dc

10 files changed

Lines changed: 66 additions & 13 deletions

File tree

src/adapter/src/catalog/open.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,10 +1161,15 @@ fn sync_builtin_cluster_replica_sizes(
11611161
}
11621162
}
11631163

1164-
// Remove builtins that are no longer in config.
1165-
for (name, _) in &durable_sizes {
1164+
// Remove or disable builtins that are no longer in config.
1165+
// We mark them disabled rather than deleting to avoid panics in
1166+
// concretize_replica_location if an existing replica still references the size.
1167+
for (name, existing) in &durable_sizes {
11661168
if !config_sizes.0.contains_key(name) {
1169+
let mut disabled_allocation = existing.allocation.clone();
1170+
disabled_allocation.disabled = true;
11671171
txn.remove_cluster_replica_size(name)?;
1172+
txn.insert_cluster_replica_size(name.clone(), disabled_allocation, true)?;
11681173
}
11691174
}
11701175

src/adapter/src/catalog/transact.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1716,17 +1716,30 @@ impl Catalog {
17161716
disabled,
17171717
selectors,
17181718
};
1719-
tx.insert_cluster_replica_size(name, allocation, false)?;
1719+
tx.insert_cluster_replica_size(name.clone(), allocation, false)?;
1720+
1721+
CatalogState::add_to_audit_log(
1722+
&state.system_configuration,
1723+
oracle_write_ts,
1724+
session,
1725+
tx,
1726+
audit_events,
1727+
EventType::Create,
1728+
mz_audit_log::ObjectType::ClusterReplicaSize,
1729+
EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1730+
id: name.clone(),
1731+
name: name.clone(),
1732+
}),
1733+
)?;
17201734
}
17211735
Op::DropClusterReplicaSize { name } => {
17221736
// Check that it exists and is not builtin
17231737
let sizes: Vec<_> = tx.get_cluster_replica_sizes().collect();
17241738
let size = sizes.iter().find(|s| s.name == name);
17251739
match size {
17261740
Some(s) if s.builtin => {
1727-
return Err(AdapterError::Unstructured(anyhow::anyhow!(
1728-
"cannot drop builtin cluster replica size '{}'",
1729-
name
1741+
return Err(AdapterError::Catalog(Error::new(
1742+
ErrorKind::ReadOnlyClusterReplicaSize(name),
17301743
)));
17311744
}
17321745
Some(_) => {
@@ -1738,6 +1751,20 @@ impl Catalog {
17381751
)));
17391752
}
17401753
}
1754+
1755+
CatalogState::add_to_audit_log(
1756+
&state.system_configuration,
1757+
oracle_write_ts,
1758+
session,
1759+
tx,
1760+
audit_events,
1761+
EventType::Drop,
1762+
mz_audit_log::ObjectType::ClusterReplicaSize,
1763+
EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1764+
id: name.clone(),
1765+
name: name.clone(),
1766+
}),
1767+
)?;
17411768
}
17421769
Op::Comment {
17431770
object_id,

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -942,10 +942,13 @@ impl Coordinator {
942942
.state()
943943
.is_cluster_replica_size_in_use(&plan.name)
944944
{
945-
return Err(AdapterError::Unstructured(anyhow::anyhow!(
946-
"cannot drop cluster replica size '{}' because it is in use by an existing replica",
947-
plan.name
948-
)));
945+
return Err(AdapterError::Catalog(
946+
mz_catalog::memory::error::Error::new(
947+
mz_catalog::memory::error::ErrorKind::ClusterReplicaSizeInUse(
948+
plan.name.clone(),
949+
),
950+
),
951+
));
949952
}
950953
let op = catalog::Op::DropClusterReplicaSize { name: plan.name };
951954
self.catalog_transact(None, vec![op])

src/audit-log/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ pub enum ObjectType {
145145
Func,
146146
Index,
147147
MaterializedView,
148+
ClusterReplicaSize,
148149
NetworkPolicy,
149150
Role,
150151
Secret,
@@ -162,6 +163,7 @@ impl ObjectType {
162163
match self {
163164
ObjectType::Cluster => "Cluster",
164165
ObjectType::ClusterReplica => "Cluster Replica",
166+
ObjectType::ClusterReplicaSize => "Cluster Replica Size",
165167
ObjectType::Connection => "Connection",
166168
ObjectType::ContinualTask => "Continual Task",
167169
ObjectType::Database => "Database",

src/catalog-protos/objects_hashes.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[
22
{
33
"name": "objects.rs",
4-
"md5": "11000173b41468f6ff3f9593b31e5db7"
4+
"md5": "c767ae77aa66bb656081a1535e9e9a62"
55
},
66
{
77
"name": "objects_v74.rs",
@@ -37,6 +37,6 @@
3737
},
3838
{
3939
"name": "objects_v82.rs",
40-
"md5": "11000173b41468f6ff3f9593b31e5db7"
40+
"md5": "c767ae77aa66bb656081a1535e9e9a62"
4141
}
4242
]

src/catalog-protos/src/audit_log.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ impl RustType<crate::objects::audit_log_event_v1::ObjectType> for mz_audit_log::
8282
mz_audit_log::ObjectType::ClusterReplica => {
8383
crate::objects::audit_log_event_v1::ObjectType::ClusterReplica
8484
}
85+
mz_audit_log::ObjectType::ClusterReplicaSize => {
86+
crate::objects::audit_log_event_v1::ObjectType::ClusterReplicaSize
87+
}
8588
mz_audit_log::ObjectType::Connection => {
8689
crate::objects::audit_log_event_v1::ObjectType::Connection
8790
}
@@ -133,6 +136,9 @@ impl RustType<crate::objects::audit_log_event_v1::ObjectType> for mz_audit_log::
133136
crate::objects::audit_log_event_v1::ObjectType::ClusterReplica => {
134137
Ok(mz_audit_log::ObjectType::ClusterReplica)
135138
}
139+
crate::objects::audit_log_event_v1::ObjectType::ClusterReplicaSize => {
140+
Ok(mz_audit_log::ObjectType::ClusterReplicaSize)
141+
}
136142
crate::objects::audit_log_event_v1::ObjectType::Connection => {
137143
Ok(mz_audit_log::ObjectType::Connection)
138144
}

src/catalog-protos/src/objects.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2495,6 +2495,7 @@ pub mod audit_log_event_v1 {
24952495
System = 16,
24962496
ContinualTask = 17,
24972497
NetworkPolicy = 18,
2498+
ClusterReplicaSize = 19,
24982499
}
24992500

25002501
#[derive(

src/catalog-protos/src/objects_v82.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2495,6 +2495,7 @@ pub mod audit_log_event_v1 {
24952495
System = 16,
24962496
ContinualTask = 17,
24972497
NetworkPolicy = 18,
2498+
ClusterReplicaSize = 19,
24982499
}
24992500

25002501
#[derive(

src/catalog/src/durable/objects.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,11 @@ pub struct ClusterReplicaSize {
298298
}
299299

300300
// Manual impls because ReplicaAllocation contains Numeric which doesn't impl Eq/Ord.
301-
// Identity is determined by name, which is the key.
301+
// WARNING: These impls compare only `name` and `builtin`, ignoring `allocation`.
302+
// Two ClusterReplicaSize values with different allocations but the same name will
303+
// compare equal. This is intentional for consolidation in state updates (where identity
304+
// is determined by key), but callers needing to detect allocation changes should
305+
// compare the ClusterReplicaSizeValue (via DurableType::into_key_value) instead.
302306
impl PartialEq for ClusterReplicaSize {
303307
fn eq(&self, other: &Self) -> bool {
304308
self.name == other.name && self.builtin == other.builtin

src/catalog/src/memory/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ pub enum ErrorKind {
4343
ReservedNetworkPolicyName(String),
4444
#[error("system network policy '{0}' cannot be modified")]
4545
ReadOnlyNetworkPolicy(String),
46+
#[error("cannot drop cluster replica size '{}' because it is in use by an existing replica", .0)]
47+
ClusterReplicaSizeInUse(String),
48+
#[error("cannot drop builtin cluster replica size '{}'", .0)]
49+
ReadOnlyClusterReplicaSize(String),
4650
#[error("replica name {} is reserved", .0.quoted())]
4751
ReservedReplicaName(String),
4852
#[error("system cluster '{0}' cannot be modified")]

0 commit comments

Comments
 (0)