Skip to content

Commit cb2ed86

Browse files
jubrad and claude committed
sql: add CREATE/DROP CLUSTER REPLICA SIZE DDL
Add SQL DDL for creating and dropping user-defined cluster replica sizes, gated behind the `enable_custom_cluster_replica_sizes` feature flag (default: off).

Syntax:

    CREATE CLUSTER REPLICA SIZE <name> (
        CREDITS PER HOUR = '<numeric>',
        [WORKERS = <n>],
        [SCALE = <n>],
        [MEMORY LIMIT = <bytes>],
        [CPU LIMIT = <nanocpus>],
        [DISK LIMIT = <bytes>],
        [CPU EXCLUSIVE = <bool>],
        [DISABLED = <bool>]
    );
    DROP CLUSTER REPLICA SIZE <name>;

Key behaviors:
- Feature-flag gated: requires `enable_custom_cluster_replica_sizes`
- Cannot create a size with a name that already exists (builtin or user)
- Cannot drop builtin sizes (those synced from env var)
- Cannot drop a size that is in use by an existing cluster replica
- CREDITS PER HOUR is required; WORKERS defaults to 1, SCALE defaults to 1

Implementation follows the established DDL pattern (NetworkPolicy): parser -> planner -> sequencer -> catalog transact, with exhaustive match coverage across all statement/plan/response dispatchers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 99403e6 commit cb2ed86

27 files changed

Lines changed: 672 additions & 48 deletions

File tree

src/adapter/src/catalog/apply.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1476,10 +1476,20 @@ impl CatalogState {
14761476
.expect("could not pack audit log update"),
14771477
]
14781478
}
1479+
StateUpdateKind::ClusterReplicaSize(ref size) => {
1480+
if size.allocation.disabled {
1481+
Vec::new()
1482+
} else {
1483+
vec![CatalogState::pack_replica_size_update(
1484+
&size.name,
1485+
&size.allocation,
1486+
diff,
1487+
)]
1488+
}
1489+
}
14791490
StateUpdateKind::Database(_)
14801491
| StateUpdateKind::Schema(_)
14811492
| StateUpdateKind::NetworkPolicy(_)
1482-
| StateUpdateKind::ClusterReplicaSize(_)
14831493
| StateUpdateKind::StorageCollectionMetadata(_)
14841494
| StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
14851495
}

src/adapter/src/catalog/builtin_table_updates.rs

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use mz_catalog::memory::objects::{
3535
CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
3636
Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
3737
};
38+
use mz_controller::clusters::ReplicaAllocation;
3839
use mz_controller::clusters::{
3940
ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
4041
};
@@ -1933,34 +1934,36 @@ impl CatalogState {
19331934
continue;
19341935
}
19351936

1936-
// Just invent something when the limits are `None`, which only happens in non-prod
1937-
// environments (tests, process orchestrator, etc.)
1938-
let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1939-
let MemoryLimit(ByteSize(memory_bytes)) =
1940-
(alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1941-
let DiskLimit(ByteSize(disk_bytes)) =
1942-
(alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1943-
1944-
let row = Row::pack_slice(&[
1945-
size.as_str().into(),
1946-
u64::cast_from(alloc.scale).into(),
1947-
u64::cast_from(alloc.workers).into(),
1948-
cpu_limit.as_nanocpus().into(),
1949-
memory_bytes.into(),
1950-
disk_bytes.into(),
1951-
(alloc.credits_per_hour).into(),
1952-
]);
1953-
1954-
updates.push(BuiltinTableUpdate::row(
1955-
&*MZ_CLUSTER_REPLICA_SIZES,
1956-
row,
1957-
Diff::ONE,
1958-
));
1937+
updates.push(Self::pack_replica_size_update(size, alloc, Diff::ONE));
19591938
}
19601939

19611940
updates
19621941
}
19631942

