Skip to content

Commit 6ca5667

Browse files
jubradclaude
andcommitted
catalog: add durable storage for cluster replica sizes
Add ClusterReplicaSize as a new durable catalog object type. This lays the groundwork for making cluster replica sizes persistent and user-definable via SQL DDL. The new type follows the established pattern (NetworkPolicy, Cluster, etc.): - Proto Key/Value definitions in catalog-protos with Arbitrary derives - DurableType impl converting between ReplicaAllocation and raw proto-compatible fields (the Value stores raw u64/string fields rather than ReplicaAllocation directly, since Numeric doesn't implement Eq/Ord needed by TableTransaction) - StateUpdateKind variant with collection type mapping - Transaction CRUD methods (insert, remove, get) - Snapshot field and persist read/write support - Memory StateUpdateKind variant applied in pre-cluster ordering - Debug trace support - No-op v81→v82 catalog migration (existing catalogs have no ClusterReplicaSize entries; builtins will be populated in a follow-up commit) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent adc65c6 commit 6ca5667

18 files changed

Lines changed: 3597 additions & 24 deletions

File tree

src/adapter/src/catalog/apply.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,9 @@ impl CatalogState {
300300
StateUpdateKind::NetworkPolicy(network_policy) => {
301301
self.apply_network_policy_update(network_policy, diff, retractions);
302302
}
303+
StateUpdateKind::ClusterReplicaSize(size) => {
304+
self.apply_cluster_replica_size_update(size, diff, retractions);
305+
}
303306
StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
304307
self.apply_introspection_source_index_update(
305308
introspection_source_index,
@@ -542,6 +545,25 @@ impl CatalogState {
542545
);
543546
}
544547

548+
#[instrument(level = "debug")]
549+
fn apply_cluster_replica_size_update(
550+
&mut self,
551+
size: mz_catalog::durable::ClusterReplicaSize,
552+
diff: StateDiff,
553+
_retractions: &mut InProgressRetractions,
554+
) {
555+
match diff {
556+
StateDiff::Addition => {
557+
self.cluster_replica_sizes
558+
.0
559+
.insert(size.name, size.allocation);
560+
}
561+
StateDiff::Retraction => {
562+
self.cluster_replica_sizes.0.remove(&size.name);
563+
}
564+
}
565+
}
566+
545567
#[instrument(level = "debug")]
546568
fn apply_introspection_source_index_update(
547569
&mut self,
@@ -1457,6 +1479,7 @@ impl CatalogState {
14571479
StateUpdateKind::Database(_)
14581480
| StateUpdateKind::Schema(_)
14591481
| StateUpdateKind::NetworkPolicy(_)
1482+
| StateUpdateKind::ClusterReplicaSize(_)
14601483
| StateUpdateKind::StorageCollectionMetadata(_)
14611484
| StateUpdateKind::UnfinalizedShard(_) => Vec::new(),
14621485
}
@@ -2031,7 +2054,8 @@ fn sort_updates(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
20312054
| StateUpdateKind::DefaultPrivilege(_)
20322055
| StateUpdateKind::SystemPrivilege(_)
20332056
| StateUpdateKind::SystemConfiguration(_)
2034-
| StateUpdateKind::NetworkPolicy(_) => push_update(
2057+
| StateUpdateKind::NetworkPolicy(_)
2058+
| StateUpdateKind::ClusterReplicaSize(_) => push_update(
20352059
update,
20362060
diff,
20372061
&mut pre_cluster_retractions,
@@ -2397,6 +2421,7 @@ impl ApplyState {
23972421
| SystemConfiguration(_)
23982422
| Cluster(_)
23992423
| NetworkPolicy(_)
2424+
| ClusterReplicaSize(_)
24002425
| ClusterReplica(_)
24012426
| SourceReferences(_)
24022427
| Comment(_)

src/adapter/src/catalog/open.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ impl Catalog {
282282
| BootstrapStateUpdateKind::SystemConfiguration(_)
283283
| BootstrapStateUpdateKind::Cluster(_)
284284
| BootstrapStateUpdateKind::NetworkPolicy(_)
285+
| BootstrapStateUpdateKind::ClusterReplicaSize(_)
285286
| BootstrapStateUpdateKind::ClusterReplica(_) => {
286287
pre_item_updates.push(StateUpdate {
287288
kind: kind.into(),

src/catalog-protos/objects_hashes.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[
22
{
33
"name": "objects.rs",
4-
"md5": "42585baa1b4b5e1b6da4e361a35ec546"
4+
"md5": "11000173b41468f6ff3f9593b31e5db7"
55
},
66
{
77
"name": "objects_v74.rs",
@@ -34,5 +34,9 @@
3434
{
3535
"name": "objects_v81.rs",
3636
"md5": "42585baa1b4b5e1b6da4e361a35ec546"
37+
},
38+
{
39+
"name": "objects_v82.rs",
40+
"md5": "11000173b41468f6ff3f9593b31e5db7"
3741
}
3842
]

src/catalog-protos/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@ pub mod objects_v78;
1919
pub mod objects_v79;
2020
pub mod objects_v80;
2121
pub mod objects_v81;
22+
pub mod objects_v82;
2223
pub mod serialization;
2324

2425
/// The current version of the `Catalog`.
2526
///
2627
/// We will initialize new `Catalog`s with this version, and migrate existing `Catalog`s to this
2728
/// version. Whenever the `Catalog` changes, e.g. the types we serialize in the `Catalog`
2829
/// change, we need to bump this version.
29-
pub const CATALOG_VERSION: u64 = 81;
30+
pub const CATALOG_VERSION: u64 = 82;
3031

3132
/// The minimum `Catalog` version number that we support migrating from.
3233
///

src/catalog-protos/src/objects.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,63 @@ pub struct NetworkPolicyValue {
465465
pub oid: u32,
466466
}
467467

468+
#[derive(
469+
Clone,
470+
Debug,
471+
PartialEq,
472+
Eq,
473+
PartialOrd,
474+
Ord,
475+
Serialize,
476+
Deserialize,
477+
Arbitrary
478+
)]
479+
pub struct ClusterReplicaSizeKey {
480+
pub name: String,
481+
}
482+
483+
#[derive(
484+
Clone,
485+
Debug,
486+
PartialEq,
487+
Eq,
488+
PartialOrd,
489+
Ord,
490+
Serialize,
491+
Deserialize,
492+
Arbitrary
493+
)]
494+
pub struct ClusterReplicaSizeValue {
495+
/// The memory limit for each process, in bytes.
496+
pub memory_limit: Option<u64>,
497+
/// The memory request for each process, in bytes.
498+
pub memory_request: Option<u64>,
499+
/// The CPU limit for each process, in nanocpus.
500+
pub cpu_limit: Option<u64>,
501+
/// The CPU request for each process, in nanocpus.
502+
pub cpu_request: Option<u64>,
503+
/// The disk limit for each process, in bytes.
504+
pub disk_limit: Option<u64>,
505+
/// The number of processes in the replica.
506+
pub scale: u16,
507+
/// The number of worker threads per process.
508+
pub workers: u64,
509+
/// The number of credits per hour that the replica consumes.
510+
pub credits_per_hour: String,
511+
/// Whether each process has exclusive access to its CPU cores.
512+
pub cpu_exclusive: bool,
513+
/// Whether this size represents a modern "cc" size rather than a legacy T-shirt size.
514+
pub is_cc: bool,
515+
/// Whether instances of this type use swap as the spill-to-disk mechanism.
516+
pub swap_enabled: bool,
517+
/// Whether instances of this type can be created.
518+
pub disabled: bool,
519+
/// Additional node selectors.
520+
pub selectors: std::collections::BTreeMap<String, String>,
521+
/// Whether this is a builtin size (from env var) or user-defined.
522+
pub builtin: bool,
523+
}
524+
468525
#[derive(
469526
Clone,
470527
Debug,
@@ -2514,6 +2571,7 @@ pub enum StateUpdateKind {
25142571
Cluster(Cluster),
25152572
ClusterIntrospectionSourceIndex(ClusterIntrospectionSourceIndex),
25162573
ClusterReplica(ClusterReplica),
2574+
ClusterReplicaSize(ClusterReplicaSize),
25172575
Comment(Comment),
25182576
Config(Config),
25192577
Database(Database),
@@ -2760,6 +2818,22 @@ pub struct NetworkPolicy {
27602818
pub value: NetworkPolicyValue,
27612819
}
27622820

2821+
#[derive(
2822+
Clone,
2823+
Debug,
2824+
PartialEq,
2825+
Eq,
2826+
PartialOrd,
2827+
Ord,
2828+
Serialize,
2829+
Deserialize,
2830+
Arbitrary
2831+
)]
2832+
pub struct ClusterReplicaSize {
2833+
pub key: ClusterReplicaSizeKey,
2834+
pub value: ClusterReplicaSizeValue,
2835+
}
2836+
27632837
#[derive(
27642838
Clone,
27652839
Debug,

0 commit comments

Comments
 (0)