Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions src/clusterd-test-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use mz_compute_client::protocol::command::{
};
use mz_compute_client::protocol::response::{FrontiersResponse, PeekResponse};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::ENABLE_PEEK_RESPONSE_STASH;
use mz_compute_types::dyncfgs::{
ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA, ENABLE_PEEK_RESPONSE_STASH,
};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigUpdates;
use mz_expr::{MapFilterProject, RowSetFinishing};
Expand Down Expand Up @@ -72,11 +74,13 @@ impl Driver {
/// Sends `CreateInstance`, opening the compute instance (and the reconciliation
/// window).
///
/// `expiration_offset`, `arrangement_dictionary_compression`, and `initial_config` are the
/// caller-settable [`InstanceConfig`] knobs; `logging` is left at its default
/// (introspection logging off — enabling it safely needs `index_logs` wiring)
/// and `peek_stash_persist_location` is necessarily the host's, since the driver
/// hosts persist.
/// `expiration_offset` is a caller-settable [`InstanceConfig`] knob. `logging` is left at its
/// default (introspection logging off, as enabling it safely needs `index_logs` wiring) and
/// `peek_stash_persist_location` is necessarily the host's, since the driver hosts persist.
///
/// `arrangement_dictionary_compression` is pushed as the
/// `enable_arrangement_dictionary_compression_alpha` dyncfg in the `UpdateConfiguration` below,
/// which is where the replica captures it.
///
/// `initial_config` is the create-time configuration snapshot the controller would build from
/// its synced dyncfg. The replica applies it before create-time setup, so a script can assert
Expand All @@ -95,11 +99,14 @@ impl Driver {
logging: Default::default(),
expiration_offset,
peek_stash_persist_location: self.host.location().clone(),
arrangement_dictionary_compression,
initial_config,
})))?;
let mut dyncfg_updates = ConfigUpdates::default();
dyncfg_updates.add(&ENABLE_PEEK_RESPONSE_STASH, false);
dyncfg_updates.add(
&ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA,
arrangement_dictionary_compression,
);
self.send(ComputeCommand::UpdateConfiguration(Box::new(
ComputeParameters {
dyncfg_updates,
Expand Down
11 changes: 1 addition & 10 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::config::ComputeReplicaConfig;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::{
COMPUTE_REPLICA_EXPIRATION_OFFSET, ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA,
};
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
use mz_dyncfg::{ConfigSet, ConfigUpdates};
use mz_expr::RowSetFinishing;
use mz_expr::row::RowCollection;
Expand Down Expand Up @@ -720,12 +718,6 @@ impl ComputeController {

let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);

// Capture dictionary compression once, at replica creation, and hold it fixed for the
// replica's lifetime (see `InstanceConfig::arrangement_dictionary_compression`). This is
// why a later flip of the flag only affects replicas created afterwards.
let arrangement_dictionary_compression =
ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA.get(&self.dyncfg);

let replica_config = ReplicaConfig {
location,
logging: LoggingConfig {
Expand All @@ -736,7 +728,6 @@ impl ComputeController {
},
grpc_client: self.config.grpc_client.clone(),
expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
arrangement_dictionary_compression,
};

let instance = self.instance_mut(instance_id).expect("validated");
Expand Down
4 changes: 1 addition & 3 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,11 +989,10 @@ impl Instance {
let instance_config = InstanceConfig {
peek_stash_persist_location: self.peek_stash_persist_location.clone(),
// The remaining fields are replica-specific and will be set in
// `ReplicaTask::specialize_command` (logging, expiration, dictionary compression) and
// `ReplicaTask::specialize_command` (logging, expiration) and
// `Instance::specialize_command_for_replica` (the initial config snapshot).
logging: Default::default(),
expiration_offset: Default::default(),
arrangement_dictionary_compression: Default::default(),
initial_config: Default::default(),
};

Expand Down Expand Up @@ -3428,7 +3427,6 @@ mod tests {
logging: Default::default(),
expiration_offset: None,
peek_stash_persist_location: PersistLocation::new_in_mem(),
arrangement_dictionary_compression: false,
initial_config: Default::default(),
}))
}
Expand Down
4 changes: 0 additions & 4 deletions src/compute-client/src/controller/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ pub(super) struct ReplicaConfig {
pub grpc_client: GrpcClientParameters,
/// The offset to use for replica expiration, if any.
pub expiration_offset: Option<Duration>,
/// Whether arrangements on this replica use dictionary compression, captured at creation.
pub arrangement_dictionary_compression: bool,
}

/// A client for a replica task.
Expand Down Expand Up @@ -284,8 +282,6 @@ impl ReplicaTask {
if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
config.expiration_offset = self.config.expiration_offset;
}
config.arrangement_dictionary_compression =
self.config.arrangement_dictionary_compression;
}
_ => {}
}
Expand Down
19 changes: 5 additions & 14 deletions src/compute-client/src/protocol/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ pub enum ComputeCommand {
/// Configuration for a replica, passed with the `CreateInstance`. Replicas should halt
/// if the controller attempt to reconcile them with different values
/// for anything in this struct.
///
/// Keep this minimal: only configuration the replica needs in order to start belongs here.
/// Anything that can be applied through a configuration update should be a dyncfg instead, so a
/// change to it does not make an otherwise-reconcilable `CreateInstance` incompatible.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct InstanceConfig {
/// Specification of introspection logging.
Expand All @@ -275,12 +279,6 @@ pub struct InstanceConfig {
pub expiration_offset: Option<Duration>,
/// The persist location where we can stash large peek results.
pub peek_stash_persist_location: PersistLocation,
/// Whether arrangements created by this replica use dictionary compression.
///
/// Captured from `enable_arrangement_dictionary_compression_alpha` when the replica is created and
/// held fixed for the replica's lifetime, so flipping the flag only affects replicas created
/// afterwards rather than retroactively changing arrangements across the environment.
pub arrangement_dictionary_compression: bool,
/// A snapshot of the controller's dynamic configuration at replica creation, including any
/// replica-scoped overrides.
///
Expand All @@ -302,11 +300,7 @@ impl InstanceConfig {
/// forcing replica restarts. However, it also means that replicas will only pick up the new
/// value after a restart.
///
/// Dictionary compression is intentionally excluded from this check: it is captured at replica
/// creation, so a change is always compatible and a running replica keeps the value it was
/// created with (a new value is only picked up by replicas created afterwards).
///
/// The initial config snapshot is likewise excluded: it carries dyncfg values that apply
/// The initial config snapshot is excluded: it carries dyncfg values that apply
/// globally and are kept current through `UpdateConfiguration`, so a difference across
/// reconnects is expected and does not require a restart.
pub fn compatible_with(&self, other: &InstanceConfig) -> bool {
Expand All @@ -315,16 +309,13 @@ impl InstanceConfig {
logging: self_logging,
expiration_offset: self_offset,
peek_stash_persist_location: self_peek_stash_persist_location,
// Captured at replica creation; intentionally not part of compatibility (see above).
arrangement_dictionary_compression: _,
// Globally-applied dyncfg snapshot, intentionally not part of compatibility (see above).
initial_config: _,
} = self;
let InstanceConfig {
logging: other_logging,
expiration_offset: other_offset,
peek_stash_persist_location: other_peek_stash_persist_location,
arrangement_dictionary_compression: _,
initial_config: _,
} = other;

Expand Down
7 changes: 6 additions & 1 deletion src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,16 @@ pub const COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK: Config<Duration> = Config
///
/// Disposition: added 2026-06-09, default off; solicit feedback for one month
/// and remove in the absence of a positive response.
///
/// Declared [`ParameterScope::Replica`] so the value can be overridden per-replica. The replica
/// captures the first value it observes from a configuration update and holds it fixed for its
/// lifetime. A later flip only affects replicas that start afterwards.
pub const ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA: Config<bool> = Config::new(
"enable_arrangement_dictionary_compression_alpha",
false,
"Enable arrangement dictionary compression (alpha; not yet production-ready).",
);
)
.scoped(ParameterScope::Replica);

/// Whether to enable the peek response stash, for sending back large peek
/// responses. The response stash will only be used for results that exceed
Expand Down
56 changes: 41 additions & 15 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ use mz_compute_client::protocol::response::{
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::{
ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA, ENABLE_PEEK_RESPONSE_STASH,
PEEK_RESPONSE_STASH_BATCH_MAX_RUNS, PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE,
PEEK_STASH_NUM_BATCHES,
};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
Expand Down Expand Up @@ -171,6 +172,15 @@ pub struct ComputeState {

/// The storage worker forwards its introspection logs to the compute worker.
pub storage_log_reader: Option<crate::server::StorageTimelyLogReader>,

/// Arrangement dictionary compression, captured from the first configuration update this
/// replica observes and then held fixed for its lifetime.
///
/// `None` until the first configuration update is applied. The captured value is mirrored into
/// the process-global `mz_row_spine::DICTIONARY_COMPRESSION`. Capturing it from configuration
/// updates, rather than from `CreateInstance`, keeps it out of the reconciliation-relevant
/// `InstanceConfig`, so flipping the flag cannot force a replica restart.
arrangement_dictionary_compression: Option<bool>,
}

impl ComputeState {
Expand Down Expand Up @@ -212,6 +222,7 @@ impl ComputeState {
init_system_time: mz_ore::now::SYSTEM_TIME(),
replica_expiration: Antichain::default(),
storage_log_reader,
arrangement_dictionary_compression: None,
}
}

Expand Down Expand Up @@ -309,9 +320,10 @@ impl ComputeState {
);

// NB: arrangement dictionary compression is deliberately NOT applied here. Unlike the
// settings above, it is captured once at replica creation (see `handle_create_instance`
// and `InstanceConfig::arrangement_dictionary_compression`) and held fixed, so that
// flipping the flag does not retroactively change arrangements on existing replicas.
// settings above, it is captured once from the first configuration update this replica
// sees (see `capture_dictionary_compression`, called from `handle_update_configuration`)
// and held fixed, so that flipping the flag does not retroactively change arrangements on
// existing replicas.

// Apply column-paged-batcher configuration. Routes through
// `apply_tiered_config`, which reuses a process-wide `TieredPolicy`
Expand Down Expand Up @@ -375,6 +387,23 @@ impl ComputeState {
}
}

/// Capture arrangement dictionary compression from the current `worker_config`, the first
/// time this is called. Subsequent calls are no-ops, so the value this replica first observes
/// from a configuration update is held fixed for its lifetime.
///
/// The captured value is mirrored into the process-global `mz_row_spine::DICTIONARY_COMPRESSION`.
/// `DICTIONARY_COMPRESSION` is process-global and a replica process hosts a single instance, so
/// this single store covers all of the replica's arrangements.
fn capture_dictionary_compression(&mut self) {
if self.arrangement_dictionary_compression.is_none() {
let enabled = ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA.get(&self.worker_config);
self.arrangement_dictionary_compression = Some(enabled);
mz_row_spine::DICTIONARY_COMPRESSION
.store(enabled, std::sync::atomic::Ordering::Relaxed);
info!(enabled, "captured arrangement dictionary compression");
}
}

/// Apply the provided replica expiration `offset` by converting it to a frontier relative to
/// the replica's initialization system time.
///
Expand Down Expand Up @@ -488,16 +517,6 @@ impl<'a> ActiveComputeState<'a> {
// Ensure the state is consistent with the config before we initialize anything.
self.compute_state.apply_worker_config();

// Apply dictionary compression exactly once, here at instance creation, from the value the
// controller captured when the replica was created. We deliberately do NOT re-apply it on
// `handle_update_configuration`, so flipping the flag does not retroactively change this
// replica's arrangements. `DICTIONARY_COMPRESSION` is process-global and a replica process
// hosts a single instance, so this single store covers all of the replica's arrangements.
mz_row_spine::DICTIONARY_COMPRESSION.store(
config.arrangement_dictionary_compression,
std::sync::atomic::Ordering::Relaxed,
);

if let Some(offset) = config.expiration_offset {
self.compute_state.apply_expiration_offset(offset);
}
Expand Down Expand Up @@ -539,6 +558,13 @@ impl<'a> ActiveComputeState<'a> {
// share the metrics.
mz_metrics::update_dyncfg(&dyncfg_updates);

// Capture arrangement dictionary compression from the first configuration update we see
Comment thread
antiguru marked this conversation as resolved.
// and hold it fixed for the replica's lifetime. Doing this here (rather than in
// `apply_worker_config`, which also runs at instance creation before any configuration
// update with only default values) ensures we latch the value the controller actually
// synced, not the default.
self.compute_state.capture_dictionary_compression();

self.compute_state.apply_worker_config();
}

Expand Down
13 changes: 6 additions & 7 deletions src/row-spine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,13 +744,12 @@ pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize
///
/// The dictionary behavior is controlled by the `DICTIONARY_COMPRESSION` flag, which if disabled
/// prevents the construction of codecs, which when absent simply cause the wrapper to behave as
/// a no-op that fails to use any spare tags for common values. The flag is set once, when a
/// replica is created (from compute's `InstanceConfig::arrangement_dictionary_compression`, itself
/// captured from the `enable_arrangement_dictionary_compression_alpha` dyncfg at that moment), and is
/// not changed for the life of the process; flipping the dyncfg only affects replicas created
/// afterwards. Even with the flag fixed, a single replica can hold a mix of compressed and
/// uncompressed containers — e.g. containers that never observed enough records to install a
/// codec, or that were merged from uncompressed inputs.
/// a no-op that fails to use any spare tags for common values. The flag is set once, when the
/// replica captures the `enable_arrangement_dictionary_compression_alpha` dyncfg from the first
/// configuration update it observes, and is not changed for the life of the process. Flipping the
/// dyncfg only affects replicas created afterwards. Even with the flag fixed, a single replica can
/// hold a mix of compressed and uncompressed containers, e.g. containers that never observed
/// enough records to install a codec, or that were merged from uncompressed inputs.
mod dictionary {

use differential_dataflow::trace::implementations::BatchContainer;
Expand Down
Loading