Skip to content

Commit 141cb2a

Browse files
antiguruclaude
andauthored
compute: sync controller config before create-time setup (#37294)
### Motivation A replica creates its `ComputeState` when it handles `CreateInstance`, seeding `worker_config` with dyncfg defaults. `handle_create_instance` then immediately runs `apply_worker_config`, but the controller's synced configuration only arrives one command later, in the first `UpdateConfiguration`. Both the live command stream (`Instance::run` sends `Hello` then `CreateInstance` before the first `update_configuration`) and the reconciliation order (`ComputeCommandHistory::reduce` places the single `UpdateConfiguration` after `CreateInstance`) put configuration after creation. Reordering is not viable either: there is no `ComputeState`/`worker_config` to apply configuration to before `CreateInstance`. As a result, every create-time read that depends on configuration silently used defaults: lgalloc, the pager, the column-paged batcher, the memory limiter, `ore_overflowing_behavior`, and the introspection dataflows rendered during `CreateInstance`. This is also why create-time-frozen scoped flags from #37158 had to be worked around. ### Description Carry the configuration with `CreateInstance`. `InstanceConfig` gains an `initial_config` snapshot of the controller's dyncfg with the replica's scoped overrides applied on top. The controller builds it in `Instance::specialize_command_for_replica`, the same layer that already injects scoped overrides into `UpdateConfiguration`, and it has the live `dyncfg` and override map there. It is evaluated fresh on every send and on every `add_replica` replay, so the snapshot always reflects current values. The replica applies it to `worker_config` before `apply_worker_config`. The snapshot is excluded from `InstanceConfig::compatible_with`, like dictionary compression, so reconnecting to a running replica does not halt on configuration that has changed since creation. An empty snapshot leaves the worker at its defaults until the first `UpdateConfiguration`, preserving prior behavior. Soundness: * Fresh startup or replica restart: reconcile applies `CreateInstance`, seeding `worker_config` from the snapshot before create-time setup. * `environmentd` reconnect to a running replica: reconcile only compatibility-checks `CreateInstance`; the existing already-synced `worker_config` is retained and the replayed `UpdateConfiguration` keeps it current. * `UpdateConfiguration` still applies globally and remains hoistable; the snapshot is a redundant create-time seed of the same values, not a new ordering dependency. ### Tests Adds unit tests in the compute controller covering the create-time snapshot and the scoped-override merge, and confirming the existing `UpdateConfiguration` override merge is unchanged. Adds a `clusterd-test-driver` scenario and a parse test exercising the create-time snapshot plumbing end to end. ### Checklist * [ ] This PR has adequate test coverage / or no new functionality requires testing. --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 5faf988 commit 141cb2a

11 files changed

Lines changed: 297 additions & 21 deletions

File tree

src/clusterd-test-driver/src/driver.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,26 +72,31 @@ impl Driver {
7272
/// Sends `CreateInstance`, opening the compute instance (and the reconciliation
7373
/// window).
7474
///
75-
/// `expiration_offset` and `arrangement_dictionary_compression` are the
75+
/// `expiration_offset`, `arrangement_dictionary_compression`, and `initial_config` are the
7676
/// caller-settable [`InstanceConfig`] knobs; `logging` is left at its default
7777
/// (introspection logging off — enabling it safely needs `index_logs` wiring)
7878
/// and `peek_stash_persist_location` is necessarily the host's, since the driver
7979
/// hosts persist.
8080
///
81-
/// This also force-disables `ENABLE_PEEK_RESPONSE_STASH`: the driver reads peek
82-
/// results inline, so a stashed peek would break [`Self::peek`]/`count`. It is
83-
/// patched here rather than exposed as a configuration knob, so a script's
84-
/// `update-configuration` cannot turn it back on.
81+
/// `initial_config` is the create-time configuration snapshot the controller would build from
82+
/// its synced dyncfg. The replica applies it before create-time setup, so a script can assert
83+
/// that create-time work observes synced values rather than defaults. The peek-response stash
84+
/// is always force-disabled on top of it: the driver reads peek results inline, so a stashed
85+
/// peek would break [`Self::peek`]/`count`. It is patched here rather than exposed as a knob,
86+
/// so neither `initial_config` nor a later `update-configuration` can turn it back on.
8587
pub fn create_instance(
8688
&self,
8789
expiration_offset: Option<Duration>,
8890
arrangement_dictionary_compression: bool,
91+
mut initial_config: ConfigUpdates,
8992
) -> anyhow::Result<()> {
93+
initial_config.add(&ENABLE_PEEK_RESPONSE_STASH, false);
9094
self.send(ComputeCommand::CreateInstance(Box::new(InstanceConfig {
9195
logging: Default::default(),
9296
expiration_offset,
9397
peek_stash_persist_location: self.host.location().clone(),
9498
arrangement_dictionary_compression,
99+
initial_config,
95100
})))?;
96101
let mut dyncfg_updates = ConfigUpdates::default();
97102
dyncfg_updates.add(&ENABLE_PEEK_RESPONSE_STASH, false);

src/clusterd-test-driver/src/script.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,11 @@ pub enum Command {
527527
/// Whether arrangements use dictionary compression.
528528
#[serde(default)]
529529
arrangement_dictionary_compression: bool,
530+
/// The create-time dyncfg snapshot the controller would supply (`name type value` rows).
531+
/// Applied to the replica's worker config before create-time setup, so a scenario can
532+
/// assert that create-time work observes synced values rather than dyncfg defaults.
533+
#[serde(default)]
534+
initial_config: Vec<ConfigSetting>,
530535
},
531536
/// Send `UpdateConfiguration` with a table of dyncfg updates (`name type value`
532537
/// rows). Generic over any configuration; the peek-response stash is not settable
@@ -1062,6 +1067,7 @@ impl ScriptState {
10621067
Command::CreateInstance {
10631068
expiration_offset,
10641069
arrangement_dictionary_compression,
1070+
initial_config,
10651071
} => {
10661072
let expiration_offset = expiration_offset
10671073
.as_deref()
@@ -1071,8 +1077,18 @@ impl ScriptState {
10711077
})
10721078
})
10731079
.transpose()?;
1074-
self.driver
1075-
.create_instance(expiration_offset, arrangement_dictionary_compression)?;
1080+
let mut initial = ConfigUpdates::default();
1081+
for setting in &initial_config {
1082+
initial.add_dynamic(
1083+
&setting.name,
1084+
parse_config_val(&setting.ty, &setting.value)?,
1085+
);
1086+
}
1087+
self.driver.create_instance(
1088+
expiration_offset,
1089+
arrangement_dictionary_compression,
1090+
initial,
1091+
)?;
10761092
Ok("ok".to_string())
10771093
}
10781094
Command::UpdateConfiguration { updates } => {

src/clusterd-test-driver/src/text.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,8 @@ fn parse_command(input: &str) -> anyhow::Result<Command> {
504504
.transpose()
505505
.context("argument `arrangement-dictionary-compression` is not a bool")?
506506
.unwrap_or(false),
507+
// Same `name type value` body rows as `update-configuration`.
508+
initial_config: settings_from_body(body)?,
507509
},
508510
"update-configuration" => Command::UpdateConfiguration {
509511
updates: settings_from_body(body)?,
@@ -699,6 +701,7 @@ mod tests {
699701
Command::CreateInstance {
700702
expiration_offset: None,
701703
arrangement_dictionary_compression: false,
704+
initial_config: vec![],
702705
}
703706
);
704707
assert_eq!(
@@ -709,6 +712,20 @@ mod tests {
709712
Command::CreateInstance {
710713
expiration_offset: Some("30s".to_string()),
711714
arrangement_dictionary_compression: true,
715+
initial_config: vec![],
716+
}
717+
);
718+
// The create-time snapshot is a `name type value` body, like `update-configuration`.
719+
assert_eq!(
720+
parse_command("create-instance\n enable_my_flag bool true").unwrap(),
721+
Command::CreateInstance {
722+
expiration_offset: None,
723+
arrangement_dictionary_compression: false,
724+
initial_config: vec![ConfigSetting {
725+
name: "enable_my_flag".to_string(),
726+
ty: "bool".to_string(),
727+
value: "true".to_string(),
728+
}],
712729
}
713730
);
714731

src/clusterd-test-driver/tests/index_smoke.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ async fn index_over_small_shard() {
6767
// `create_instance` also force-disables the peek-response stash, so peeks
6868
// return their rows inline.
6969
driver
70-
.create_instance(None, false)
70+
.create_instance(None, false, Default::default())
7171
.expect("create instance");
7272
driver
7373
.send(mz_compute_client::protocol::command::ComputeCommand::InitializationComplete)

src/compute-client/src/controller.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,11 @@ impl ComputeController {
551551
instance.call(Instance::initialization_complete);
552552
}
553553

554+
// The replica also receives the current dyncfg create-time, folded into `CreateInstance`
555+
// so create-time setup observes synced values. This `UpdateConfiguration` is still
556+
// required: it carries the rest of `ComputeParameters` (workload class, max result size,
557+
// tracing) and syncs the dyncfg into the persist config and metrics, none of which ride in
558+
// `CreateInstance`. The overlapping dyncfg application is idempotent.
554559
let mut config_params = self.config.clone();
555560
config_params.workload_class = Some(workload_class);
556561
instance.call(|i| i.update_configuration(config_params));

src/compute-client/src/controller/instance.rs

Lines changed: 156 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -989,10 +989,12 @@ impl Instance {
989989
let instance_config = InstanceConfig {
990990
peek_stash_persist_location: self.peek_stash_persist_location.clone(),
991991
// The remaining fields are replica-specific and will be set in
992-
// `ReplicaTask::specialize_command`.
992+
// `ReplicaTask::specialize_command` (logging, expiration, dictionary compression) and
993+
// `Instance::specialize_command_for_replica` (the initial config snapshot).
993994
logging: Default::default(),
994995
expiration_offset: Default::default(),
995996
arrangement_dictionary_compression: Default::default(),
997+
initial_config: Default::default(),
996998
};
997999

9981000
self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));
@@ -1107,36 +1109,55 @@ impl Instance {
11071109

11081110
let target_replica = self.target_replica(&cmd);
11091111

1110-
// Borrow the overrides separately from `self.replicas` so the per-replica
1112+
// Borrow the overrides and dyncfg separately from `self.replicas` so the per-replica
11111113
// specialization below does not conflict with the mutable replica borrow.
11121114
let overrides = &self.replica_dyncfg_overrides;
1115+
let dyncfg = &self.dyncfg;
11131116

11141117
if let Some(rid) = target_replica {
11151118
if let Some(replica) = self.replicas.get_mut(&rid) {
1116-
let cmd = Self::specialize_command_for_replica(cmd, rid, overrides);
1119+
let cmd = Self::specialize_command_for_replica(cmd, rid, overrides, dyncfg);
11171120
let _ = replica.client.send(cmd);
11181121
}
11191122
} else {
11201123
for (rid, replica) in self.replicas.iter_mut() {
1121-
let cmd = Self::specialize_command_for_replica(cmd.clone(), *rid, overrides);
1124+
let cmd =
1125+
Self::specialize_command_for_replica(cmd.clone(), *rid, overrides, dyncfg);
11221126
let _ = replica.client.send(cmd);
11231127
}
11241128
}
11251129
}
11261130

1127-
/// Specializes a command for a specific replica by merging that replica's
1128-
/// dyncfg override (if any) into an `UpdateConfiguration` command. All other
1129-
/// commands, and replicas without an override, are returned unchanged.
1131+
/// Specializes a command for a specific replica by merging that replica's dyncfg override into
1132+
/// its configuration. For `UpdateConfiguration` the override is merged into the update. For
1133+
/// `CreateInstance` the current dyncfg, with the override applied on top, is captured as the
1134+
/// initial config snapshot. All other commands are returned unchanged.
1135+
///
1136+
/// The snapshot is built here, rather than baked into the history, so it reflects the dyncfg
1137+
/// and override values current at the time the command is sent or replayed to the replica.
11301138
fn specialize_command_for_replica(
11311139
mut cmd: ComputeCommand,
11321140
replica_id: ReplicaId,
11331141
overrides: &BTreeMap<ReplicaId, ConfigUpdates>,
1142+
dyncfg: &ConfigSet,
11341143
) -> ComputeCommand {
1135-
if let ComputeCommand::UpdateConfiguration(params) = &mut cmd
1136-
&& let Some(over) = overrides.get(&replica_id)
1137-
&& !over.updates.is_empty()
1138-
{
1139-
params.dyncfg_updates.extend(over.clone());
1144+
let over = overrides.get(&replica_id);
1145+
match &mut cmd {
1146+
ComputeCommand::UpdateConfiguration(params) => {
1147+
if let Some(over) = over
1148+
&& !over.updates.is_empty()
1149+
{
1150+
params.dyncfg_updates.extend(over.clone());
1151+
}
1152+
}
1153+
ComputeCommand::CreateInstance(config) => {
1154+
let mut initial = ConfigUpdates::from(dyncfg);
1155+
if let Some(over) = over {
1156+
initial.extend(over.clone());
1157+
}
1158+
config.initial_config = initial;
1159+
}
1160+
_ => {}
11401161
}
11411162
cmd
11421163
}
@@ -1228,11 +1249,13 @@ impl Instance {
12281249
continue;
12291250
}
12301251

1231-
// Re-apply this replica's dyncfg override to replayed config commands.
1252+
// Re-apply this replica's dyncfg override to replayed config commands, and rebuild the
1253+
// create-instance snapshot from the current dyncfg.
12321254
let command = Self::specialize_command_for_replica(
12331255
command.clone(),
12341256
id,
12351257
&self.replica_dyncfg_overrides,
1258+
&self.dyncfg,
12361259
);
12371260
if client.send(command).is_err() {
12381261
// We swallow the error here. On the next send, we will fail again, and
@@ -3387,3 +3410,123 @@ impl Drop for ReplicaCollectionIntrospection {
33873410
self.send(IntrospectionType::ReplicaFrontiers, updates);
33883411
}
33893412
}
3413+
3414+
#[cfg(test)]
3415+
mod tests {
3416+
use std::collections::BTreeMap;
3417+
3418+
use mz_compute_types::dyncfgs::{ENABLE_COLUMN_PAGED_BATCHER, ENABLE_MZ_JOIN_CORE};
3419+
use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
3420+
use mz_persist_types::PersistLocation;
3421+
3422+
use crate::protocol::command::{ComputeCommand, InstanceConfig};
3423+
3424+
use super::{Instance, ReplicaId};
3425+
3426+
fn create_instance_command() -> ComputeCommand {
3427+
ComputeCommand::CreateInstance(Box::new(InstanceConfig {
3428+
logging: Default::default(),
3429+
expiration_offset: None,
3430+
peek_stash_persist_location: PersistLocation::new_in_mem(),
3431+
arrangement_dictionary_compression: false,
3432+
initial_config: Default::default(),
3433+
}))
3434+
}
3435+
3436+
fn initial_config(cmd: &ComputeCommand) -> &ConfigUpdates {
3437+
match cmd {
3438+
ComputeCommand::CreateInstance(config) => &config.initial_config,
3439+
other => panic!("expected CreateInstance, got {other:?}"),
3440+
}
3441+
}
3442+
3443+
/// `CreateInstance` is specialized with a full snapshot of the instance-wide dyncfg, so the
3444+
/// replica seeds its worker config at create time rather than waiting for the first
3445+
/// `UpdateConfiguration`. This is the regression guard for create-time setup observing dyncfg
3446+
/// defaults.
3447+
#[mz_ore::test]
3448+
fn create_instance_snapshots_instance_wide_dyncfg() {
3449+
let dyncfg = ConfigSet::default()
3450+
.add(&ENABLE_COLUMN_PAGED_BATCHER)
3451+
.add(&ENABLE_MZ_JOIN_CORE);
3452+
let mut updates = ConfigUpdates::default();
3453+
updates.add(&ENABLE_COLUMN_PAGED_BATCHER, true);
3454+
updates.add(&ENABLE_MZ_JOIN_CORE, false);
3455+
updates.apply(&dyncfg);
3456+
3457+
// A replica without an override sees exactly the instance-wide values.
3458+
let overrides = BTreeMap::new();
3459+
let cmd = Instance::specialize_command_for_replica(
3460+
create_instance_command(),
3461+
ReplicaId::User(1),
3462+
&overrides,
3463+
&dyncfg,
3464+
);
3465+
let snapshot = initial_config(&cmd);
3466+
assert_eq!(
3467+
snapshot.updates.get(ENABLE_COLUMN_PAGED_BATCHER.name()),
3468+
Some(&ConfigVal::Bool(true)),
3469+
);
3470+
assert_eq!(
3471+
snapshot.updates.get(ENABLE_MZ_JOIN_CORE.name()),
3472+
Some(&ConfigVal::Bool(false)),
3473+
);
3474+
}
3475+
3476+
/// A replica-scoped override beats the instance-wide value in the create-time snapshot, so a
3477+
/// create-time-frozen scoped flag reaches the replica with its override applied.
3478+
#[mz_ore::test]
3479+
fn create_instance_snapshot_applies_replica_override() {
3480+
let dyncfg = ConfigSet::default().add(&ENABLE_COLUMN_PAGED_BATCHER);
3481+
let mut updates = ConfigUpdates::default();
3482+
updates.add(&ENABLE_COLUMN_PAGED_BATCHER, true);
3483+
updates.apply(&dyncfg);
3484+
3485+
let replica = ReplicaId::User(1);
3486+
let mut override_updates = ConfigUpdates::default();
3487+
override_updates.add(&ENABLE_COLUMN_PAGED_BATCHER, false);
3488+
let overrides = BTreeMap::from([(replica, override_updates)]);
3489+
3490+
let cmd = Instance::specialize_command_for_replica(
3491+
create_instance_command(),
3492+
replica,
3493+
&overrides,
3494+
&dyncfg,
3495+
);
3496+
assert_eq!(
3497+
initial_config(&cmd)
3498+
.updates
3499+
.get(ENABLE_COLUMN_PAGED_BATCHER.name()),
3500+
Some(&ConfigVal::Bool(false)),
3501+
"replica override should win over the instance-wide value",
3502+
);
3503+
}
3504+
3505+
/// `UpdateConfiguration` continues to merge the replica's override into the update.
3506+
#[mz_ore::test]
3507+
fn update_configuration_merges_replica_override() {
3508+
let dyncfg = ConfigSet::default().add(&ENABLE_COLUMN_PAGED_BATCHER);
3509+
3510+
let replica = ReplicaId::User(1);
3511+
let mut override_updates = ConfigUpdates::default();
3512+
override_updates.add(&ENABLE_COLUMN_PAGED_BATCHER, true);
3513+
let overrides = BTreeMap::from([(replica, override_updates)]);
3514+
3515+
let cmd = Instance::specialize_command_for_replica(
3516+
ComputeCommand::UpdateConfiguration(Box::new(Default::default())),
3517+
replica,
3518+
&overrides,
3519+
&dyncfg,
3520+
);
3521+
match cmd {
3522+
ComputeCommand::UpdateConfiguration(params) => assert_eq!(
3523+
params
3524+
.dyncfg_updates
3525+
.updates
3526+
.get(ENABLE_COLUMN_PAGED_BATCHER.name()),
3527+
Some(&ConfigVal::Bool(true)),
3528+
),
3529+
other => panic!("expected UpdateConfiguration, got {other:?}"),
3530+
}
3531+
}
3532+
}

0 commit comments

Comments
 (0)