Skip to content

Commit 6b71709

Browse files
claudeantiguru
authored and committed
compute: capture arrangement_dictionary_compression on the replica
`arrangement_dictionary_compression` was passed via `CreateInstance`, baked into `InstanceConfig` from the controller's dyncfg at replica creation. After an environmentd restart the controller re-derives the value, so if the flag was flipped in between, the reconciled `CreateInstance` carried a different value than the running replica was created with -- a difference that risks an unnecessary replica restart. Move the value out of `CreateInstance` entirely. The replica now captures the first value it observes from a configuration update (in `handle_update_configuration`) and holds it fixed for its lifetime, mirroring it into the process-global `DICTIONARY_COMPRESSION`. The reduced command history always orders `UpdateConfiguration` before any `CreateDataflow`, so the value is latched before any arrangement is built. The flag is also declared `ParameterScope::Replica` so it can be overridden per-replica. Drops `arrangement_dictionary_compression` from `InstanceConfig` and `ReplicaConfig` (and the controller capture / specialize_command path), and the clusterd test driver now pushes it as a dyncfg instead. Fixes CLU-135. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01LBErWUhZR4sBuEfGStb5FP
1 parent 141cb2a commit 6b71709

8 files changed

Lines changed: 74 additions & 61 deletions

File tree

