Skip to content

Commit c505fd7

Browse files
claudeantiguru
authored andcommitted
compute: capture arrangement_dictionary_compression on the replica
`arrangement_dictionary_compression` was passed via `CreateInstance`, baked into `InstanceConfig` from the controller's dyncfg at replica creation. After an environmentd restart the controller re-derives the value, so if the flag was flipped in between, the reconciled `CreateInstance` carried a different value than the running replica was created with -- a difference that risks an unnecessary replica restart. Move the value out of `CreateInstance` entirely. The replica now captures the first value it observes from a configuration update (in `handle_update_configuration`) and holds it fixed for its lifetime, mirroring it into the process-global `DICTIONARY_COMPRESSION`. The reduced command history always orders `UpdateConfiguration` before any `CreateDataflow`, so the value is latched before any arrangement is built. The flag is also declared `ParameterScope::Replica` so it can be overridden per-replica. Drops `arrangement_dictionary_compression` from `InstanceConfig` and `ReplicaConfig` (and the controller capture / specialize_command path), and the clusterd test driver now pushes it as a dyncfg instead. Fixes CLU-135. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01LBErWUhZR4sBuEfGStb5FP
1 parent d46a894 commit c505fd7

8 files changed

Lines changed: 79 additions & 56 deletions

File tree

