Skip to content

Commit 7d80262

Browse files
jubradclaude
andcommitted
sql: add CREATE/DROP CLUSTER REPLICA SIZE DDL
Add SQL DDL for creating and dropping user-defined cluster replica sizes. Sizes are immutable once created; drop and recreate to change. Syntax: CREATE CLUSTER REPLICA SIZE <name> ( CREDITS PER HOUR = '<numeric>', -- required WORKERS = <n>, -- default 1 SCALE = <n>, -- default 1 MEMORY LIMIT = '<size>', -- e.g. '4GiB', '512MiB', '1GB' CPU LIMIT = '<cpu>', -- e.g. '0.5' (cores), '500m' (millicpus) DISK LIMIT = '<size>', CPU EXCLUSIVE = <bool>, DISABLED = <bool>, IS CC = <bool>, -- default true SWAP ENABLED = <bool>, NODE SELECTORS = '<json>' -- e.g. '{"kubernetes.io/arch": "arm64"}' ); DROP CLUSTER REPLICA SIZE <name>; Access control: - Gated behind enable_custom_cluster_replica_sizes feature flag - mz_system bypasses the feature flag (always allowed) - RBAC requires superuser for both CREATE and DROP - Cannot drop builtin sizes (from env var) or sizes in use by replicas - Cannot create a size with a name that already exists Human-readable units: - Memory/disk: GiB, MiB, GB, MB, kB, or raw bytes - CPU: cores (0.5), millicpus (500m), or raw nanocpus Structured errors for drop rejection (ClusterReplicaSizeInUse, ReadOnlyClusterReplicaSize). Audit log events for create/drop with ObjectType::ClusterReplicaSize. Tests: - SLT end-to-end test covering feature flag gating, create, use with cluster, in-use drop rejection, builtin drop rejection, human- readable units (GiB, millicpus), and cleanup - Parser roundtrip test - Updated snapshot tests Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d97f6e7 commit 7d80262

31 files changed

Lines changed: 736 additions & 57 deletions

File tree

src/adapter/src/catalog/apply.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1476,10 +1476,21 @@ impl CatalogState {
14761476
.expect("could not pack audit log update"),
14771477
]
14781478
}
1479+
StateUpdateKind::ClusterReplicaSize(ref size) => {
1480+
if size.allocation.disabled {
1481+
Vec::new()
1482+
} else {
1483+
CatalogState::pack_replica_size_update(
1484+
&size.name,
1485+
&size.allocation,
1486+
size.builtin,
1487+
diff,
1488+
)
1489+
}
1490+
}
14791491
StateUpdateKind::Database(_)
14801492
| StateUpdateKind::Schema(_)
14811493
| StateUpdateKind::NetworkPolicy(_)
1482-
| StateUpdateKind::ClusterReplicaSize(_)
14831494
| StateUpdateKind::StorageCollectionMetadata(_)
14841495
| StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
14851496
}

src/adapter/src/catalog/builtin_table_updates.rs

Lines changed: 67 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,18 @@ use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, Versione
1616
use mz_catalog::SYSTEM_CONN_ID;
1717
use mz_catalog::builtin::{
1818
BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19-
MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20-
MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_CONNECTIONS, MZ_CONTINUAL_TASKS,
21-
MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES,
22-
MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS,
23-
MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
24-
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
25-
MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
26-
MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SECRETS,
27-
MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
28-
MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
29-
MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
19+
MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZE_DETAILS,
20+
MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS, MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS,
21+
MZ_COMMENTS, MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
22+
MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES,
23+
MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES,
24+
MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES, MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
25+
MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS,
26+
MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH,
27+
MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES,
28+
MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
29+
MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
30+
MZ_WEBHOOKS_SOURCES,
3031
};
3132
use mz_catalog::config::AwsPrincipalContext;
3233
use mz_catalog::durable::SourceReferences;
@@ -35,6 +36,7 @@ use mz_catalog::memory::objects::{
3536
CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
3637
Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
3738
};
39+
use mz_controller::clusters::ReplicaAllocation;
3840
use mz_controller::clusters::{
3941
ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
4042
};
@@ -1933,34 +1935,67 @@ impl CatalogState {
19331935
continue;
19341936
}
19351937

