@@ -19,6 +19,7 @@ use mz_catalog::memory::objects::ClusterReplicaProcessStatus;
1919use mz_controller:: ControllerResponse ;
2020use mz_controller:: clusters:: { ClusterEvent , ClusterStatus } ;
2121use mz_ore:: instrument;
22+ use mz_ore:: cast:: CastFrom ;
2223use mz_ore:: now:: EpochMillis ;
2324use mz_ore:: option:: OptionExt ;
2425use mz_ore:: tracing:: OpenTelemetryContext ;
@@ -130,6 +131,17 @@ impl Coordinator {
130131 Message :: StorageUsagePrune ( expired) => {
131132 self . storage_usage_prune ( expired) . boxed_local ( ) . await ;
132133 }
134+ Message :: ArrangementSizesSchedule => {
135+ self . schedule_arrangement_sizes_collection ( )
136+ . boxed_local ( )
137+ . await ;
138+ }
139+ Message :: ArrangementSizesSnapshot => {
140+ self . arrangement_sizes_snapshot ( ) . boxed_local ( ) . await ;
141+ }
142+ Message :: ArrangementSizesPrune ( expired) => {
143+ self . arrangement_sizes_prune ( expired) . boxed_local ( ) . await ;
144+ }
133145 Message :: RetireExecute {
134146 otel_ctx,
135147 data,
@@ -340,6 +352,201 @@ impl Coordinator {
340352 } ) ;
341353 }
342354
355+ /// Schedules the next per-object arrangement sizes snapshot.
356+ ///
357+ /// Re-reads the interval dyncfg on every call so operators can retune
358+ /// cadence without a restart. Aligns to an `organization_id`-seeded offset
359+ /// within the interval so collections stay consistent across restarts and
360+ /// don't synchronize across environments.
361+ pub async fn schedule_arrangement_sizes_collection ( & self ) {
362+ let interval_duration = mz_adapter_types:: dyncfgs:: ARRANGEMENT_SIZE_COLLECTION_INTERVAL
363+ . get ( self . catalog ( ) . system_config ( ) . dyncfgs ( ) ) ;
364+
365+ const SEED_LEN : usize = 32 ;
366+ let mut seed = [ 0 ; SEED_LEN ] ;
367+ for ( i, byte) in self
368+ . catalog ( )
369+ . state ( )
370+ . config ( )
371+ . environment_id
372+ . organization_id ( )
373+ . as_bytes ( )
374+ . into_iter ( )
375+ . take ( SEED_LEN )
376+ . enumerate ( )
377+ {
378+ seed[ i] = * byte;
379+ }
380+ let interval_ms: EpochMillis = EpochMillis :: try_from ( interval_duration. as_millis ( ) )
381+ . expect ( "arrangement_size_collection_interval must fit into u64" ) ;
382+ // `rand::random_range` panics on an empty range.
383+ let interval_ms = interval_ms. max ( 1 ) ;
384+ let offset = rngs:: SmallRng :: from_seed ( seed) . random_range ( 0 ..interval_ms) ;
385+ let now_ts: EpochMillis = self . peek_local_write_ts ( ) . await . into ( ) ;
386+
387+ let previous_collection_ts = ( now_ts - ( now_ts % interval_ms) ) + offset;
388+ let next_collection_ts = if previous_collection_ts > now_ts {
389+ previous_collection_ts
390+ } else {
391+ previous_collection_ts + interval_ms
392+ } ;
393+ let sleep_for = Duration :: from_millis ( next_collection_ts - now_ts) ;
394+
395+ let internal_cmd_tx = self . internal_cmd_tx . clone ( ) ;
396+ task:: spawn ( || "arrangement_sizes_collection" , async move {
397+ tokio:: time:: sleep ( sleep_for) . await ;
398+ // Best-effort: if the coordinator is shutting down, just drop.
399+ let _ = internal_cmd_tx. send ( Message :: ArrangementSizesSnapshot ) ;
400+ } ) ;
401+ }
402+
403+ /// Snapshots the current contents of `mz_object_arrangement_sizes` and
404+ /// appends them to `mz_object_arrangement_size_history`, tagged with a
405+ /// shared `collection_timestamp`. Reschedules on completion.
406+ #[ mz_ore:: instrument( level = "debug" ) ]
407+ async fn arrangement_sizes_snapshot ( & mut self ) {
408+ // The catalog server is not writable in read-only mode.
409+ if self . controller . read_only ( ) {
410+ self . schedule_arrangement_sizes_collection ( ) . await ;
411+ return ;
412+ }
413+
414+ let collection_timer = self
415+ . metrics
416+ . arrangement_sizes_collection_time_seconds
417+ . start_timer ( ) ;
418+
419+ let live_item_id = self
420+ . catalog ( )
421+ . resolve_builtin_storage_collection (
422+ & mz_catalog:: builtin:: MZ_OBJECT_ARRANGEMENT_SIZES_UNIFIED ,
423+ ) ;
424+ let live_global_id = self . catalog . get_entry ( & live_item_id) . latest_global_id ( ) ;
425+ let history_item_id = self
426+ . catalog ( )
427+ . resolve_builtin_table ( & mz_catalog:: builtin:: MZ_OBJECT_ARRANGEMENT_SIZE_HISTORY ) ;
428+
429+ let read_ts = self . get_local_read_ts ( ) . await ;
430+ let snapshot = match self
431+ . controller
432+ . storage_collections
433+ . snapshot ( live_global_id, read_ts)
434+ . await
435+ {
436+ Ok ( s) => s,
437+ Err ( e) => {
438+ tracing:: warn!( "arrangement sizes snapshot failed: {e:?}" ) ;
439+ drop ( collection_timer) ;
440+ self . schedule_arrangement_sizes_collection ( ) . await ;
441+ return ;
442+ }
443+ } ;
444+
445+ // `collection_ts` is stamped after the snapshot so it's always >= the
446+ // state the rows describe, and monotone across restarts. The snapshot
447+ // read and this stamp aren't atomic, but the resulting skew is bounded
448+ // by snapshot latency and negligible at this cadence.
449+ let collection_ts: EpochMillis = self . get_local_write_ts ( ) . await . timestamp . into ( ) ;
450+ let collection_datum = Datum :: TimestampTz (
451+ mz_ore:: now:: to_datetime ( collection_ts)
452+ . try_into ( )
453+ . expect ( "collection_timestamp must fit into TimestampTz" ) ,
454+ ) ;
455+
456+ let mut consolidated = snapshot;
457+ differential_dataflow:: consolidation:: consolidate ( & mut consolidated) ;
458+
459+ // Column positions in `mz_object_arrangement_sizes`.
460+ const LIVE_COL_REPLICA_ID : usize = 0 ;
461+ const LIVE_COL_OBJECT_ID : usize = 1 ;
462+ const LIVE_COL_SIZE : usize = 2 ;
463+ const LIVE_COL_COUNT : usize = 3 ;
464+
465+ let mut skipped_malformed: u64 = 0 ;
466+ let mut skipped_null_size: u64 = 0 ;
467+ let updates: Vec < BuiltinTableUpdate > = consolidated
468+ . into_iter ( )
469+ . filter_map ( |( row, diff) | {
470+ if diff != 1 {
471+ return None ;
472+ }
473+ let datums = row. unpack ( ) ;
474+ // Surface schema drift via a warn log below rather than silently
475+ // skipping entire snapshots.
476+ if datums. len ( ) != LIVE_COL_COUNT {
477+ skipped_malformed += 1 ;
478+ return None ;
479+ }
480+ let replica_id = datums[ LIVE_COL_REPLICA_ID ] . unwrap_str ( ) ;
481+ let object_id = datums[ LIVE_COL_OBJECT_ID ] . unwrap_str ( ) ;
482+ let size_datum = datums[ LIVE_COL_SIZE ] ;
483+ // The history table's `size` is non-null; fabricating zero would
484+ // be misleading, so drop.
485+ if size_datum. is_null ( ) {
486+ skipped_null_size += 1 ;
487+ return None ;
488+ }
489+ let size = size_datum. unwrap_int64 ( ) ;
490+ let new_row = Row :: pack_slice ( & [
491+ Datum :: String ( replica_id) ,
492+ Datum :: String ( object_id) ,
493+ Datum :: Int64 ( size) ,
494+ collection_datum,
495+ ] ) ;
496+ Some ( BuiltinTableUpdate :: row ( history_item_id, new_row, Diff :: ONE ) )
497+ } )
498+ . collect ( ) ;
499+ if skipped_malformed > 0 {
500+ warn ! (
501+ "mz_object_arrangement_sizes schema drift: skipped {skipped_malformed} rows \
502+ with unexpected arity"
503+ ) ;
504+ }
505+ if skipped_null_size > 0 {
506+ tracing:: debug!( "skipped {skipped_null_size} live rows with null size" ) ;
507+ }
508+
509+ let row_count = updates. len ( ) ;
510+ // Captures snapshot + row construction. The async table-apply below
511+ // is captured separately by `mz_append_table_duration_seconds`.
512+ collection_timer. observe_duration ( ) ;
513+
514+ if !updates. is_empty ( ) {
515+ self . metrics
516+ . arrangement_sizes_rows_written
517+ . inc_by ( u64:: cast_from ( row_count) ) ;
518+ // TODO(arrangement-sizes): when the writeable-catalog-server plumbing
519+ // in https://github.com/MaterializeInc/materialize/pull/35436 lands,
520+ // append directly on `mz_catalog_server` instead of going through
521+ // the environmentd builtin-table-update path.
522+ let ( fut, _) = self . builtin_table_update ( ) . execute ( updates) . await ;
523+ let internal_cmd_tx = self . internal_cmd_tx . clone ( ) ;
524+ let task_span =
525+ info_span ! ( parent: None , "coord::arrangement_sizes_snapshot::table_updates" ) ;
526+ OpenTelemetryContext :: obtain ( ) . attach_as_parent_to ( & task_span) ;
527+ task:: spawn ( || "arrangement_sizes_snapshot_apply" , async move {
528+ fut. instrument ( task_span) . await ;
529+ if let Err ( e) = internal_cmd_tx. send ( Message :: ArrangementSizesSchedule ) {
530+ warn ! ( "internal_cmd_rx dropped before we could send: {e:?}" ) ;
531+ }
532+ } ) ;
533+ } else {
534+ self . schedule_arrangement_sizes_collection ( ) . await ;
535+ }
536+
537+ tracing:: debug!(
538+ "appended {row_count} rows to mz_object_arrangement_size_history at ts {collection_ts}"
539+ ) ;
540+ }
541+
542+ #[ mz_ore:: instrument( level = "debug" ) ]
543+ async fn arrangement_sizes_prune ( & mut self , expired : Vec < BuiltinTableUpdate > ) {
544+ let ( fut, _) = self . builtin_table_update ( ) . execute ( expired) . await ;
545+ task:: spawn ( || "arrangement_sizes_pruning_apply" , async move {
546+ fut. await ;
547+ } ) ;
548+ }
549+
343550 #[ mz_ore:: instrument( level = "debug" ) ]
344551 async fn message_command ( & mut self , cmd : Command ) {
345552 self . handle_command ( cmd) . await ;
0 commit comments