src/clusterd-test-driver/src/driver.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use mz_compute_client::protocol::command::{
1616
};
1717
use mz_compute_client::protocol::response::{FrontiersResponse, PeekResponse};
1818
use mz_compute_types::dataflows::DataflowDescription;
19-
use mz_compute_types::dyncfgs::ENABLE_PEEK_RESPONSE_STASH;
19+
use mz_compute_types::dyncfgs::{
20+
ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA, ENABLE_PEEK_RESPONSE_STASH,
21+
};
2022
use mz_compute_types::plan::render_plan::RenderPlan;
2123
use mz_dyncfg::ConfigUpdates;
2224
use mz_expr::{MapFilterProject, RowSetFinishing};
@@ -72,11 +74,13 @@ impl Driver {
7274
/// Sends `CreateInstance`, opening the compute instance (and the reconciliation
7375
/// window).
7476
///
75-
/// `expiration_offset` and `arrangement_dictionary_compression` are the
76-
/// caller-settable [`InstanceConfig`] knobs; `logging` is left at its default
77-
/// (introspection logging off — enabling it safely needs `index_logs` wiring)
78-
/// and `peek_stash_persist_location` is necessarily the host's, since the driver
79-
/// hosts persist.
77+
/// `expiration_offset` is a caller-settable [`InstanceConfig`] knob; `logging` is left at its
78+
/// default (introspection logging off — enabling it safely needs `index_logs` wiring) and
79+
/// `peek_stash_persist_location` is necessarily the host's, since the driver hosts persist.
80+
///
81+
/// `arrangement_dictionary_compression` is no longer carried in `CreateInstance`; the replica
82+
/// captures it from the first configuration update it sees, so we push it as the
83+
/// `enable_arrangement_dictionary_compression_alpha` dyncfg in the `UpdateConfiguration` below.
8084
///
8185
/// This also force-disables `ENABLE_PEEK_RESPONSE_STASH`: the driver reads peek
8286
/// results inline, so a stashed peek would break [`Self::peek`]/`count`. It is
@@ -91,10 +95,13 @@ impl Driver {
9195
logging: Default::default(),
9296
expiration_offset,
9397
peek_stash_persist_location: self.host.location().clone(),
94-
arrangement_dictionary_compression,
9598
})))?;
9699
let mut dyncfg_updates = ConfigUpdates::default();
97100
dyncfg_updates.add(&ENABLE_PEEK_RESPONSE_STASH, false);
101+
dyncfg_updates.add(
102+
&ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA,
103+
arrangement_dictionary_compression,
104+
);
98105
self.send(ComputeCommand::UpdateConfiguration(Box::new(
99106
ComputeParameters {
100107
dyncfg_updates,

src/compute-client/src/controller.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ use mz_cluster_client::{ReplicaId, WallclockLagFn};
4343
use mz_compute_types::ComputeInstanceId;
4444
use mz_compute_types::config::ComputeReplicaConfig;
4545
use mz_compute_types::dataflows::DataflowDescription;
46-
use mz_compute_types::dyncfgs::{
47-
COMPUTE_REPLICA_EXPIRATION_OFFSET, ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA,
48-
};
46+
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
4947
use mz_dyncfg::{ConfigSet, ConfigUpdates};
5048
use mz_expr::RowSetFinishing;
5149
use mz_expr::row::RowCollection;
@@ -715,12 +713,10 @@ impl ComputeController {
715713

716714
let expiration_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.dyncfg);
717715

718-
// Capture dictionary compression once, at replica creation, and hold it fixed for the
719-
// replica's lifetime (see `InstanceConfig::arrangement_dictionary_compression`). This is
720-
// why a later flip of the flag only affects replicas created afterwards.
721-
let arrangement_dictionary_compression =
722-
ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA.get(&self.dyncfg);
723-
716+
// Note: arrangement dictionary compression is no longer captured here and is not part of
717+
// `CreateInstance`. The replica captures the first value it sees from a configuration
718+
// update instead (see `enable_arrangement_dictionary_compression_alpha`), which avoids
719+
// forcing a replica restart when the flag is flipped across an `environmentd` restart.
724720
let replica_config = ReplicaConfig {
725721
location,
726722
logging: LoggingConfig {
@@ -731,7 +727,6 @@ impl ComputeController {
731727
},
732728
grpc_client: self.config.grpc_client.clone(),
733729
expiration_offset: (!expiration_offset.is_zero()).then_some(expiration_offset),
734-
arrangement_dictionary_compression,
735730
};
736731

737732
let instance = self.instance_mut(instance_id).expect("validated");

src/compute-client/src/controller/instance.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,7 +992,6 @@ impl Instance {
992992
// `ReplicaTask::specialize_command`.
993993
logging: Default::default(),
994994
expiration_offset: Default::default(),
995-
arrangement_dictionary_compression: Default::default(),
996995
};
997996

998997
self.send(ComputeCommand::CreateInstance(Box::new(instance_config)));

src/compute-client/src/controller/replica.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@ pub(super) struct ReplicaConfig {
4949
pub grpc_client: GrpcClientParameters,
5050
/// The offset to use for replica expiration, if any.
5151
pub expiration_offset: Option<Duration>,
52-
/// Whether arrangements on this replica use dictionary compression, captured at creation.
53-
pub arrangement_dictionary_compression: bool,
5452
}
5553

5654
/// A client for a replica task.
@@ -284,8 +282,6 @@ impl ReplicaTask {
284282
if ENABLE_COMPUTE_REPLICA_EXPIRATION.get(&self.dyncfg) {
285283
config.expiration_offset = self.config.expiration_offset;
286284
}
287-
config.arrangement_dictionary_compression =
288-
self.config.arrangement_dictionary_compression;
289285
}
290286
_ => {}
291287
}

src/compute-client/src/protocol/command.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -275,12 +275,6 @@ pub struct InstanceConfig {
275275
pub expiration_offset: Option<Duration>,
276276
/// The persist location where we can stash large peek results.
277277
pub peek_stash_persist_location: PersistLocation,
278-
/// Whether arrangements created by this replica use dictionary compression.
279-
///
280-
/// Captured from `enable_arrangement_dictionary_compression_alpha` when the replica is created and
281-
/// held fixed for the replica's lifetime, so flipping the flag only affects replicas created
282-
/// afterwards rather than retroactively changing arrangements across the environment.
283-
pub arrangement_dictionary_compression: bool,
284278
}
285279

286280
impl InstanceConfig {
@@ -292,23 +286,20 @@ impl InstanceConfig {
292286
/// forcing replica restarts. However, it also means that replicas will only pick up the new
293287
/// value after a restart.
294288
///
295-
/// Dictionary compression is intentionally excluded from this check: it is captured at replica
296-
/// creation, so a change is always compatible and a running replica keeps the value it was
297-
/// created with (a new value is only picked up by replicas created afterwards).
289+
/// Note that arrangement dictionary compression is no longer part of `InstanceConfig`: the
290+
/// replica captures it from the first configuration update it sees, so a flip of the flag can
291+
/// never make an otherwise-reconcilable `CreateInstance` incompatible and force a restart.
298292
pub fn compatible_with(&self, other: &InstanceConfig) -> bool {
299293
// Destructure to protect against adding fields in the future.
300294
let InstanceConfig {
301295
logging: self_logging,
302296
expiration_offset: self_offset,
303297
peek_stash_persist_location: self_peek_stash_persist_location,
304-
// Captured at replica creation; intentionally not part of compatibility (see above).
305-
arrangement_dictionary_compression: _,
306298
} = self;
307299
let InstanceConfig {
308300
logging: other_logging,
309301
expiration_offset: other_offset,
310302
peek_stash_persist_location: other_peek_stash_persist_location,
311-
arrangement_dictionary_compression: _,
312303
} = other;
313304

314305
// Logging is compatible if exactly the same.

src/compute-types/src/dyncfgs.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,11 +393,17 @@ pub const COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK: Config<Duration> = Config
393393
///
394394
/// Disposition: added 2026-06-09, default off; solicit feedback for one month
395395
/// and remove in the absence of a positive response.
396+
///
397+
/// Declared [`ParameterScope::Replica`] so the value can be overridden per-replica. The replica
398+
/// captures the first value it observes from a configuration update and holds it fixed for its
399+
/// lifetime (the value is no longer carried in `CreateInstance`); a later flip only affects
400+
/// replicas that start afterwards.
396401
pub const ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA: Config<bool> = Config::new(
397402
"enable_arrangement_dictionary_compression_alpha",
398403
false,
399404
"Enable arrangement dictionary compression (alpha; not yet production-ready).",
400-
);
405+
)
406+
.scoped(ParameterScope::Replica);
401407

402408
/// Whether to enable the peek response stash, for sending back large peek
403409
/// responses. The response stash will only be used for results that exceed

src/compute/src/compute_state.rs

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ use mz_compute_client::protocol::response::{
2929
};
3030
use mz_compute_types::dataflows::DataflowDescription;
3131
use mz_compute_types::dyncfgs::{
32-
ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
33-
PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
32+
ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA, ENABLE_PEEK_RESPONSE_STASH,
33+
PEEK_RESPONSE_STASH_BATCH_MAX_RUNS, PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE,
34+
PEEK_STASH_NUM_BATCHES,
3435
};
3536
use mz_compute_types::plan::render_plan::RenderPlan;
3637
use mz_dyncfg::ConfigSet;
@@ -171,6 +172,15 @@ pub struct ComputeState {
171172

172173
/// The storage worker forwards its introspection logs to the compute worker.
173174
pub storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
175+
176+
/// Arrangement dictionary compression, captured from the first configuration update this
177+
/// replica observes and then held fixed for its lifetime.
178+
///
179+
/// `None` until the first configuration update is applied. The captured value is mirrored into
180+
/// the process-global `mz_row_spine::DICTIONARY_COMPRESSION`. We deliberately capture it from
181+
/// configuration updates rather than from `CreateInstance` so that flipping the flag across an
182+
/// `environmentd` restart does not force a replica restart (see CLU-135).
183+
arrangement_dictionary_compression: Option<bool>,
174184
}
175185

176186
impl ComputeState {
@@ -212,6 +222,7 @@ impl ComputeState {
212222
init_system_time: mz_ore::now::SYSTEM_TIME(),
213223
replica_expiration: Antichain::default(),
214224
storage_log_reader,
225+
arrangement_dictionary_compression: None,
215226
}
216227
}
217228

@@ -309,9 +320,10 @@ impl ComputeState {
309320
);
310321

311322
// NB: arrangement dictionary compression is deliberately NOT applied here. Unlike the
312-
// settings above, it is captured once at replica creation (see `handle_create_instance`
313-
// and `InstanceConfig::arrangement_dictionary_compression`) and held fixed, so that
314-
// flipping the flag does not retroactively change arrangements on existing replicas.
323+
// settings above, it is captured once from the first configuration update this replica
324+
// sees (see `capture_dictionary_compression`, called from `handle_update_configuration`)
325+
// and held fixed, so that flipping the flag does not retroactively change arrangements on
326+
// existing replicas.
315327

316328
// Apply column-paged-batcher configuration. Routes through
317329
// `apply_tiered_config`, which reuses a process-wide `TieredPolicy`
@@ -375,6 +387,23 @@ impl ComputeState {
375387
}
376388
}
377389

390+
/// Capture arrangement dictionary compression from the current `worker_config`, the first
391+
/// time this is called. Subsequent calls are no-ops, so the value this replica first observes
392+
/// from a configuration update is held fixed for its lifetime.
393+
///
394+
/// The captured value is mirrored into the process-global `mz_row_spine::DICTIONARY_COMPRESSION`.
395+
/// `DICTIONARY_COMPRESSION` is process-global and a replica process hosts a single instance, so
396+
/// this single store covers all of the replica's arrangements.
397+
fn capture_dictionary_compression(&mut self) {
398+
if self.arrangement_dictionary_compression.is_none() {
399+
let enabled = ENABLE_ARRANGEMENT_DICTIONARY_COMPRESSION_ALPHA.get(&self.worker_config);
400+
self.arrangement_dictionary_compression = Some(enabled);
401+
mz_row_spine::DICTIONARY_COMPRESSION
402+
.store(enabled, std::sync::atomic::Ordering::Relaxed);
403+
info!(enabled, "captured arrangement dictionary compression");
404+
}
405+
}
406+
378407
/// Apply the provided replica expiration `offset` by converting it to a frontier relative to
379408
/// the replica's initialization system time.
380409
///
@@ -480,15 +509,9 @@ impl<'a> ActiveComputeState<'a> {
480509
// Ensure the state is consistent with the config before we initialize anything.
481510
self.compute_state.apply_worker_config();
482511

483-
// Apply dictionary compression exactly once, here at instance creation, from the value the
484-
// controller captured when the replica was created. We deliberately do NOT re-apply it on
485-
// `handle_update_configuration`, so flipping the flag does not retroactively change this
486-
// replica's arrangements. `DICTIONARY_COMPRESSION` is process-global and a replica process
487-
// hosts a single instance, so this single store covers all of the replica's arrangements.
488-
mz_row_spine::DICTIONARY_COMPRESSION.store(
489-
config.arrangement_dictionary_compression,
490-
std::sync::atomic::Ordering::Relaxed,
491-
);
512+
// Arrangement dictionary compression is captured from the first configuration update this
513+
// replica sees (in `handle_update_configuration`), not from `CreateInstance`, so flipping
514+
// the flag across an `environmentd` restart cannot force a replica restart (see CLU-135).
492515

493516
if let Some(offset) = config.expiration_offset {
494517
self.compute_state.apply_expiration_offset(offset);
@@ -531,6 +554,13 @@ impl<'a> ActiveComputeState<'a> {
531554
// share the metrics.
532555
mz_metrics::update_dyncfg(&dyncfg_updates);
533556

557+
// Capture arrangement dictionary compression from the first configuration update we see
558+
// and hold it fixed for the replica's lifetime. Doing this here (rather than in
559+
// `apply_worker_config`, which also runs at instance creation before any configuration
560+
// update with only default values) ensures we latch the value the controller actually
561+
// synced, not the default.
562+
self.compute_state.capture_dictionary_compression();
563+
534564
self.compute_state.apply_worker_config();
535565
}
536566

src/row-spine/src/lib.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -744,13 +744,12 @@ pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize
744744
///
745745
/// The dictionary behavior is controlled by the `DICTIONARY_COMPRESSION` flag, which if disabled
746746
/// prevents the construction of codecs, which when absent simply cause the wrapper to behave as
747-
/// a no-op that fails to use any spare tags for common values. The flag is set once, when a
748-
/// replica is created (from compute's `InstanceConfig::arrangement_dictionary_compression`, itself
749-
/// captured from the `enable_arrangement_dictionary_compression_alpha` dyncfg at that moment), and is
750-
/// not changed for the life of the process; flipping the dyncfg only affects replicas created
751-
/// afterwards. Even with the flag fixed, a single replica can hold a mix of compressed and
752-
/// uncompressed containers — e.g. containers that never observed enough records to install a
753-
/// codec, or that were merged from uncompressed inputs.
747+
/// a no-op that fails to use any spare tags for common values. The flag is set once, when the
748+
/// replica captures the `enable_arrangement_dictionary_compression_alpha` dyncfg from the first
749+
/// configuration update it observes, and is not changed for the life of the process; flipping the
750+
/// dyncfg only affects replicas created afterwards. Even with the flag fixed, a single replica can
751+
/// hold a mix of compressed and uncompressed containers — e.g. containers that never observed
752+
/// enough records to install a codec, or that were merged from uncompressed inputs.
754753
mod dictionary {
755754

756755
use differential_dataflow::trace::implementations::BatchContainer;

0 commit comments

Comments
 (0)