1936-
// Just invent something when the limits are `None`, which only happens in non-prod
1937-
// environments (tests, process orchestrator, etc.)
1938-
let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1939-
let MemoryLimit(ByteSize(memory_bytes)) =
1940-
(alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1941-
let DiskLimit(ByteSize(disk_bytes)) =
1942-
(alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1943-
1944-
let row = Row::pack_slice(&[
1945-
size.as_str().into(),
1946-
u64::cast_from(alloc.scale).into(),
1947-
u64::cast_from(alloc.workers).into(),
1948-
cpu_limit.as_nanocpus().into(),
1949-
memory_bytes.into(),
1950-
disk_bytes.into(),
1951-
(alloc.credits_per_hour).into(),
1952-
]);
1953-
1954-
updates.push(BuiltinTableUpdate::row(
1955-
&*MZ_CLUSTER_REPLICA_SIZES,
1956-
row,
1938+
// The builtin flag isn't available from ClusterReplicaSizeMap; the details
1939+
// table is populated correctly via state updates in apply.rs which has
1940+
// access to the full ClusterReplicaSize with the builtin flag.
1941+
updates.extend(Self::pack_replica_size_update(
1942+
size,
1943+
alloc,
1944+
false,
19571945
Diff::ONE,
19581946
));
19591947
}
19601948

19611949
updates
19621950
}
19631951

1952+
pub fn pack_replica_size_update(
1953+
size: &str,
1954+
alloc: &ReplicaAllocation,
1955+
builtin: bool,
1956+
diff: Diff,
1957+
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1958+
// Just invent something when the limits are `None`, which only happens in non-prod
1959+
// environments (tests, process orchestrator, etc.)
1960+
let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1961+
let MemoryLimit(ByteSize(memory_bytes)) = (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1962+
let DiskLimit(ByteSize(disk_bytes)) = (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1963+
1964+
let public_row = Row::pack_slice(&[
1965+
size.into(),
1966+
u64::cast_from(alloc.scale).into(),
1967+
u64::cast_from(alloc.workers).into(),
1968+
cpu_limit.as_nanocpus().into(),
1969+
memory_bytes.into(),
1970+
disk_bytes.into(),
1971+
(alloc.credits_per_hour).into(),
1972+
]);
1973+
1974+
let selectors_value =
1975+
serde_json::to_value(&alloc.selectors).expect("selectors serializable");
1976+
let selectors_jsonb = Jsonb::from_serde_json(selectors_value).expect("valid jsonb");
1977+
let details_row = Row::pack_slice(&[
1978+
size.into(),
1979+
u64::cast_from(alloc.scale).into(),
1980+
u64::cast_from(alloc.workers).into(),
1981+
cpu_limit.as_nanocpus().into(),
1982+
memory_bytes.into(),
1983+
disk_bytes.into(),
1984+
(alloc.credits_per_hour).into(),
1985+
alloc.cpu_exclusive.into(),
1986+
alloc.is_cc.into(),
1987+
alloc.swap_enabled.into(),
1988+
alloc.disabled.into(),
1989+
builtin.into(),
1990+
selectors_jsonb.into_row().into_element(),
1991+
]);
1992+
1993+
vec![
1994+
BuiltinTableUpdate::row(&*MZ_CLUSTER_REPLICA_SIZES, public_row, diff),
1995+
BuiltinTableUpdate::row(&*MZ_CLUSTER_REPLICA_SIZE_DETAILS, details_row, diff),
1996+
]
1997+
}
1998+
19641999
pub fn pack_subscribe_update(
19652000
&self,
19662001
id: GlobalId,

src/adapter/src/catalog/transact.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,23 @@ pub enum Op {
165165
name: String,
166166
owner_id: RoleId,
167167
},
168+
CreateClusterReplicaSize {
169+
name: String,
170+
workers: usize,
171+
scale: u16,
172+
credits_per_hour: mz_repr::adt::numeric::Numeric,
173+
memory_limit: Option<u64>,
174+
cpu_limit: Option<u64>,
175+
disk_limit: Option<u64>,
176+
cpu_exclusive: bool,
177+
disabled: bool,
178+
selectors: BTreeMap<String, String>,
179+
is_cc: bool,
180+
swap_enabled: bool,
181+
},
182+
DropClusterReplicaSize {
183+
name: String,
184+
},
168185
Comment {
169186
object_id: CommentObjectId,
170187
sub_component: Option<usize>,
@@ -1664,6 +1681,91 @@ impl Catalog {
16641681

16651682
info!("created network policy {name} ({id})");
16661683
}
1684+
Op::CreateClusterReplicaSize {
1685+
name,
1686+
workers,
1687+
scale,
1688+
credits_per_hour,
1689+
memory_limit,
1690+
cpu_limit,
1691+
disk_limit,
1692+
cpu_exclusive,
1693+
disabled,
1694+
selectors,
1695+
is_cc,
1696+
swap_enabled,
1697+
} => {
1698+
// Build ReplicaAllocation from the fields
1699+
use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
1700+
use std::num::NonZero;
1701+
1702+
let allocation = mz_controller::clusters::ReplicaAllocation {
1703+
memory_limit: memory_limit.map(|b| MemoryLimit(bytesize::ByteSize(b))),
1704+
memory_request: None,
1705+
cpu_limit: cpu_limit.map(|n| {
1706+
CpuLimit::from_millicpus(usize::try_from(n / 1_000_000).expect("cpu fits"))
1707+
}),
1708+
cpu_request: None,
1709+
disk_limit: disk_limit.map(|b| DiskLimit(bytesize::ByteSize(b))),
1710+
scale: NonZero::new(scale).expect("scale must be non-zero"),
1711+
workers: NonZero::new(workers).expect("workers must be non-zero"),
1712+
credits_per_hour,
1713+
cpu_exclusive,
1714+
is_cc,
1715+
swap_enabled,
1716+
disabled,
1717+
selectors,
1718+
};
1719+
tx.insert_cluster_replica_size(name.clone(), allocation, false)?;
1720+
1721+
CatalogState::add_to_audit_log(
1722+
&state.system_configuration,
1723+
oracle_write_ts,
1724+
session,
1725+
tx,
1726+
audit_events,
1727+
EventType::Create,
1728+
mz_audit_log::ObjectType::ClusterReplicaSize,
1729+
EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1730+
id: name.clone(),
1731+
name: name.clone(),
1732+
}),
1733+
)?;
1734+
}
1735+
Op::DropClusterReplicaSize { name } => {
1736+
// Check that it exists and is not builtin
1737+
let sizes: Vec<_> = tx.get_cluster_replica_sizes().collect();
1738+
let size = sizes.iter().find(|s| s.name == name);
1739+
match size {
1740+
Some(s) if s.builtin => {
1741+
return Err(AdapterError::Catalog(Error::new(
1742+
ErrorKind::ReadOnlyClusterReplicaSize(name),
1743+
)));
1744+
}
1745+
Some(_) => {
1746+
tx.remove_cluster_replica_size(&name)?;
1747+
}
1748+
None => {
1749+
return Err(AdapterError::PlanError(PlanError::Catalog(
1750+
SqlCatalogError::UnknownClusterReplicaSize(name),
1751+
)));
1752+
}
1753+
}
1754+
1755+
CatalogState::add_to_audit_log(
1756+
&state.system_configuration,
1757+
oracle_write_ts,
1758+
session,
1759+
tx,
1760+
audit_events,
1761+
EventType::Drop,
1762+
mz_audit_log::ObjectType::ClusterReplicaSize,
1763+
EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1764+
id: name.clone(),
1765+
name: name.clone(),
1766+
}),
1767+
)?;
1768+
}
16671769
Op::Comment {
16681770
object_id,
16691771
sub_component,

src/adapter/src/command.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,10 @@ pub enum ExecuteResponse {
589589
CreatedType,
590590
/// The requested network policy was created.
591591
CreatedNetworkPolicy,
592+
/// The requested cluster replica size was created.
593+
CreatedClusterReplicaSize,
594+
/// The requested cluster replica size was dropped.
595+
DroppedClusterReplicaSize,
592596
/// The requested prepared statement was removed.
593597
Deallocate { all: bool },
594598
/// The requested cursor was declared.
@@ -755,6 +759,12 @@ impl TryInto<ExecuteResponse> for ExecuteResponseKind {
755759
Ok(ExecuteResponse::CreatedMaterializedView)
756760
}
757761
ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
762+
ExecuteResponseKind::CreatedClusterReplicaSize => {
763+
Ok(ExecuteResponse::CreatedClusterReplicaSize)
764+
}
765+
ExecuteResponseKind::DroppedClusterReplicaSize => {
766+
Ok(ExecuteResponse::DroppedClusterReplicaSize)
767+
}
758768
ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
759769
ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
760770
ExecuteResponseKind::Deallocate => Err(()),
@@ -820,6 +830,8 @@ impl ExecuteResponse {
820830
CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
821831
CreatedType => Some("CREATE TYPE".into()),
822832
CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
833+
CreatedClusterReplicaSize => Some("CREATE CLUSTER REPLICA SIZE".into()),
834+
DroppedClusterReplicaSize => Some("DROP CLUSTER REPLICA SIZE".into()),
823835
Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
824836
DeclaredCursor => Some("DECLARE CURSOR".into()),
825837
Deleted(n) => Some(format!("DELETE {}", n)),
@@ -914,6 +926,8 @@ impl ExecuteResponse {
914926
CreateType => &[CreatedType],
915927
PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
916928
CreateNetworkPolicy => &[CreatedNetworkPolicy],
929+
CreateClusterReplicaSize => &[CreatedClusterReplicaSize],
930+
DropClusterReplicaSize => &[DroppedClusterReplicaSize],
917931
Declare => &[DeclaredCursor],
918932
DiscardTemp => &[DiscardedTemp],
919933
DiscardAll => &[DiscardedAll],

src/adapter/src/coord/appends.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,8 @@ pub(crate) fn waiting_on_startup_appends(
957957
| Plan::CreateSchema(_)
958958
| Plan::CreateRole(_)
959959
| Plan::CreateNetworkPolicy(_)
960+
| Plan::CreateClusterReplicaSize(_)
961+
| Plan::DropClusterReplicaSize(_)
960962
| Plan::CreateCluster(_)
961963
| Plan::CreateClusterReplica(_)
962964
| Plan::CreateContinualTask(_)

src/adapter/src/coord/catalog_serving.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
8282
| Plan::CreateSchema(_)
8383
| Plan::CreateRole(_)
8484
| Plan::CreateNetworkPolicy(_)
85+
| Plan::CreateClusterReplicaSize(_)
86+
| Plan::DropClusterReplicaSize(_)
8587
| Plan::CreateCluster(_)
8688
| Plan::CreateClusterReplica(_)
8789
| Plan::CreateContinualTask(_)

src/adapter/src/coord/command_handler.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,8 @@ impl Coordinator {
12961296
| Statement::CreateView(_)
12971297
| Statement::CreateWebhookSource(_)
12981298
| Statement::CreateNetworkPolicy(_)
1299+
| Statement::CreateClusterReplicaSize(_)
1300+
| Statement::DropClusterReplicaSize(_)
12991301
| Statement::Delete(_)
13001302
| Statement::DropObjects(_)
13011303
| Statement::DropOwned(_)

src/adapter/src/coord/ddl.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1291,7 +1291,9 @@ impl Coordinator {
12911291
| Op::ResetAllSystemConfiguration { .. }
12921292
| Op::Comment { .. }
12931293
| Op::WeirdStorageUsageUpdates { .. }
1294-
| Op::InjectAuditEvents { .. } => {}
1294+
| Op::InjectAuditEvents { .. }
1295+
| Op::CreateClusterReplicaSize { .. }
1296+
| Op::DropClusterReplicaSize { .. } => {}
12951297
}
12961298
}
12971299

src/adapter/src/coord/sequencer.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,18 @@ impl Coordinator {
302302
.await;
303303
ctx.retire(res);
304304
}
305+
Plan::CreateClusterReplicaSize(plan) => {
306+
let res = self
307+
.sequence_create_cluster_replica_size(ctx.session(), plan)
308+
.await;
309+
ctx.retire(res);
310+
}
311+
Plan::DropClusterReplicaSize(plan) => {
312+
let res = self
313+
.sequence_drop_cluster_replica_size(ctx.session(), plan)
314+
.await;
315+
ctx.retire(res);
316+
}
305317
Plan::Comment(plan) => {
306318
let result = self.sequence_comment_on(ctx.session(), plan).await;
307319
ctx.retire(result);

0 commit comments

Comments
 (0)