Skip to content

Commit 09a5f7d

Browse files
maksymar and IDX GitHub Automation authored
feat(DSM): add canister_log_retention_seconds metric (#9952)
## Summary - Add `canister_log_retention_seconds` histogram to `SchedulerMetrics` — wall-clock span between the oldest and newest records currently in a canister's log buffer. - Activity-gated observation at round finalization; cost scales with activity, not canister count. - Works on both log stores (branched on `LOG_MEMORY_STORE_FEATURE_ENABLED`), giving directly-comparable before/after-flip data on the same histogram. On the new path, `first_timestamp` is cached in a transient `Option<u64>` on `LogMemoryStore`, populated eagerly from every mutation site — no header change, no checkpoint migration. - Buckets: `decimal_buckets_with_zero(1, 6)` → 19 buckets, `[0, 10 s .. ~58 d]`. ## Stacked on Depends on #9948. Base is `maksym/canister-log-metrics`; retarget to `master` once #9948 lands. --------- Co-authored-by: IDX GitHub Automation <infra+github-automation@dfinity.org>
1 parent f48e8e2 commit 09a5f7d

7 files changed

Lines changed: 260 additions & 15 deletions

File tree

rs/execution_environment/src/scheduler.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,19 @@ impl Scheduler for SchedulerImpl {
14951495
} else {
14961496
old_log.delta_log_sizes()
14971497
};
1498+
// Observe retention from whichever log store is active,
1499+
// only for canisters that appended this round.
1500+
let retention = if LOG_MEMORY_STORE_FEATURE_ENABLED {
1501+
new_log
1502+
.has_delta_log_sizes()
1503+
.then(|| new_log.retention())
1504+
.flatten()
1505+
} else {
1506+
old_log
1507+
.has_delta_log_sizes()
1508+
.then(|| old_log.retention())
1509+
.flatten()
1510+
};
14981511
if new_log.has_delta_log_sizes() || old_log.has_delta_log_sizes() {
14991512
// Only clone state if delta log sizes are not empty.
15001513
let canister = Arc::make_mut(canister);
@@ -1511,6 +1524,11 @@ impl Scheduler for SchedulerImpl {
15111524
.canister_log_delta_memory_usage
15121525
.observe(size as f64);
15131526
}
1527+
if let Some(retention) = retention {
1528+
self.metrics
1529+
.canister_log_retention
1530+
.observe(retention.as_secs_f64());
1531+
}
15141532

15151533
// TODO(EXC-1124): Re-enable once the cycle balance check is fixed.
15161534
// cycles_out_sum += canister.system_state.queues().output_queue_cycles();

rs/execution_environment/src/scheduler/scheduler_metrics.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub(crate) const SCHEDULER_CORES_INVARIANT_BROKEN: &str = "scheduler_cores_invar
2525
pub struct SchedulerMetrics {
2626
pub(super) canister_age: Histogram,
2727
pub(super) canister_log_delta_memory_usage: Histogram,
28+
pub(super) canister_log_retention: Histogram,
2829
pub(super) canister_ingress_queue_latencies: Histogram,
2930
pub(super) compute_utilization_per_core: Histogram,
3031
pub(super) msg_execution_duration: Histogram,
@@ -91,6 +92,12 @@ impl SchedulerMetrics {
9192
// 1 KiB (2^10) .. 8 MiB (2^23), plus zero — 15 total buckets (0 + 14 powers).
9293
binary_buckets_with_zero(10, 23)
9394
),
95+
canister_log_retention: metrics_registry.histogram(
96+
"canister_log_retention_seconds",
97+
"Time span between the oldest and newest records in the canister log buffer, in seconds.",
98+
// 10 s .. 5×10⁶ s (~58 d), plus zero — 19 total buckets (0 + 18 powers).
99+
decimal_buckets_with_zero(1, 6),
100+
),
94101
canister_ingress_queue_latencies: metrics_registry.histogram(
95102
"scheduler_canister_ingress_queue_latencies_seconds",
96103
"Per-canister mean IC clock duration spent by messages in the ingress queue.",

rs/execution_environment/tests/canister_logging.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1794,6 +1794,39 @@ fn test_metric_canister_log_delta_memory_usage_bytes() {
17941794
);
17951795
}
17961796

1797+
#[test]
fn test_metric_canister_log_retention_seconds() {
    // The scheduler observes this histogram at round finalization, but only
    // for canisters that appended log records during the round. A sample is
    // the wall-clock span between the oldest and newest records held in the
    // buffer. Both log stores (old `CanisterLog` and new `LogMemoryStore`)
    // compute it the same way, so the assertions below hold regardless of
    // the feature flag.
    const METRIC: &str = "canister_log_retention_seconds";
    const TIME_ADVANCE: Duration = Duration::from_secs(60);

    let env = setup_env();
    let canister_id = create_and_install_canister(
        &env,
        CanisterSettingsArgsBuilder::new().build(),
        wat_canister()
            .update("test", wat_fn().debug_print(b"hello"))
            .build_wasm(),
    );

    // First append seeds the buffer so that the second observation spans a
    // non-zero interval.
    let _ = env.execute_ingress(canister_id, "test", vec![]);
    let before = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap();

    // Advance simulated time, then append a second record.
    env.advance_time(TIME_ADVANCE);
    let _ = env.execute_ingress(canister_id, "test", vec![]);
    let after = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap();

    // Exactly one new sample was recorded, and it covers at least the
    // simulated wall-clock gap.
    assert_eq!(after.count, before.count + 1);
    let observed = after.sum - before.sum;
    assert_le!(TIME_ADVANCE.as_secs_f64(), observed);
}
1829+
17971830
#[test]
17981831
fn test_metric_canister_log_resize_duration_seconds() {
17991832
// Observed at the resize call site in `CanisterManager::update_settings`

rs/replicated_state/src/canister_state/system_state/log_memory_store/mod.rs

Lines changed: 99 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use ic_validate_eq_derive::ValidateEq;
2020
use std::collections::VecDeque;
2121
use std::sync::Arc;
2222
use std::sync::OnceLock;
23+
use std::time::Duration;
2324

2425
/// Upper bound on stored delta-log sizes used for metrics.
2526
/// Limits memory growth, 10k covers expected per-round
@@ -71,6 +72,17 @@ pub struct LogMemoryStore {
7172
#[validate_eq(Ignore)]
7273
header_cache: OnceLock<Option<Header>>,
7374

75+
/// Cached timestamp of the oldest live record. Makes the retention
76+
/// metric's per-round observation (`max_timestamp − first_timestamp`)
77+
/// an O(1) field read instead of a cold page-map read at `data_head`.
78+
///
79+
/// Plain `Option<u64>` (not `OnceLock` like `header_cache`) because we
80+
/// populate eagerly from every mutation site; no lazy init under
81+
/// `&self` needed. Not persisted across checkpoints; rebuilt in
82+
/// `new_inner`.
83+
#[validate_eq(Ignore)]
84+
first_timestamp_cache: Option<u64>,
85+
7486
/// (!) No need to preserve across checkpoints.
7587
/// Tracks the size of each delta log appended during a round.
7688
/// Multiple logs can be appended in one round (e.g. heartbeat, timers, or message executions).
@@ -104,21 +116,33 @@ impl LogMemoryStore {
104116
maybe_page_map: Option<PageMap>,
105117
persistent_next_idx: u64,
106118
) -> Self {
107-
Self {
119+
let maybe_page_map = if feature_flag == FlagStatus::Enabled {
120+
maybe_page_map
121+
} else {
122+
None
123+
};
124+
let persistent_next_idx = if feature_flag == FlagStatus::Enabled {
125+
persistent_next_idx
126+
} else {
127+
0
128+
};
129+
// Rebuild the first-timestamp cache from the ring buffer so the
130+
// invariant holds immediately after `from_checkpoint`, without
131+
// waiting for the next mutation to populate it.
132+
let first_timestamp_cache = maybe_page_map
133+
.clone()
134+
.and_then(RingBuffer::load_checked)
135+
.and_then(|rb| rb.first_timestamp(&rb.get_header()));
136+
let store = Self {
108137
feature_flag,
109-
maybe_page_map: if feature_flag == FlagStatus::Enabled {
110-
maybe_page_map
111-
} else {
112-
None
113-
},
114-
persistent_next_idx: if feature_flag == FlagStatus::Enabled {
115-
persistent_next_idx
116-
} else {
117-
0
118-
},
138+
maybe_page_map,
139+
persistent_next_idx,
119140
header_cache: OnceLock::new(),
141+
first_timestamp_cache,
120142
delta_log_sizes: VecDeque::new(),
121-
}
143+
};
144+
debug_assert!(store.stats_ok());
145+
store
122146
}
123147

124148
/// Provides access to the underlying `PageMap`.
@@ -148,15 +172,21 @@ impl LogMemoryStore {
148172
self.save_ring_buffer(ring_buffer);
149173
} else {
150174
self.header_cache = OnceLock::new();
175+
self.first_timestamp_cache = None;
176+
debug_assert!(self.stats_ok());
151177
}
152178
}
153179

154-
/// Update page_map, header_cache and persistent_next_idx.
180+
/// Update page_map, header_cache, first_timestamp_cache and persistent_next_idx.
155181
fn save_ring_buffer(&mut self, ring_buffer: RingBuffer) {
182+
let header = ring_buffer.get_header();
183+
let first_timestamp = ring_buffer.first_timestamp(&header);
156184
self.maybe_page_map = Some(ring_buffer.to_page_map());
157-
self.header_cache = OnceLock::from(Some(ring_buffer.get_header()));
185+
self.header_cache = OnceLock::from(Some(header));
186+
self.first_timestamp_cache = first_timestamp;
158187
// Must come after header_cache update, since next_idx() reads from it.
159188
self.persistent_next_idx = self.next_idx();
189+
debug_assert!(self.stats_ok());
160190
}
161191

162192
/// Deallocates underlying memory.
@@ -165,6 +195,8 @@ impl LogMemoryStore {
165195
self.persistent_next_idx = self.next_idx();
166196
self.maybe_page_map = None;
167197
self.header_cache = OnceLock::new();
198+
self.first_timestamp_cache = None;
199+
debug_assert!(self.stats_ok());
168200
}
169201

170202
/// Loads the ring buffer from the page map.
@@ -174,13 +206,63 @@ impl LogMemoryStore {
174206
.and_then(RingBuffer::load_checked)
175207
}
176208

209+
/// Invariant: populated caches must match what a fresh read of the ring
210+
/// buffer would return. Intended to be called only via `debug_assert!`;
211+
/// the `cfg!` guard keeps the body a no-op in release regardless of
212+
/// caller, so it's fine for the check to be expensive.
213+
fn stats_ok(&self) -> bool {
214+
if !cfg!(debug_assertions) {
215+
return true;
216+
}
217+
let ring_buffer = self.load_ring_buffer();
218+
let actual_header = ring_buffer.as_ref().map(|rb| rb.get_header());
219+
// `header_cache` is lazy: if populated, must match; if empty, skip.
220+
if let Some(cached) = self.header_cache.get()
221+
&& *cached != actual_header
222+
{
223+
return false;
224+
}
225+
// `first_timestamp_cache` is kept eagerly in sync, so must always match.
226+
let actual_first_ts = ring_buffer
227+
.as_ref()
228+
.zip(actual_header.as_ref())
229+
.and_then(|(rb, h)| rb.first_timestamp(h));
230+
if self.first_timestamp_cache != actual_first_ts {
231+
return false;
232+
}
233+
true
234+
}
235+
177236
/// Returns the ring buffer header.
178237
fn get_header(&self) -> Option<Header> {
179238
*self
180239
.header_cache
181240
.get_or_init(|| self.load_ring_buffer().map(|rb| rb.get_header()))
182241
}
183242

243+
/// Returns the timestamp of the most recently appended record, or `None`
244+
/// if the buffer is empty. O(1) via the cached header.
245+
pub fn max_timestamp(&self) -> Option<u64> {
246+
let header = self.get_header()?;
247+
(header.data_size.get() > 0).then_some(header.max_timestamp)
248+
}
249+
250+
/// Returns the timestamp of the oldest live record, or `None` if the
251+
/// buffer is empty. O(1) field read — the cache is kept in sync with
252+
/// the ring buffer by every mutation.
253+
pub fn first_timestamp(&self) -> Option<u64> {
254+
self.first_timestamp_cache
255+
}
256+
257+
/// Returns the time span between the oldest and newest records, or
258+
/// `None` if the buffer is empty. Returns `Duration::ZERO` when both
259+
/// timestamps are equal (single record).
260+
pub fn retention(&self) -> Option<Duration> {
261+
let max = self.max_timestamp()?;
262+
let first = self.first_timestamp()?;
263+
Some(Duration::from_nanos(max.saturating_sub(first)))
264+
}
265+
184266
/// Returns the total allocated memory.
185267
pub fn memory_usage(&self) -> usize {
186268
self.total_virtual_memory_usage()
@@ -374,13 +456,15 @@ impl Clone for LogMemoryStore {
374456
Some(val) => OnceLock::from(*val),
375457
None => OnceLock::new(),
376458
},
459+
first_timestamp_cache: self.first_timestamp_cache,
377460
}
378461
}
379462
}
380463

381464
impl PartialEq for LogMemoryStore {
382465
fn eq(&self, other: &Self) -> bool {
383-
// header_cache is a transient cache and should not be compared.
466+
// header_cache and first_timestamp_cache are transient caches and
467+
// should not be compared.
384468
self.feature_flag == other.feature_flag
385469
&& self.maybe_page_map == other.maybe_page_map
386470
&& self.persistent_next_idx == other.persistent_next_idx

rs/replicated_state/src/canister_state/system_state/log_memory_store/ring_buffer.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,21 @@ impl RingBuffer {
8484
self.io.load_header()
8585
}
8686

87+
/// Returns the timestamp of the oldest live record, or `None` if the
88+
/// buffer is empty. Takes an already-loaded `header` to avoid a
89+
/// redundant page-map read when the caller has one in hand.
90+
///
91+
/// Reads the record header at `data_head` — the same access pattern used
92+
/// by `load_index_table` to reconstruct its `front` entry.
93+
pub fn first_timestamp(&self, header: &Header) -> Option<u64> {
94+
if header.data_size.get() == 0 {
95+
return None;
96+
}
97+
self.io
98+
.load_record_without_content(header, header.data_head)
99+
.map(|r| r.timestamp)
100+
}
101+
87102
/// Returns the data capacity of the ring buffer.
88103
#[cfg(test)]
89104
pub fn byte_capacity(&self) -> usize {

rs/replicated_state/src/canister_state/system_state/log_memory_store/tests/log_memory_store.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,56 @@ fn initialization_defaults() {
6868
assert_eq!(s.next_idx(), 0);
6969
}
7070

71+
#[test]
fn test_retention_across_lifecycle() {
    use std::time::Duration;

    // Shorthand: all three retention views report empty.
    let assert_empty = |s: &LogMemoryStore| {
        assert_eq!(s.first_timestamp(), None);
        assert_eq!(s.max_timestamp(), None);
        assert_eq!(s.retention(), None);
    };

    // A brand-new store has no header and therefore no retention.
    let mut s = LogMemoryStore::new(TEST_LOG_MEMORY_STORE_FEATURE);
    assert_empty(&s);

    // Allocating memory alone creates no records.
    s.resize_for_testing(TEST_LOG_MEMORY_LIMIT);
    assert_empty(&s);

    // With exactly one record the span collapses to zero.
    let mut delta = CanisterLog::default_delta();
    delta.add_record(1_000_000_000, b"a".to_vec());
    s.append_delta_log(&mut delta);
    assert_eq!(s.first_timestamp(), Some(1_000_000_000));
    assert_eq!(s.max_timestamp(), Some(1_000_000_000));
    assert_eq!(s.retention(), Some(Duration::ZERO));

    // Additional records stretch retention to oldest..newest.
    let mut delta = CanisterLog::new_delta_with_next_index(s.next_idx(), TEST_LOG_MEMORY_LIMIT);
    delta.add_record(1_500_000_000, b"b".to_vec()); // t = 1.5 s
    delta.add_record(61_000_000_000, b"c".to_vec()); // t = 61 s
    s.append_delta_log(&mut delta);
    assert_eq!(s.first_timestamp(), Some(1_000_000_000));
    assert_eq!(s.max_timestamp(), Some(61_000_000_000));
    assert_eq!(s.retention(), Some(Duration::from_secs(60)));

    // Clearing empties the buffer and both timestamp views.
    s.clear();
    assert_empty(&s);

    // Re-append, then deallocate — both caches must empty out again.
    let mut delta = CanisterLog::new_delta_with_next_index(s.next_idx(), TEST_LOG_MEMORY_LIMIT);
    delta.add_record(2_000_000_000, b"d".to_vec());
    s.append_delta_log(&mut delta);
    assert!(s.retention().is_some());
    s.deallocate();
    assert_empty(&s);
}
120+
71121
#[test]
72122
fn test_appending_to_uninitialized_store_is_no_op() {
73123
let mut s = LogMemoryStore::new(TEST_LOG_MEMORY_STORE_FEATURE);

0 commit comments

Comments
 (0)