src/clusterd-test-driver/src/driver.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use mz_compute_client::protocol::command::{
1616
};
1717
use mz_compute_client::protocol::response::{FrontiersResponse, PeekResponse};
1818
use mz_compute_types::dataflows::DataflowDescription;
19-
use mz_compute_types::dyncfgs::ENABLE_PEEK_RESPONSE_STASH;
19+
use mz_compute_types::dyncfgs::{
20+
ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA, ENABLE_PEEK_RESPONSE_STASH,
21+
};
2022
use mz_compute_types::plan::render_plan::RenderPlan;
2123
use mz_dyncfg::ConfigUpdates;
2224
use mz_expr::{MapFilterProject, RowSetFinishing};
@@ -72,11 +74,13 @@ impl Driver {
7274
/// Sends `CreateInstance`, opening the compute instance (and the reconciliation
7375
/// window).
7476
///
75-
/// `expiration_offset`, `arrangement_dictionary_compression`, and `initial_config` are the
76-
/// caller-settable [`InstanceConfig`] knobs; `logging` is left at its default
77-
/// (introspection logging off — enabling it safely needs `index_logs` wiring)
78-
/// and `peek_stash_persist_location` is necessarily the host's, since the driver
79-
/// hosts persist.
77+
/// `expiration_offset` is a caller-settable [`InstanceConfig`] knob. `logging` is left at its
78+
/// default (introspection logging off, as enabling it safely needs `index_logs` wiring) and
79+
/// `peek_stash_persist_location` is necessarily the host's, since the driver hosts persist.
80+
///
81+
/// `arrangement_dictionary_compression` is pushed as the
82+
/// `enable_arrangement_dictionary_compression_alpha` dyncfg in the `UpdateConfiguration` below,
83+
/// which is where the replica captures it.
8084
///
8185
/// `initial_config` is the create-time configuration snapshot the controller would build from
8286
/// its synced dyncfg. The replica applies it before create-time setup, so a script can assert
@@ -95,11 +99,14 @@ impl Driver {
9599
logging: Default::default(),
96100
expiration_offset,
97101
peek_stash_persist_location: self.host.location().clone(),
98-
arrangement_dictionary_compression,
99102
initial_config,
100103
})))?;
101104
let mut dyncfg_updates = ConfigUpdates::default();
102105
dyncfg_updates.add(&ENABLE_PEEK_RESPONSE_STASH, false);
106+
dyncfg_updates.add(
107+
&ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA,
108+
arrangement_dictionary_compression,
109+
);
103110
self.send(ComputeCommand::UpdateConfiguration(Box::new(
104111
ComputeParameters {
105112
dyncfg_updates,

src/compute-client/src/controller.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ use mz_cluster_client::{ReplicaId, WallclockLagFn};
4343
use mz_compute_types::ComputeInstanceId;
4444
use mz_compute_types::config::ComputeReplicaConfig;
4545
use mz_compute_types::dataflows::DataflowDescription;
46-
use mz_compute_types::dyncfgs::{
47-
COMPUTE_REPLICA_EXPIRATION_OFFSET, ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA,
48-
};
46+
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
4947
use mz_dyncfg::{ConfigSet, ConfigUpdates};
5048
use mz_expr::RowSetFinishing;
5149
use mz_expr::row::RowCollection;
@@ -720,12 +718,6 @@ impl ComputeController {
720718

721719
let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);
722720

723-
// Capture dictionary compression once, at replica creation, and hold it fixed for the
724-
// replica's lifetime (see `InstanceConfig::arrangement_dictionary_compression`). This is
725-
// why a later flip of the flag only affects replicas created afterwards.
726-
let arrangement_dictionary_compression =
727-
ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA.get(&self.dyncfg);
728-
729721
let replica_config = ReplicaConfig {
730722
location,
731723
logging: LoggingConfig {
@@ -736,7 +728,6 @@ impl ComputeController {
736728
},
737729
grpc_client: self.config.grpc_client.clone(),
738730
expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
739-
arrangement_dictionary_compression,
740731
};
741732

742733
let instance = self.instance_mut(instance_id).expect("validated");

src/compute-client/src/controller/instance.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -989,11 +989,10 @@ impl Instance {
989989
let instance_config = InstanceConfig {
990990
peek_stash_persist_location: self.peek_stash_persist_location.clone(),
991991
// The remaining fields are replica-specific and will be set in
992-
// `ReplicaTask::specialize_command` (logging, expiration, dictionary compression) and
992+
// `ReplicaTask::specialize_command` (logging, expiration) and
993993
// `Instance::specialize_command_for_replica` (the initial config snapshot).
994994
logging: Default::default(),
995995
expiration_offset: Default::default(),
996-
arrangement_dictionary_compression: Default::default(),
997996
initial_config: Default::default(),
998997
};
999998

@@ -3428,7 +3427,6 @@ mod tests {
34283427
logging: Default::default(),
34293428
expiration_offset: None,
34303429
peek_stash_persist_location: PersistLocation::new_in_mem(),
3431-
arrangement_dictionary_compression: false,
34323430
initial_config: Default::default(),
34333431
}))
34343432
}

src/compute-client/src/controller/replica.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@ pub(super) struct ReplicaConfig {
4949
pub grpc_client: GrpcClientParameters,
5050
/// The offset to use for replica expiration, if any.
5151
pub expiration_offset: Option<Duration>,
52-
/// Whether arrangements on this replica use dictionary compression, captured at creation.
53-
pub arrangement_dictionary_compression: bool,
5452
}
5553

5654
/// A client for a replica task.
@@ -284,8 +282,6 @@ impl ReplicaTask {
284282
if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
285283
config.expiration_offset = self.config.expiration_offset;
286284
}
287-
config.arrangement_dictionary_compression =
288-
self.config.arrangement_dictionary_compression;
289285
}
290286
_ => {}
291287
}

src/compute-client/src/protocol/command.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,10 @@ pub enum ComputeCommand {
267267
/// Configuration for a replica, passed with the `CreateInstance`. Replicas should halt
268268
/// if the controller attempts to reconcile them with different values
269269
/// for anything in this struct.
270+
///
271+
/// Keep this minimal: only configuration the replica needs in order to start belongs here.
272+
/// Anything that can be applied through a configuration update should be a dyncfg instead, so a
273+
/// change to it does not make an otherwise-reconcilable `CreateInstance` incompatible.
270274
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271275
pub struct InstanceConfig {
272276
/// Specification of introspection logging.
@@ -275,12 +279,6 @@ pub struct InstanceConfig {
275279
pub expiration_offset: Option<Duration>,
276280
/// The persist location where we can stash large peek results.
277281
pub peek_stash_persist_location: PersistLocation,
278-
/// Whether arrangements created by this replica use dictionary compression.
279-
///
280-
/// Captured from `enable_arrangement_dictionary_compression_alpha` when the replica is created and
281-
/// held fixed for the replica's lifetime, so flipping the flag only affects replicas created
282-
/// afterwards rather than retroactively changing arrangements across the environment.
283-
pub arrangement_dictionary_compression: bool,
284282
/// A snapshot of the controller's dynamic configuration at replica creation, including any
285283
/// replica-scoped overrides.
286284
///
@@ -302,11 +300,7 @@ impl InstanceConfig {
302300
/// forcing replica restarts. However, it also means that replicas will only pick up the new
303301
/// value after a restart.
304302
///
305-
/// Dictionary compression is intentionally excluded from this check: it is captured at replica
306-
/// creation, so a change is always compatible and a running replica keeps the value it was
307-
/// created with (a new value is only picked up by replicas created afterwards).
308-
///
309-
/// The initial config snapshot is likewise excluded: it carries dyncfg values that apply
303+
/// The initial config snapshot is excluded: it carries dyncfg values that apply
310304
/// globally and are kept current through `UpdateConfiguration`, so a difference across
311305
/// reconnects is expected and does not require a restart.
312306
pub fn compatible_with(&self, other: &InstanceConfig) -> bool {
@@ -315,16 +309,13 @@ impl InstanceConfig {
315309
logging: self_logging,
316310
expiration_offset: self_offset,
317311
peek_stash_persist_location: self_peek_stash_persist_location,
318-
// Captured at replica creation; intentionally not part of compatibility (see above).
319-
arrangement_dictionary_compression: _,
320312
// Globally-applied dyncfg snapshot, intentionally not part of compatibility (see above).
321313
initial_config: _,
322314
} = self;
323315
let InstanceConfig {
324316
logging: other_logging,
325317
expiration_offset: other_offset,
326318
peek_stash_persist_location: other_peek_stash_persist_location,
327-
arrangement_dictionary_compression: _,
328319
initial_config: _,
329320
} = other;
330321

src/compute-types/src/dyncfgs.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,11 +393,16 @@ pub const COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK: Config<Duration> = Config
393393
///
394394
/// Disposition: added 2026-06-09, default off; solicit feedback for one month
395395
/// and remove in the absence of a positive response.
396+
///
397+
/// Declared [`ParameterScope::Replica`] so the value can be overridden per-replica. The replica
398+
/// captures the first value it observes from a configuration update and holds it fixed for its
399+
/// lifetime. A later flip only affects replicas that start afterwards.
396400
pub const ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA: Config<bool> = Config::new(
397401
"enable_arrangement_dictionary_compression_alpha",
398402
false,
399403
"Enable arrangement dictionary compression (alpha; not yet production-ready).",
400-
);
404+
)
405+
.scoped(ParameterScope::Replica);
401406

402407
/// Whether to enable the peek response stash, for sending back large peek
403408
/// responses. The response stash will only be used for results that exceed

src/compute/src/compute_state.rs

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ use mz_compute_client::protocol::response::{
2929
};
3030
use mz_compute_types::dataflows::DataflowDescription;
3131
use mz_compute_types::dyncfgs::{
32-
ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
33-
PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
32+
ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA, ENABLE_PEEK_RESPONSE_STASH,
33+
PEEK_RESPONSE_STASH_BATCH_MAX_RUNS, PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE,
34+
PEEK_STASH_NUM_BATCHES,
3435
};
3536
use mz_compute_types::plan::render_plan::RenderPlan;
3637
use mz_dyncfg::ConfigSet;
@@ -171,6 +172,15 @@ pub struct ComputeState {
171172

172173
/// The storage worker forwards its introspection logs to the compute worker.
173174
pub storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
175+
176+
/// Arrangement dictionary compression, captured from the first configuration update this
177+
/// replica observes and then held fixed for its lifetime.
178+
///
179+
/// `None` until the first configuration update is applied. The captured value is mirrored into
180+
/// the process-global `mz_row_spine::DICTIONARY_COMPRESSION`. Capturing it from configuration
181+
/// updates, rather than from `CreateInstance`, keeps it out of the reconciliation-relevant
182+
/// `InstanceConfig`, so flipping the flag cannot force a replica restart.
183+
arrangement_dictionary_compression: Option<bool>,
174184
}
175185

176186
impl ComputeState {
@@ -212,6 +222,7 @@ impl ComputeState {
212222
init_system_time: mz_ore::now::SYSTEM_TIME(),
213223
replica_expiration: Antichain::default(),
214224
storage_log_reader,
225+
arrangement_dictionary_compression: None,
215226
}
216227
}
217228

@@ -309,9 +320,10 @@ impl ComputeState {
309320
);
310321

311322
// NB: arrangement dictionary compression is deliberately NOT applied here. Unlike the
312-
// settings above, it is captured once at replica creation (see `handle_create_instance`
313-
// and `InstanceConfig::arrangement_dictionary_compression`) and held fixed, so that
314-
// flipping the flag does not retroactively change arrangements on existing replicas.
323+
// settings above, it is captured once from the first configuration update this replica
324+
// sees (see `capture_dictionary_compression`, called from `handle_update_configuration`)
325+
// and held fixed, so that flipping the flag does not retroactively change arrangements on
326+
// existing replicas.
315327

316328
// Apply column-paged-batcher configuration. Routes through
317329
// `apply_tiered_config`, which reuses a process-wide `TieredPolicy`
@@ -375,6 +387,23 @@ impl ComputeState {
375387
}
376388
}
377389

390+
/// Capture arrangement dictionary compression from the current `worker_config`, the first
391+
/// time this is called. Subsequent calls are no-ops, so the value this replica first observes
392+
/// from a configuration update is held fixed for its lifetime.
393+
///
394+
/// The captured value is mirrored into the process-global `mz_row_spine::DICTIONARY_COMPRESSION`.
395+
/// `DICTIONARY_COMPRESSION` is process-global and a replica process hosts a single instance, so
396+
/// this single store covers all of the replica's arrangements.
397+
fn capture_dictionary_compression(&mut self) {
398+
if self.arrangement_dictionary_compression.is_none() {
399+
let enabled = ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA.get(&self.worker_config);
400+
self.arrangement_dictionary_compression = Some(enabled);
401+
mz_row_spine::DICTIONARY_COMPRESSION
402+
.store(enabled, std::sync::atomic::Ordering::Relaxed);
403+
info!(enabled, "captured arrangement dictionary compression");
404+
}
405+
}
406+
378407
/// Apply the provided replica expiration `offset` by converting it to a frontier relative to
379408
/// the replica's initialization system time.
380409
///
@@ -488,16 +517,6 @@ impl<'a> ActiveComputeState<'a> {
488517
// Ensure the state is consistent with the config before we initialize anything.
489518
self.compute_state.apply_worker_config();
490519

491-
// Apply dictionary compression exactly once, here at instance creation, from the value the
492-
// controller captured when the replica was created. We deliberately do NOT re-apply it on
493-
// `handle_update_configuration`, so flipping the flag does not retroactively change this
494-
// replica's arrangements. `DICTIONARY_COMPRESSION` is process-global and a replica process
495-
// hosts a single instance, so this single store covers all of the replica's arrangements.
496-
mz_row_spine::DICTIONARY_COMPRESSION.store(
497-
config.arrangement_dictionary_compression,
498-
std::sync::atomic::Ordering::Relaxed,
499-
);
500-
501520
if let Some(offset) = config.expiration_offset {
502521
self.compute_state.apply_expiration_offset(offset);
503522
}
@@ -539,6 +558,13 @@ impl<'a> ActiveComputeState<'a> {
539558
// share the metrics.
540559
mz_metrics::update_dyncfg(&dyncfg_updates);
541560

561+
// Capture arrangement dictionary compression from the first configuration update we see
562+
// and hold it fixed for the replica's lifetime. Doing this here (rather than in
563+
// `apply_worker_config`, which also runs at instance creation, before any configuration
564+
// update, when only default values are present) ensures we latch the value the controller actually
565+
// synced, not the default.
566+
self.compute_state.capture_dictionary_compression();
567+
542568
self.compute_state.apply_worker_config();
543569
}
544570

src/row-spine/src/lib.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -744,13 +744,12 @@ pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize
744744
///
745745
/// The dictionary behavior is controlled by the `DICTIONARY_COMPRESSION` flag, which if disabled
746746
/// prevents the construction of codecs, which when absent simply cause the wrapper to behave as
747-
/// a no-op that fails to use any spare tags for common values. The flag is set once, when a
748-
/// replica is created (from compute's `InstanceConfig::arrangement_dictionary_compression`, itself
749-
/// captured from the `enable_arrangement_dictionary_compression_alpha` dyncfg at that moment), and is
750-
/// not changed for the life of the process; flipping the dyncfg only affects replicas created
751-
/// afterwards. Even with the flag fixed, a single replica can hold a mix of compressed and
752-
/// uncompressed containers — e.g. containers that never observed enough records to install a
753-
/// codec, or that were merged from uncompressed inputs.
747+
/// a no-op that fails to use any spare tags for common values. The flag is set once, when the
748+
/// replica captures the `enable_arrangement_dictionary_compression_alpha` dyncfg from the first
749+
/// configuration update it observes, and is not changed for the life of the process. Flipping the
750+
/// dyncfg only affects replicas created afterwards. Even with the flag fixed, a single replica can
751+
/// hold a mix of compressed and uncompressed containers, e.g. containers that never observed
752+
/// enough records to install a codec, or that were merged from uncompressed inputs.
754753
mod dictionary {
755754

756755
use differential_dataflow::trace::implementations::BatchContainer;

0 commit comments

Comments
 (0)