Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/clusterd-test-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,31 @@ impl Driver {
/// Sends `CreateInstance`, opening the compute instance (and the reconciliation
/// window).
///
/// `expiration_offset` and `arrangement_dictionary_compression` are the
/// `expiration_offset`, `arrangement_dictionary_compression`, and `initial_config` are the
/// caller-settable [`InstanceConfig`] knobs; `logging` is left at its default
/// (introspection logging off — enabling it safely needs `index_logs` wiring)
/// and `peek_stash_persist_location` is necessarily the host's, since the driver
/// hosts persist.
///
/// This also force-disables `ENABLE_PEEK_RESPONSE_STASH`: the driver reads peek
/// results inline, so a stashed peek would break [`Self::peek`]/`count`. It is
/// patched here rather than exposed as a configuration knob, so a script's
/// `update-configuration` cannot turn it back on.
/// `initial_config` is the create-time configuration snapshot the controller would build from
/// its synced dyncfg. The replica applies it before create-time setup, so a script can assert
/// that create-time work observes synced values rather than defaults. The peek-response stash
/// is always force-disabled on top of it: the driver reads peek results inline, so a stashed
/// peek would break [`Self::peek`]/`count`. It is patched here rather than exposed as a knob,
/// so neither `initial_config` nor a later `update-configuration` can turn it back on.
pub fn create_instance(
&self,
expiration_offset: Option<Duration>,
arrangement_dictionary_compression: bool,
mut initial_config: ConfigUpdates,
) -> anyhow::Result<()> {
initial_config.add(&ENABLE_PEEK_RESPONSE_STASH, false);
self.send(ComputeCommand::CreateInstance(Box::new(InstanceConfig {
logging: Default::default(),
expiration_offset,
peek_stash_persist_location: self.host.location().clone(),
arrangement_dictionary_compression,
initial_config,
})))?;
let mut dyncfg_updates = ConfigUpdates::default();
dyncfg_updates.add(&ENABLE_PEEK_RESPONSE_STASH, false);
Expand Down
20 changes: 18 additions & 2 deletions src/clusterd-test-driver/src/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,11 @@ pub enum Command {
/// Whether arrangements use dictionary compression.
#[serde(default)]
arrangement_dictionary_compression: bool,
/// The create-time dyncfg snapshot the controller would supply (`name type value` rows).
/// Applied to the replica's worker config before create-time setup, so a scenario can
/// assert that create-time work observes synced values rather than dyncfg defaults.
#[serde(default)]
initial_config: Vec<ConfigSetting>,
},
/// Send `UpdateConfiguration` with a table of dyncfg updates (`name type value`
/// rows). Generic over any configuration; the peek-response stash is not settable
Expand Down Expand Up @@ -1062,6 +1067,7 @@ impl ScriptState {
Command::CreateInstance {
expiration_offset,
arrangement_dictionary_compression,
initial_config,
} => {
let expiration_offset = expiration_offset
.as_deref()
Expand All @@ -1071,8 +1077,18 @@ impl ScriptState {
})
})
.transpose()?;
self.driver
.create_instance(expiration_offset, arrangement_dictionary_compression)?;
let mut initial = ConfigUpdates::default();
for setting in &initial_config {
initial.add_dynamic(
&setting.name,
parse_config_val(&setting.ty, &setting.value)?,
);
}
self.driver.create_instance(
expiration_offset,
arrangement_dictionary_compression,
initial,
)?;
Ok("ok".to_string())
}
Command::UpdateConfiguration { updates } => {
Expand Down
17 changes: 17 additions & 0 deletions src/clusterd-test-driver/src/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ fn parse_command(input: &str) -> anyhow::Result<Command> {
.transpose()
.context("argument `arrangement-dictionary-compression` is not a bool")?
.unwrap_or(false),
// Same `name type value` body rows as `update-configuration`.
initial_config: settings_from_body(body)?,
},
"update-configuration" => Command::UpdateConfiguration {
updates: settings_from_body(body)?,
Expand Down Expand Up @@ -699,6 +701,7 @@ mod tests {
Command::CreateInstance {
expiration_offset: None,
arrangement_dictionary_compression: false,
initial_config: vec![],
}
);
assert_eq!(
Expand All @@ -709,6 +712,20 @@ mod tests {
Command::CreateInstance {
expiration_offset: Some("30s".to_string()),
arrangement_dictionary_compression: true,
initial_config: vec![],
}
);
// The create-time snapshot is a `name type value` body, like `update-configuration`.
assert_eq!(
parse_command("create-instance\n enable_my_flag bool true").unwrap(),
Command::CreateInstance {
expiration_offset: None,
arrangement_dictionary_compression: false,
initial_config: vec![ConfigSetting {
name: "enable_my_flag".to_string(),
ty: "bool".to_string(),
value: "true".to_string(),
}],
}
);

Expand Down
2 changes: 1 addition & 1 deletion src/clusterd-test-driver/tests/index_smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn index_over_small_shard() {
// `create_instance` also force-disables the peek-response stash, so peeks
// return their rows inline.
driver
.create_instance(None, false)
.create_instance(None, false, Default::default())
.expect("create instance");
driver
.send(mz_compute_client::protocol::command::ComputeCommand::InitializationComplete)
Expand Down
5 changes: 5 additions & 0 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,11 @@ impl ComputeController {
instance.call(Instance::initialization_complete);
}

// The replica also receives the current dyncfg create-time, folded into `CreateInstance`
// so create-time setup observes synced values. This `UpdateConfiguration` is still
// required: it carries the rest of `ComputeParameters` (workload class, max result size,
// tracing) and syncs the dyncfg into the persist config and metrics, none of which ride in
// `CreateInstance`. The overlapping dyncfg application is idempotent.
let mut config_params = self.config.clone();
config_params.workload_class = Some(workload_class);
instance.call(|i| i.update_configuration(config_params));
Expand Down
169 changes: 156 additions & 13 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,10 +989,12 @@ impl Instance {
let instance_config = InstanceConfig {
peek_stash_persist_location: self.peek_stash_persist_location.clone(),
// The remaining fields are replica-specific and will be set in
// `ReplicaTask::specialize_command`.
// `ReplicaTask::specialize_command` (logging, expiration, dictionary compression) and
// `Instance::specialize_command_for_replica` (the initial config snapshot).
logging: Default::default(),
expiration_offset: Default::default(),
arrangement_dictionary_compression: Default::default(),
initial_config: Default::default(),
};

self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));
Expand Down Expand Up @@ -1107,36 +1109,55 @@ impl Instance {

let target_replica = self.target_replica(&cmd);

// Borrow the overrides separately from `self.replicas` so the per-replica
// Borrow the overrides and dyncfg separately from `self.replicas` so the per-replica
// specialization below does not conflict with the mutable replica borrow.
let overrides = &self.replica_dyncfg_overrides;
let dyncfg = &self.dyncfg;

if let Some(rid) = target_replica {
if let Some(replica) = self.replicas.get_mut(&rid) {
let cmd = Self::specialize_command_for_replica(cmd, rid, overrides);
let cmd = Self::specialize_command_for_replica(cmd, rid, overrides, dyncfg);
let _ = replica.client.send(cmd);
}
} else {
for (rid, replica) in self.replicas.iter_mut() {
let cmd = Self::specialize_command_for_replica(cmd.clone(), *rid, overrides);
let cmd =
Self::specialize_command_for_replica(cmd.clone(), *rid, overrides, dyncfg);
let _ = replica.client.send(cmd);
}
}
}

/// Specializes a command for a specific replica by merging that replica's
/// dyncfg override (if any) into an `UpdateConfiguration` command. All other
/// commands, and replicas without an override, are returned unchanged.
/// Specializes a command for a specific replica by merging that replica's dyncfg override into
/// its configuration. For `UpdateConfiguration` the override is merged into the update. For
/// `CreateInstance` the current dyncfg, with the override applied on top, is captured as the
/// initial config snapshot. All other commands are returned unchanged.
///
/// The snapshot is built here, rather than baked into the history, so it reflects the dyncfg
/// and override values current at the time the command is sent or replayed to the replica.
fn specialize_command_for_replica(
mut cmd: ComputeCommand,
replica_id: ReplicaId,
overrides: &BTreeMap<ReplicaId, ConfigUpdates>,
dyncfg: &ConfigSet,
) -> ComputeCommand {
if let ComputeCommand::UpdateConfiguration(params) = &mut cmd
&& let Some(over) = overrides.get(&replica_id)
&& !over.updates.is_empty()
{
params.dyncfg_updates.extend(over.clone());
let over = overrides.get(&replica_id);
match &mut cmd {
ComputeCommand::UpdateConfiguration(params) => {
if let Some(over) = over
&& !over.updates.is_empty()
{
params.dyncfg_updates.extend(over.clone());
}
}
ComputeCommand::CreateInstance(config) => {
let mut initial = ConfigUpdates::from(dyncfg);
if let Some(over) = over {
initial.extend(over.clone());
}
config.initial_config = initial;
}
_ => {}
}
cmd
}
Expand Down Expand Up @@ -1228,11 +1249,13 @@ impl Instance {
continue;
}

// Re-apply this replica's dyncfg override to replayed config commands.
// Re-apply this replica's dyncfg override to replayed config commands, and rebuild the
// create-instance snapshot from the current dyncfg.
let command = Self::specialize_command_for_replica(
command.clone(),
id,
&self.replica_dyncfg_overrides,
&self.dyncfg,
);
if client.send(command).is_err() {
// We swallow the error here. On the next send, we will fail again, and
Expand Down Expand Up @@ -3387,3 +3410,123 @@ impl Drop for ReplicaCollectionIntrospection {
self.send(IntrospectionType::ReplicaFrontiers, updates);
}
}

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

use mz_compute_types::dyncfgs::{ENABLE_COLUMN_PAGED_BATCHER, ENABLE_MZ_JOIN_CORE};
use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
use mz_persist_types::PersistLocation;

use crate::protocol::command::{ComputeCommand, InstanceConfig};

use super::{Instance, ReplicaId};

fn create_instance_command() -> ComputeCommand {
ComputeCommand::CreateInstance(Box::new(InstanceConfig {
logging: Default::default(),
expiration_offset: None,
peek_stash_persist_location: PersistLocation::new_in_mem(),
arrangement_dictionary_compression: false,
initial_config: Default::default(),
}))
}

fn initial_config(cmd: &ComputeCommand) -> &ConfigUpdates {
match cmd {
ComputeCommand::CreateInstance(config) => &config.initial_config,
other => panic!("expected CreateInstance, got {other:?}"),
}
}

/// `CreateInstance` is specialized with a full snapshot of the instance-wide dyncfg, so the
/// replica seeds its worker config at create time rather than waiting for the first
/// `UpdateConfiguration`. This is the regression guard for create-time setup observing dyncfg
/// defaults.
#[mz_ore::test]
fn create_instance_snapshots_instance_wide_dyncfg() {
let dyncfg = ConfigSet::default()
.add(&ENABLE_COLUMN_PAGED_BATCHER)
.add(&ENABLE_MZ_JOIN_CORE);
let mut updates = ConfigUpdates::default();
updates.add(&ENABLE_COLUMN_PAGED_BATCHER, true);
updates.add(&ENABLE_MZ_JOIN_CORE, false);
updates.apply(&dyncfg);

// A replica without an override sees exactly the instance-wide values.
let overrides = BTreeMap::new();
let cmd = Instance::specialize_command_for_replica(
create_instance_command(),
ReplicaId::User(1),
&overrides,
&dyncfg,
);
let snapshot = initial_config(&cmd);
assert_eq!(
snapshot.updates.get(ENABLE_COLUMN_PAGED_BATCHER.name()),
Some(&ConfigVal::Bool(true)),
);
assert_eq!(
snapshot.updates.get(ENABLE_MZ_JOIN_CORE.name()),
Some(&ConfigVal::Bool(false)),
);
}

/// A replica-scoped override beats the instance-wide value in the create-time snapshot, so a
/// create-time-frozen scoped flag reaches the replica with its override applied.
#[mz_ore::test]
fn create_instance_snapshot_applies_replica_override() {
let dyncfg = ConfigSet::default().add(&ENABLE_COLUMN_PAGED_BATCHER);
let mut updates = ConfigUpdates::default();
updates.add(&ENABLE_COLUMN_PAGED_BATCHER, true);
updates.apply(&dyncfg);

let replica = ReplicaId::User(1);
let mut override_updates = ConfigUpdates::default();
override_updates.add(&ENABLE_COLUMN_PAGED_BATCHER, false);
let overrides = BTreeMap::from([(replica, override_updates)]);

let cmd = Instance::specialize_command_for_replica(
create_instance_command(),
replica,
&overrides,
&dyncfg,
);
assert_eq!(
initial_config(&cmd)
.updates
.get(ENABLE_COLUMN_PAGED_BATCHER.name()),
Some(&ConfigVal::Bool(false)),
"replica override should win over the instance-wide value",
);
}

/// `UpdateConfiguration` continues to merge the replica's override into the update.
#[mz_ore::test]
fn update_configuration_merges_replica_override() {
let dyncfg = ConfigSet::default().add(&ENABLE_COLUMN_PAGED_BATCHER);

let replica = ReplicaId::User(1);
let mut override_updates = ConfigUpdates::default();
override_updates.add(&ENABLE_COLUMN_PAGED_BATCHER, true);
let overrides = BTreeMap::from([(replica, override_updates)]);

let cmd = Instance::specialize_command_for_replica(
ComputeCommand::UpdateConfiguration(Box::new(Default::default())),
replica,
&overrides,
&dyncfg,
);
match cmd {
ComputeCommand::UpdateConfiguration(params) => assert_eq!(
params
.dyncfg_updates
.updates
.get(ENABLE_COLUMN_PAGED_BATCHER.name()),
Some(&ConfigVal::Bool(true)),
),
other => panic!("expected UpdateConfiguration, got {other:?}"),
}
}
}
Loading
Loading