1943+
/// Builds the `MZ_CLUSTER_REPLICA_SIZES` builtin table update for one replica size.
///
/// The packed row holds, in order: the size name, scale, workers, CPU limit in
/// nanocpus, memory limit in bytes, disk limit in bytes, and credits per hour.
/// `diff` is passed through unchanged to the returned update.
///
/// This is an associated function (no `self`), so it can be called without a
/// `CatalogState` instance.
pub fn pack_replica_size_update(
1944+
size: &str,
1945+
alloc: &ReplicaAllocation,
1946+
diff: Diff,
1947+
) -> BuiltinTableUpdate<&'static BuiltinTable> {
1948+
// Just invent something when the limits are `None`, which only happens in non-prod
1949+
// environments (tests, process orchestrator, etc.)
1950+
let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1951+
let MemoryLimit(ByteSize(memory_bytes)) = (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1952+
let DiskLimit(ByteSize(disk_bytes)) = (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1953+
1954+
// NOTE(review): the field order below presumably has to match the column order of
// `MZ_CLUSTER_REPLICA_SIZES` — confirm against the table definition before reordering.
let row = Row::pack_slice(&[
1955+
size.into(),
1956+
u64::cast_from(alloc.scale).into(),
1957+
u64::cast_from(alloc.workers).into(),
1958+
cpu_limit.as_nanocpus().into(),
1959+
memory_bytes.into(),
1960+
disk_bytes.into(),
1961+
(alloc.credits_per_hour).into(),
1962+
]);
1963+
1964+
BuiltinTableUpdate::row(&*MZ_CLUSTER_REPLICA_SIZES, row, diff)
1965+
}
1966+
19641967
pub fn pack_subscribe_update(
19651968
&self,
19661969
id: GlobalId,

src/adapter/src/catalog/transact.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,23 @@ pub enum Op {
165165
name: String,
166166
owner_id: RoleId,
167167
},
168+
CreateClusterReplicaSize {
169+
name: String,
170+
workers: usize,
171+
scale: u16,
172+
credits_per_hour: mz_repr::adt::numeric::Numeric,
173+
memory_limit: Option<u64>,
174+
cpu_limit: Option<u64>,
175+
disk_limit: Option<u64>,
176+
cpu_exclusive: bool,
177+
disabled: bool,
178+
selectors: BTreeMap<String, String>,
179+
is_cc: bool,
180+
swap_enabled: bool,
181+
},
182+
DropClusterReplicaSize {
183+
name: String,
184+
},
168185
Comment {
169186
object_id: CommentObjectId,
170187
sub_component: Option<usize>,
@@ -1664,6 +1681,64 @@ impl Catalog {
16641681

16651682
info!("created network policy {name} ({id})");
16661683
}
1684+
Op::CreateClusterReplicaSize {
1685+
name,
1686+
workers,
1687+
scale,
1688+
credits_per_hour,
1689+
memory_limit,
1690+
cpu_limit,
1691+
disk_limit,
1692+
cpu_exclusive,
1693+
disabled,
1694+
selectors,
1695+
is_cc,
1696+
swap_enabled,
1697+
} => {
1698+
// Build ReplicaAllocation from the fields
1699+
use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
1700+
use std::num::NonZero;
1701+
1702+
let allocation = mz_controller::clusters::ReplicaAllocation {
1703+
memory_limit: memory_limit.map(|b| MemoryLimit(bytesize::ByteSize(b))),
1704+
memory_request: None,
1705+
cpu_limit: cpu_limit.map(|n| {
1706+
CpuLimit::from_millicpus(usize::try_from(n / 1_000_000).expect("cpu fits"))
1707+
}),
1708+
cpu_request: None,
1709+
disk_limit: disk_limit.map(|b| DiskLimit(bytesize::ByteSize(b))),
1710+
scale: NonZero::new(scale).expect("scale must be non-zero"),
1711+
workers: NonZero::new(workers).expect("workers must be non-zero"),
1712+
credits_per_hour,
1713+
cpu_exclusive,
1714+
is_cc,
1715+
swap_enabled,
1716+
disabled,
1717+
selectors,
1718+
};
1719+
tx.insert_cluster_replica_size(name, allocation, false)?;
1720+
}
1721+
Op::DropClusterReplicaSize { name } => {
1722+
// Check that it exists and is not builtin
1723+
let sizes: Vec<_> = tx.get_cluster_replica_sizes().collect();
1724+
let size = sizes.iter().find(|s| s.name == name);
1725+
match size {
1726+
Some(s) if s.builtin => {
1727+
return Err(AdapterError::Unstructured(anyhow::anyhow!(
1728+
"cannot drop builtin cluster replica size '{}'",
1729+
name
1730+
)));
1731+
}
1732+
Some(_) => {
1733+
tx.remove_cluster_replica_size(&name)?;
1734+
}
1735+
None => {
1736+
return Err(AdapterError::PlanError(PlanError::Catalog(
1737+
SqlCatalogError::UnknownClusterReplicaSize(name),
1738+
)));
1739+
}
1740+
}
1741+
}
16671742
Op::Comment {
16681743
object_id,
16691744
sub_component,

src/adapter/src/command.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,10 @@ pub enum ExecuteResponse {
589589
CreatedType,
590590
/// The requested network policy was created.
591591
CreatedNetworkPolicy,
592+
/// The requested cluster replica size was created.
593+
CreatedClusterReplicaSize,
594+
/// The requested cluster replica size was dropped.
595+
DroppedClusterReplicaSize,
592596
/// The requested prepared statement was removed.
593597
Deallocate { all: bool },
594598
/// The requested cursor was declared.
@@ -755,6 +759,12 @@ impl TryInto<ExecuteResponse> for ExecuteResponseKind {
755759
Ok(ExecuteResponse::CreatedMaterializedView)
756760
}
757761
ExecuteResponseKind::CreatedNetworkPolicy => Ok(ExecuteResponse::CreatedNetworkPolicy),
762+
ExecuteResponseKind::CreatedClusterReplicaSize => {
763+
Ok(ExecuteResponse::CreatedClusterReplicaSize)
764+
}
765+
ExecuteResponseKind::DroppedClusterReplicaSize => {
766+
Ok(ExecuteResponse::DroppedClusterReplicaSize)
767+
}
758768
ExecuteResponseKind::CreatedContinualTask => Ok(ExecuteResponse::CreatedContinualTask),
759769
ExecuteResponseKind::CreatedType => Ok(ExecuteResponse::CreatedType),
760770
ExecuteResponseKind::Deallocate => Err(()),
@@ -820,6 +830,8 @@ impl ExecuteResponse {
820830
CreatedContinualTask { .. } => Some("CREATE CONTINUAL TASK".into()),
821831
CreatedType => Some("CREATE TYPE".into()),
822832
CreatedNetworkPolicy => Some("CREATE NETWORKPOLICY".into()),
833+
CreatedClusterReplicaSize => Some("CREATE CLUSTER REPLICA SIZE".into()),
834+
DroppedClusterReplicaSize => Some("DROP CLUSTER REPLICA SIZE".into()),
823835
Deallocate { all } => Some(format!("DEALLOCATE{}", if *all { " ALL" } else { "" })),
824836
DeclaredCursor => Some("DECLARE CURSOR".into()),
825837
Deleted(n) => Some(format!("DELETE {}", n)),
@@ -914,6 +926,8 @@ impl ExecuteResponse {
914926
CreateType => &[CreatedType],
915927
PlanKind::Deallocate => &[ExecuteResponseKind::Deallocate],
916928
CreateNetworkPolicy => &[CreatedNetworkPolicy],
929+
CreateClusterReplicaSize => &[CreatedClusterReplicaSize],
930+
DropClusterReplicaSize => &[DroppedClusterReplicaSize],
917931
Declare => &[DeclaredCursor],
918932
DiscardTemp => &[DiscardedTemp],
919933
DiscardAll => &[DiscardedAll],

src/adapter/src/coord/appends.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,8 @@ pub(crate) fn waiting_on_startup_appends(
957957
| Plan::CreateSchema(_)
958958
| Plan::CreateRole(_)
959959
| Plan::CreateNetworkPolicy(_)
960+
| Plan::CreateClusterReplicaSize(_)
961+
| Plan::DropClusterReplicaSize(_)
960962
| Plan::CreateCluster(_)
961963
| Plan::CreateClusterReplica(_)
962964
| Plan::CreateContinualTask(_)

src/adapter/src/coord/catalog_serving.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
8282
| Plan::CreateSchema(_)
8383
| Plan::CreateRole(_)
8484
| Plan::CreateNetworkPolicy(_)
85+
| Plan::CreateClusterReplicaSize(_)
86+
| Plan::DropClusterReplicaSize(_)
8587
| Plan::CreateCluster(_)
8688
| Plan::CreateClusterReplica(_)
8789
| Plan::CreateContinualTask(_)

src/adapter/src/coord/command_handler.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,8 @@ impl Coordinator {
12961296
| Statement::CreateView(_)
12971297
| Statement::CreateWebhookSource(_)
12981298
| Statement::CreateNetworkPolicy(_)
1299+
| Statement::CreateClusterReplicaSize(_)
1300+
| Statement::DropClusterReplicaSize(_)
12991301
| Statement::Delete(_)
13001302
| Statement::DropObjects(_)
13011303
| Statement::DropOwned(_)

src/adapter/src/coord/ddl.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1291,7 +1291,9 @@ impl Coordinator {
12911291
| Op::ResetAllSystemConfiguration { .. }
12921292
| Op::Comment { .. }
12931293
| Op::WeirdStorageUsageUpdates { .. }
1294-
| Op::InjectAuditEvents { .. } => {}
1294+
| Op::InjectAuditEvents { .. }
1295+
| Op::CreateClusterReplicaSize { .. }
1296+
| Op::DropClusterReplicaSize { .. } => {}
12951297
}
12961298
}
12971299

src/adapter/src/coord/sequencer.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,18 @@ impl Coordinator {
302302
.await;
303303
ctx.retire(res);
304304
}
305+
Plan::CreateClusterReplicaSize(plan) => {
306+
let res = self
307+
.sequence_create_cluster_replica_size(ctx.session(), plan)
308+
.await;
309+
ctx.retire(res);
310+
}
311+
Plan::DropClusterReplicaSize(plan) => {
312+
let res = self
313+
.sequence_drop_cluster_replica_size(ctx.session(), plan)
314+
.await;
315+
ctx.retire(res);
316+
}
305317
Plan::Comment(plan) => {
306318
let result = self.sequence_comment_on(ctx.session(), plan).await;
307319
ctx.retire(result);

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,54 @@ impl Coordinator {
905905
.map(|_| ExecuteResponse::CreatedNetworkPolicy)
906906
}
907907

908+
/// Sequences a `CREATE CLUSTER REPLICA SIZE` plan.
///
/// Copies every field of `plan` into a `catalog::Op::CreateClusterReplicaSize` and
/// submits it as a single-op catalog transaction. Returns
/// `ExecuteResponse::CreatedClusterReplicaSize` on success; an error from
/// `catalog_transact` is returned to the caller as-is.
///
/// The session argument is currently unused (hence the leading underscore).
#[instrument]
909+
pub(super) async fn sequence_create_cluster_replica_size(
910+
&mut self,
911+
_session: &Session,
912+
plan: plan::CreateClusterReplicaSizePlan,
913+
) -> Result<ExecuteResponse, AdapterError> {
914+
// Field-for-field move of the plan into the catalog op; no validation happens here.
let op = catalog::Op::CreateClusterReplicaSize {
915+
name: plan.name,
916+
workers: plan.workers,
917+
scale: plan.scale,
918+
credits_per_hour: plan.credits_per_hour,
919+
memory_limit: plan.memory_limit,
920+
cpu_limit: plan.cpu_limit,
921+
disk_limit: plan.disk_limit,
922+
cpu_exclusive: plan.cpu_exclusive,
923+
disabled: plan.disabled,
924+
selectors: plan.selectors,
925+
is_cc: plan.is_cc,
926+
swap_enabled: plan.swap_enabled,
927+
};
928+
self.catalog_transact(None, vec![op])
929+
.await
930+
.map(|_| ExecuteResponse::CreatedClusterReplicaSize)
931+
}
932+
933+
/// Sequences a `DROP CLUSTER REPLICA SIZE` plan.
///
/// Fails with an `AdapterError::Unstructured` error if the catalog state reports the
/// named size as in use. Otherwise submits a single `catalog::Op::DropClusterReplicaSize`
/// and returns `ExecuteResponse::DroppedClusterReplicaSize` on success; an error from
/// `catalog_transact` is returned to the caller as-is.
///
/// This function does not itself check that the size exists or whether it is builtin.
/// The session argument is currently unused (hence the leading underscore).
#[instrument]
934+
pub(super) async fn sequence_drop_cluster_replica_size(
935+
&mut self,
936+
_session: &Session,
937+
plan: plan::DropClusterReplicaSizePlan,
938+
) -> Result<ExecuteResponse, AdapterError> {
939+
// Refuse the drop while the catalog state reports the size as in use. This check
// runs before, and outside of, the catalog transaction below.
940+
if self
941+
.catalog()
942+
.state()
943+
.is_cluster_replica_size_in_use(&plan.name)
944+
{
945+
return Err(AdapterError::Unstructured(anyhow::anyhow!(
946+
"cannot drop cluster replica size '{}' because it is in use by an existing replica",
947+
plan.name
948+
)));
949+
}
950+
let op = catalog::Op::DropClusterReplicaSize { name: plan.name };
951+
self.catalog_transact(None, vec![op])
952+
.await
953+
.map(|_| ExecuteResponse::DroppedClusterReplicaSize)
954+
}
955+
908956
#[instrument]
909957
pub(super) async fn sequence_alter_network_policy(
910958
&mut self,

0 commit comments

Comments
 (0)