Skip to content

Commit a8f4526

Browse files
antiguru and claude authored
Scoped feature flags 1/3: scope declaration and per-replica override plumbing (#37079)
### Motivation

First of a three-PR stack splitting #36959 (scoped feature flags) for review: **1/3 (this) → 2/3 #37080 → 3/3 #36959**. Design: doc/developer/design/20260609_scoped_feature_flags.md (#36947).

This PR is the additive foundation: it introduces the vocabulary the later PRs build on, with no behavior change on its own.

### Description

* `ParameterScope` (`Environment` / `Cluster` / `Replica`), declared on system vars and dyncfg `Config`s and carried through to the synced system vars. The declaration is the single source of truth for which contexts get evaluated.
* The size-family taxonomy: `ReplicaAllocation::family` plus a size-map `family` field, with a `cc` / `legacy` fallback for sizes that don't set one.
* The compute controller's per-replica dyncfg override layer (`update_replica_dyncfg_overrides` + per-replica command specialization), inert until the adapter wires it in 3/3.

Nothing consumes the scope or the override layer yet, so this is a no-op.

### Verification

`test_replica_allocation_family` covers the size→family fallback; the rest is exercised by the later PRs in the stack.

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent bfaab7a commit a8f4526

8 files changed

Lines changed: 327 additions & 40 deletions

File tree

src/catalog/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -189,6 +189,7 @@ impl ClusterReplicaSizeMap {
189189
credits_per_hour: 1.into(),
190190
cpu_exclusive: false,
191191
is_cc: false,
192+
family: None,
192193
swap_enabled: false,
193194
disabled: false,
194195
selectors: BTreeMap::default(),
@@ -212,6 +213,7 @@ impl ClusterReplicaSizeMap {
212213
credits_per_hour: scale.into(),
213214
cpu_exclusive: false,
214215
is_cc: false,
216+
family: None,
215217
swap_enabled: false,
216218
disabled: false,
217219
selectors: BTreeMap::default(),
@@ -230,6 +232,7 @@ impl ClusterReplicaSizeMap {
230232
credits_per_hour: scale.into(),
231233
cpu_exclusive: false,
232234
is_cc: false,
235+
family: None,
233236
swap_enabled: false,
234237
disabled: false,
235238
selectors: BTreeMap::default(),
@@ -248,6 +251,7 @@ impl ClusterReplicaSizeMap {
248251
credits_per_hour: 1.into(),
249252
cpu_exclusive: false,
250253
is_cc: false,
254+
family: None,
251255
swap_enabled: false,
252256
disabled: false,
253257
selectors: BTreeMap::default(),
@@ -267,6 +271,7 @@ impl ClusterReplicaSizeMap {
267271
credits_per_hour: 2.into(),
268272
cpu_exclusive: false,
269273
is_cc: false,
274+
family: None,
270275
swap_enabled: false,
271276
disabled: false,
272277
selectors: BTreeMap::default(),
@@ -285,6 +290,7 @@ impl ClusterReplicaSizeMap {
285290
credits_per_hour: 0.into(),
286291
cpu_exclusive: false,
287292
is_cc: true,
293+
family: None,
288294
swap_enabled: false,
289295
disabled: true,
290296
selectors: BTreeMap::default(),

src/compute-client/src/controller.rs

Lines changed: 19 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,7 @@ use mz_compute_types::dataflows::DataflowDescription;
4646
use mz_compute_types::dyncfgs::{
4747
COMPUTE_REPLICA_EXPIRATION_OFFSET, ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA,
4848
};
49-
use mz_dyncfg::ConfigSet;
49+
use mz_dyncfg::{ConfigSet, ConfigUpdates};
5050
use mz_expr::RowSetFinishing;
5151
use mz_expr::row::RowCollection;
5252
use mz_ore::cast::CastFrom;
@@ -633,6 +633,24 @@ impl ComputeController {
633633
self.config.update(config_params);
634634
}
635635

636+
/// Replaces the per-replica dyncfg overrides for the given instances.
637+
///
638+
/// This only stores the overrides; callers should follow with a
639+
/// configuration push (e.g. [`Self::update_configuration`]) so existing
640+
/// replicas observe the new values. Instances absent from `overrides` have
641+
/// their overrides cleared, so a replica that no longer has an override
642+
/// reverts to the environment-wide configuration. Used by the scoped
643+
/// feature flags (replica-local) layer.
644+
pub fn update_replica_dyncfg_overrides(
645+
&mut self,
646+
mut overrides: BTreeMap<ComputeInstanceId, BTreeMap<ReplicaId, ConfigUpdates>>,
647+
) {
648+
for (id, instance) in self.instances.iter_mut() {
649+
let instance_overrides = overrides.remove(id).unwrap_or_default();
650+
instance.call(move |i| i.update_replica_dyncfg_overrides(instance_overrides));
651+
}
652+
}
653+
636654
/// Mark the end of any initialization commands.
637655
///
638656
/// The implementor may wait for this method to be called before implementing prior commands,

src/compute-client/src/controller/instance.rs

Lines changed: 55 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ use mz_compute_types::sources::SourceInstanceDesc;
2626
use mz_controller_types::dyncfgs::{
2727
ENABLE_PAUSED_CLUSTER_READHOLD_DOWNGRADE, WALLCLOCK_LAG_RECORDING_INTERVAL,
2828
};
29-
use mz_dyncfg::ConfigSet;
29+
use mz_dyncfg::{ConfigSet, ConfigUpdates};
3030
use mz_expr::RowSetFinishing;
3131
use mz_ore::cast::CastFrom;
3232
use mz_ore::channel::instrumented_unbounded_channel;
@@ -140,6 +140,13 @@ pub(super) struct Instance {
140140
workload_class: Option<String>,
141141
/// The replicas of this compute instance.
142142
replicas: BTreeMap<ReplicaId, ReplicaState>,
143+
/// Per-replica dyncfg overrides, merged into the `UpdateConfiguration`
144+
/// command sent to each replica (and into the command-history replay used
145+
/// to hydrate new replicas). Populated from the scoped feature flags
146+
/// (replica-local) layer; empty by default, in which case every replica
147+
/// receives the unmodified environment-wide configuration. Stores only the
148+
/// values that differ from the environment-wide value, so the map is sparse.
149+
replica_dyncfg_overrides: BTreeMap<ReplicaId, ConfigUpdates>,
143150
/// Currently installed compute collections.
144151
///
145152
/// New entries are added for all collections exported from dataflows created through
@@ -845,6 +852,7 @@ impl Instance {
845852
read_only,
846853
workload_class,
847854
replicas,
855+
replica_dyncfg_overrides: _,
848856
collections,
849857
log_sources: _,
850858
peeks,
@@ -950,6 +958,7 @@ impl Instance {
950958
read_only,
951959
workload_class: None,
952960
replicas: Default::default(),
961+
replica_dyncfg_overrides: Default::default(),
953962
collections,
954963
log_sources,
955964
peeks: Default::default(),
@@ -1091,21 +1100,57 @@ impl Instance {
10911100
#[mz_ore::instrument(level = "debug")]
10921101
fn send(&mut self, cmd: ComputeCommand) {
10931102
// Record the command so that new replicas can be brought up to speed.
1103+
// We record the *base* (un-specialized) command, so that the per-replica
1104+
// dyncfg overrides are re-applied at replay time in `add_replica` rather
1105+
// than baked into the shared history.
10941106
self.history.push(cmd.clone());
10951107

10961108
let target_replica = self.target_replica(&cmd);
10971109

1110+
// Borrow the overrides separately from `self.replicas` so the per-replica
1111+
// specialization below does not conflict with the mutable replica borrow.
1112+
let overrides = &self.replica_dyncfg_overrides;
1113+
10981114
if let Some(rid) = target_replica {
10991115
if let Some(replica) = self.replicas.get_mut(&rid) {
1116+
let cmd = Self::specialize_command_for_replica(cmd, rid, overrides);
11001117
let _ = replica.client.send(cmd);
11011118
}
11021119
} else {
1103-
for replica in self.replicas.values_mut() {
1104-
let _ = replica.client.send(cmd.clone());
1120+
for (rid, replica) in self.replicas.iter_mut() {
1121+
let cmd = Self::specialize_command_for_replica(cmd.clone(), *rid, overrides);
1122+
let _ = replica.client.send(cmd);
11051123
}
11061124
}
11071125
}
11081126

1127+
/// Specializes a command for a specific replica by merging that replica's
1128+
/// dyncfg override (if any) into an `UpdateConfiguration` command. All other
1129+
/// commands, and replicas without an override, are returned unchanged.
1130+
fn specialize_command_for_replica(
1131+
mut cmd: ComputeCommand,
1132+
replica_id: ReplicaId,
1133+
overrides: &BTreeMap<ReplicaId, ConfigUpdates>,
1134+
) -> ComputeCommand {
1135+
if let ComputeCommand::UpdateConfiguration(params) = &mut cmd
1136+
&& let Some(over) = overrides.get(&replica_id)
1137+
&& !over.updates.is_empty()
1138+
{
1139+
params.dyncfg_updates.extend(over.clone());
1140+
}
1141+
cmd
1142+
}
1143+
1144+
/// Replaces the per-replica dyncfg overrides. Callers should follow this
1145+
/// with a configuration push (e.g. `update_configuration`) so that existing
1146+
/// replicas observe the new overrides.
1147+
pub(super) fn update_replica_dyncfg_overrides(
1148+
&mut self,
1149+
overrides: BTreeMap<ReplicaId, ConfigUpdates>,
1150+
) {
1151+
self.replica_dyncfg_overrides = overrides;
1152+
}
1153+
11091154
/// Determine the target replica for a compute command. Retrieves the
11101155
/// collection named by the command, and returns the target replica if
11111156
/// it is set, and None if not set, or the command doesn't name a collection.
@@ -1183,7 +1228,13 @@ impl Instance {
11831228
continue;
11841229
}
11851230

1186-
if client.send(command.clone()).is_err() {
1231+
// Re-apply this replica's dyncfg override to replayed config commands.
1232+
let command = Self::specialize_command_for_replica(
1233+
command.clone(),
1234+
id,
1235+
&self.replica_dyncfg_overrides,
1236+
);
1237+
if client.send(command).is_err() {
11871238
// We swallow the error here. On the next send, we will fail again, and
11881239
// restart the connection as well as this rehydration.
11891240
tracing::warn!("Replica {:?} connection terminated during hydration", id);

src/compute-types/src/dyncfgs.rs

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
1212
use std::time::Duration;
1313

14-
use mz_dyncfg::{Config, ConfigSet};
14+
use mz_dyncfg::{Config, ConfigSet, ParameterScope};
1515

1616
/// Whether rendering should use `half_join2` rather than DD's `half_join` for delta joins.
1717
///
@@ -41,7 +41,8 @@ pub const ENABLE_COLUMN_PAGED_BATCHER: Config<bool> = Config::new(
4141
false,
4242
"Use the columnar-native paged merge batcher at arrange sites. When `false` (default), \
4343
arranges fall back to the legacy columnation `Col2ValBatcher` / `RowRowBuilder` path.",
44-
);
44+
)
45+
.scoped(ParameterScope::Replica);
4546

4647
/// Allow the column-paged batcher's pager to actually evict chunks
4748
/// under memory pressure. Only meaningful when
@@ -56,7 +57,8 @@ pub const ENABLE_COLUMN_PAGED_BATCHER_SPILL: Config<bool> = Config::new(
5657
false,
5758
"Allow the column-paged batcher's pager to evict chunks under memory pressure. Only \
5859
meaningful when `enable_column_paged_batcher = true`.",
59-
);
60+
)
61+
.scoped(ParameterScope::Replica);
6062

6163
/// Total resident-byte budget the column-paged batcher's tiered policy
6264
/// (`mz_timely_util::column_pager::policy::TieredPolicy`) is allowed to
@@ -91,7 +93,8 @@ pub const COLUMN_PAGED_BATCHER_LZ4: Config<bool> = Config::new(
9193
false,
9294
"Compress column-paged batcher chunks with lz4 on the spill path. Only meaningful when \
9395
`enable_column_paged_batcher_spill = true`.",
94-
);
96+
)
97+
.scoped(ParameterScope::Replica);
9598

9699
/// Proactively evict the column-paged batcher's lz4-compressed spill chunks
97100
/// from RSS via `MADV_PAGEOUT` when spilling to the swap backend. Only
@@ -176,7 +179,8 @@ pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
176179
);
177180

178181
/// Enable lgalloc.
179-
pub const ENABLE_LGALLOC: Config<bool> = Config::new("enable_lgalloc", true, "Enable lgalloc.");
182+
pub const ENABLE_LGALLOC: Config<bool> =
183+
Config::new("enable_lgalloc", true, "Enable lgalloc.").scoped(ParameterScope::Replica);
180184

181185
/// Enable lgalloc's eager memory return/reclamation feature.
182186
pub const ENABLE_LGALLOC_EAGER_RECLAMATION: Config<bool> = Config::new(

src/controller/src/clusters.rs

Lines changed: 64 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -95,6 +95,16 @@ pub struct ReplicaAllocation {
9595
/// T-shirt size.
9696
#[serde(default = "default_true")]
9797
pub is_cc: bool,
98+
/// The size *family* this size belongs to, e.g. the size `D.1-xsmall`
99+
/// belongs to family `D` and the legacy t-shirt sizes belong to family
100+
/// `legacy`. The family is the coarse axis and is *not* a prefix of the size
101+
/// name in general. Used as the
102+
/// `replica_size_family` attribute when evaluating replica-local scoped
103+
/// feature flags (see the scoped feature flags design). When unset, the
104+
/// family falls back to a value derived from [`Self::is_cc`] via
105+
/// [`ReplicaAllocation::family`].
106+
#[serde(default)]
107+
pub family: Option<String>,
98108
/// Whether instances of this type use swap as the spill-to-disk mechanism.
99109
#[serde(default)]
100110
pub swap_enabled: bool,
@@ -106,6 +116,24 @@ pub struct ReplicaAllocation {
106116
pub selectors: BTreeMap<String, String>,
107117
}
108118

119+
impl ReplicaAllocation {
120+
/// The name of the size family this allocation belongs to, used as the
121+
/// `replica_size_family` attribute when evaluating replica-local scoped
122+
/// feature flags.
123+
///
124+
/// Falls back to a value derived from [`Self::is_cc`] when [`Self::family`]
125+
/// is unset: `"cc"` for modern sizes and `"legacy"` for the legacy t-shirt
126+
/// sizes. This keeps the legacy family targetable even before every size
127+
/// gains an explicit `family` in the size configuration.
128+
pub fn family(&self) -> &str {
129+
match &self.family {
130+
Some(family) => family.as_str(),
131+
None if self.is_cc => "cc",
132+
None => "legacy",
133+
}
134+
}
135+
}
136+
109137
fn default_true() -> bool {
110138
true
111139
}
@@ -146,6 +174,7 @@ fn test_replica_allocation_deserialization() {
146174
cpu_request: None,
147175
cpu_exclusive: false,
148176
is_cc: true,
177+
family: None,
149178
swap_enabled: true,
150179
scale: NonZero::new(16).unwrap(),
151180
workers: NonZero::new(1).unwrap(),
@@ -182,6 +211,7 @@ fn test_replica_allocation_deserialization() {
182211
cpu_request: None,
183212
cpu_exclusive: true,
184213
is_cc: true,
214+
family: None,
185215
swap_enabled: false,
186216
scale: NonZero::new(1).unwrap(),
187217
workers: NonZero::new(1).unwrap(),
@@ -198,6 +228,40 @@ fn test_replica_allocation_deserialization() {
198228
assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
199229
}
200230

231+
#[mz_ore::test]
232+
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
233+
fn test_replica_allocation_family() {
234+
let parse = |json: &str| -> ReplicaAllocation {
235+
serde_json::from_str(json).expect("deserialization from JSON succeeds")
236+
};
237+
238+
// An explicit `family` is used verbatim.
239+
assert_eq!(
240+
parse(r#"{"scale": 1, "workers": 1, "credits_per_hour": "0", "family": "D"}"#).family(),
241+
"D"
242+
);
243+
// Without an explicit `family`, modern (`is_cc`) sizes fall back to "cc".
244+
// `is_cc` defaults to true.
245+
assert_eq!(
246+
parse(r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#).family(),
247+
"cc"
248+
);
249+
// Without an explicit `family`, legacy (non-`is_cc`) sizes fall back to
250+
// "legacy".
251+
assert_eq!(
252+
parse(r#"{"scale": 1, "workers": 1, "credits_per_hour": "0", "is_cc": false}"#).family(),
253+
"legacy"
254+
);
255+
// An explicit family wins even for a legacy size.
256+
assert_eq!(
257+
parse(
258+
r#"{"scale": 1, "workers": 1, "credits_per_hour": "0", "is_cc": false, "family": "legacy-special"}"#
259+
)
260+
.family(),
261+
"legacy-special"
262+
);
263+
}
264+
201265
/// Configures the location of a cluster replica.
202266
#[derive(Clone, Debug, Serialize, PartialEq)]
203267
pub enum ReplicaLocation {

0 commit comments

Comments
 (0)