diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 2ad1102..a0bd0a5 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,6 +6,8 @@ ## Upgrading +- The `ChronoError` variant has been removed from the `ErrorKind` enum. This is a breaking change for code that pattern-matches on `ErrorKind`. The variant had no remaining constructors; the time-delta arithmetic that previously surfaced it is now performed inside the new `WallClockTimer`, and resampling-interval misconfiguration is reported as `InvalidConfig`. + - `LogicalMeterConfig` instances can't be created directly anymore, and need to be created using the `LogicalMeterConfig::new` method. This helps avoid future breaking changes, as we add more config parameters. - Formula streaming methods in the `LogicalMeterHandle` no longer take metric as a function parameter, but expect a generic argument. For example: @@ -36,6 +38,8 @@ - `power()` — a `Formula` for the pool's aggregated power. - `power_bounds()` — a `broadcast::Receiver>>` tracking the pool's power bounds. +- The logical meter now survives NTP-style wall-clock jumps. A new internal `WallClockTimer` schedules resampler ticks against the wall clock while sleeping on tokio's monotonic clock; when the two diverge by more than one interval in either direction, the timer realigns and the actor rebuilds its inner resamplers against the new clock frame. Subscribers see one `None` sample at the resync tick (preserving the every-interval cadence) and real values resume on the next tick. Note: a backward jump produces a sample timestamped in the past relative to the previous one — consumers that assume monotonically increasing `Sample` timestamps must handle this. + ## Bug Fixes diff --git a/src/bounds.rs b/src/bounds.rs index f0cf8d5..183b4f6 100644 --- a/src/bounds.rs +++ b/src/bounds.rs @@ -380,8 +380,8 @@ mod tests { /// single endpoint merge into one. 
#[test] fn squash_merges_touching_endpoints() { - let a = vec![Bounds::new(Some(1.0), Some(5.0))]; - let b = vec![Bounds::new(Some(5.0), Some(10.0))]; + let a = [Bounds::new(Some(1.0), Some(5.0))]; + let b = [Bounds::new(Some(5.0), Some(10.0))]; // `intersect_bounds_sets` runs the pairwise intersect through squash. let result = intersect_bounds_sets( &[Bounds::new(Some(0.0), Some(20.0))], diff --git a/src/client/test_utils.rs b/src/client/test_utils.rs index 08f9ac4..e9d4741 100644 --- a/src/client/test_utils.rs +++ b/src/client/test_utils.rs @@ -3,11 +3,15 @@ //! A mock implementation of the MicrogridApiClient for testing. +mod tokio_synced_clock; +pub use tokio_synced_clock::TokioSyncedClock; + use std::{sync::Arc, time::SystemTime}; use tokio_stream::wrappers::ReceiverStream; use tonic::Response; +use crate::wall_clock_timer::Clock as _; use crate::{ client::proto::{ common::{ @@ -43,18 +47,28 @@ use super::MicrogridApiClient; pub struct MockMicrogridApiClient { pub components: Vec>, pub connections: Vec, + /// Shared clock used for every emitted `sample_time`. Tests that want + /// to inject wall-clock jumps construct their own [`TokioSyncedClock`], + /// share a clone with [`LogicalMeterActor`], and pass another in via + /// [`MockMicrogridApiClient::new_with_clock`]. + clock: TokioSyncedClock, } +/// One row per emitted telemetry frame: `(power, reactive_power, voltage, +/// current)`. Each field is independently optional so individual metrics +/// can be omitted from a frame. +pub type MockMetricRow = ( + Option, + Option, + Option, + Option, +); + #[derive(Default, Debug, Clone)] pub struct MockComponent { pub component: ElectricalComponent, pub children: Vec, - pub metrics: Vec<( - Option, - Option, - Option, - Option, - )>, + pub metrics: Vec, /// Overrides the state code reported in each telemetry sample. `None` /// defaults to `Ready`. 
state_code: Option, @@ -63,8 +77,6 @@ pub struct MockComponent { /// prevents the client actor from reconnecting and replaying the same /// data. Useful for testing missing-data timeouts. silence_after_metrics: bool, - start_ts: Option, - start_instant: Option, } impl MockComponent { @@ -166,7 +178,7 @@ impl MockComponent { if self.component.category == ElectricalComponentCategory::Unspecified as i32 { panic!("Cannot add children to a hidden load component"); } - self.children.extend(children.into_iter()); + self.children.extend(children); self } @@ -240,58 +252,56 @@ impl MockComponent { self.silence_after_metrics = true; self } - - pub fn with_start_times(mut self, ts: SystemTime, instant: tokio::time::Instant) -> Self { - self.start_ts = Some(ts); - self.start_instant = Some(instant); - self - } } impl MockMicrogridApiClient { - /// Creates a new `MockMicrogridApiClient` with default successful responses. + /// Creates a new `MockMicrogridApiClient` with an internally-owned + /// [`TokioSyncedClock`] anchored to the next whole-second boundary, so + /// telemetry timestamps line up with the resampler's interval boundaries + /// and tests get reproducible resampled values. pub fn new(graph: MockComponent) -> Self { + let since_epoch = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default(); + let next_sec_secs = since_epoch.as_secs() + 1; + + // The anchor is the wall-clock value `TokioSyncedClock` will report + // at the current tokio instant; it doesn't have to match real time, + // so there's no need to sleep until the boundary actually arrives. + let anchor = chrono::DateTime::::from_timestamp(next_sec_secs as i64, 0) + .unwrap_or_else(chrono::Utc::now); + Self::new_with_clock(graph, TokioSyncedClock::with_wall_anchor(anchor)) + } + + /// Returns a clone of the clock driving telemetry timestamps. 
Pass it + /// to [`LogicalMeterHandle::try_new_with_clock`] so the resampler and + /// the mock observe the same wall-clock value at every tokio instant. + pub fn clock(&self) -> TokioSyncedClock { + self.clock.clone() + } + + /// Creates a `MockMicrogridApiClient` whose telemetry timestamps come + /// from the given clock. Share a clone with [`LogicalMeterActor`] to + /// simulate whole-machine NTP jumps that both sides observe. + pub fn new_with_clock(graph: MockComponent, clock: TokioSyncedClock) -> Self { let mut this_client = Self { components: vec![], connections: vec![], + clock, }; - let now = SystemTime::now(); - - let since_epoch = now - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default(); - let next_sec = - SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(since_epoch.as_secs() + 1); - - // Sleep until the start of the next second to align telemetry - // timestamps with the resampler's clock, so that we get reproducible - // values from the logical meter. - std::thread::sleep(next_sec.duration_since(now).unwrap_or_default()); - - let now = next_sec; - let now_instant = tokio::time::Instant::now(); - - fn traverse( - node: &MockComponent, - client: &mut MockMicrogridApiClient, - now: &SystemTime, - now_instant: &tokio::time::Instant, - ) { - client.components.push(Arc::new( - node.clone() - .with_start_times(now.clone(), now_instant.clone()), - )); + fn traverse(node: &MockComponent, client: &mut MockMicrogridApiClient) { + client.components.push(Arc::new(node.clone())); for child in &node.children { client.connections.push(ElectricalComponentConnection { source_electrical_component_id: node.component.id, destination_electrical_component_id: child.component.id, operational_lifetime: None, }); - traverse(child, client, &now, &now_instant); + traverse(child, client); } } - traverse(&Arc::new(graph), &mut this_client, &now, &now_instant); + traverse(&graph, &mut this_client); this_client } @@ -351,124 +361,134 @@ impl MicrogridApiClient for 
MockMicrogridApiClient { .find(|c| c.component.id == comp_id) .cloned(); - // TODO: use wall time for next ts, if that's the issue. - if let Some(component) = component { - if !component.metrics.is_empty() { - let metrics = component.metrics.clone(); - let state_code = component - .state_code - .unwrap_or(ElectricalComponentStateCode::Ready); - let silence_after_metrics = component.silence_after_metrics; - tokio::spawn(async move { - let dur = std::time::Duration::from_millis(200); - let mut interval = tokio::time::interval(dur); - let mut next_ts = component.start_ts.unwrap() - + (tokio::time::Instant::now() - component.start_instant.unwrap()); - - for metrics in metrics.iter() { - interval.tick().await; - next_ts += dur; - let duration_since_epoch = - next_ts.duration_since(SystemTime::UNIX_EPOCH).unwrap(); - let ts = Some(protobuf::Timestamp { - seconds: duration_since_epoch.as_secs() as i64, - nanos: duration_since_epoch.subsec_nanos() as i32, - }); - let mut metric_samples = vec![]; - if let Some(power) = metrics.0 { - metric_samples.push(MetricSample { - sample_time: ts.clone(), - metric: Metric::AcPowerActive as i32, - value: Some(MetricValueVariant { - metric_value_variant: Some( - metric_value_variant::MetricValueVariant::SimpleMetric( - SimpleMetricValue { - value: power.as_watts(), - }, - ), + if let Some(component) = component + && !component.metrics.is_empty() + { + let metrics = component.metrics.clone(); + let state_code = component + .state_code + .unwrap_or(ElectricalComponentStateCode::Ready); + let silence_after_metrics = component.silence_after_metrics; + let clock = self.clock.clone(); + tokio::spawn(async move { + let dur = std::time::Duration::from_millis(200); + let mut interval = tokio::time::interval(dur); + let offset = chrono::TimeDelta::from_std(dur).unwrap_or_default(); + + for metrics in metrics.iter() { + interval.tick().await; + // `tokio::time::interval`'s first tick fires + // immediately, so `clock.wall_now()` is still the + // 
anchor here. Add one interval so the first sample + // is timestamped at `anchor + dur`, matching the + // resampler's first interval boundary. + let wall = clock.wall_now() + offset; + let sys_delta = + wall.signed_duration_since(chrono::DateTime::::UNIX_EPOCH); + let next_ts = SystemTime::UNIX_EPOCH + + std::time::Duration::from_nanos( + sys_delta.num_nanoseconds().unwrap_or(0).max(0) as u64, + ); + let duration_since_epoch = + next_ts.duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let ts = Some(protobuf::Timestamp { + seconds: duration_since_epoch.as_secs() as i64, + nanos: duration_since_epoch.subsec_nanos() as i32, + }); + let mut metric_samples = vec![]; + if let Some(power) = metrics.0 { + metric_samples.push(MetricSample { + sample_time: ts, + metric: Metric::AcPowerActive as i32, + value: Some(MetricValueVariant { + metric_value_variant: Some( + metric_value_variant::MetricValueVariant::SimpleMetric( + SimpleMetricValue { + value: power.as_watts(), + }, ), - }), - bounds: vec![], - connection: None, - }); - } - if let Some(reactive_power) = metrics.1 { - metric_samples.push(MetricSample { - sample_time: ts.clone(), - metric: Metric::AcPowerReactive as i32, - value: Some(MetricValueVariant { - metric_value_variant: Some( - metric_value_variant::MetricValueVariant::SimpleMetric( - SimpleMetricValue { - value: reactive_power.as_volt_amperes_reactive(), - }, - ), + ), + }), + bounds: vec![], + connection: None, + }); + } + if let Some(reactive_power) = metrics.1 { + metric_samples.push(MetricSample { + sample_time: ts, + metric: Metric::AcPowerReactive as i32, + value: Some(MetricValueVariant { + metric_value_variant: Some( + metric_value_variant::MetricValueVariant::SimpleMetric( + SimpleMetricValue { + value: reactive_power.as_volt_amperes_reactive(), + }, ), - }), - bounds: vec![], - connection: None, - }); - } - if let Some(voltage) = metrics.2 { - metric_samples.push(MetricSample { - sample_time: ts.clone(), - metric: Metric::AcVoltage as i32, - 
value: Some(MetricValueVariant { - metric_value_variant: Some( - metric_value_variant::MetricValueVariant::SimpleMetric( - SimpleMetricValue { - value: voltage.as_volts(), - }, - ), + ), + }), + bounds: vec![], + connection: None, + }); + } + if let Some(voltage) = metrics.2 { + metric_samples.push(MetricSample { + sample_time: ts, + metric: Metric::AcVoltage as i32, + value: Some(MetricValueVariant { + metric_value_variant: Some( + metric_value_variant::MetricValueVariant::SimpleMetric( + SimpleMetricValue { + value: voltage.as_volts(), + }, ), - }), - bounds: vec![], - connection: None, - }); - } - if let Some(current) = metrics.3 { - metric_samples.push(MetricSample { - sample_time: ts.clone(), - metric: Metric::AcCurrent as i32, - value: Some(MetricValueVariant { - metric_value_variant: Some( - metric_value_variant::MetricValueVariant::SimpleMetric( - SimpleMetricValue { - value: current.as_amperes(), - }, - ), + ), + }), + bounds: vec![], + connection: None, + }); + } + if let Some(current) = metrics.3 { + metric_samples.push(MetricSample { + sample_time: ts, + metric: Metric::AcCurrent as i32, + value: Some(MetricValueVariant { + metric_value_variant: Some( + metric_value_variant::MetricValueVariant::SimpleMetric( + SimpleMetricValue { + value: current.as_amperes(), + }, ), - }), - bounds: vec![], - connection: None, - }); - } - - let resp = ReceiveElectricalComponentTelemetryStreamResponse { - telemetry: Some(ElectricalComponentTelemetry { - electrical_component_id: comp_id, - metric_samples, - // TODO: support sending errors - state_snapshots: vec![ElectricalComponentStateSnapshot { - origin_time: ts, - states: vec![state_code as i32], - warnings: vec![], - errors: vec![], - }], + ), }), - }; - if tx.send(Ok(resp)).await.is_err() { - break; - } + bounds: vec![], + connection: None, + }); } - if silence_after_metrics { - // Hold the sender open indefinitely so the client - // actor doesn't see the stream end and reconnect. 
- let _keep_open = tx; - std::future::pending::<()>().await; + + let resp = ReceiveElectricalComponentTelemetryStreamResponse { + telemetry: Some(ElectricalComponentTelemetry { + electrical_component_id: comp_id, + metric_samples, + // TODO: support sending errors + state_snapshots: vec![ElectricalComponentStateSnapshot { + origin_time: ts, + states: vec![state_code as i32], + warnings: vec![], + errors: vec![], + }], + }), + }; + if tx.send(Ok(resp)).await.is_err() { + break; } - }); - } + } + if silence_after_metrics { + // Hold the sender open indefinitely so the client + // actor doesn't see the stream end and reconnect. + let _keep_open = tx; + std::future::pending::<()>().await; + } + }); } let stream = tokio_stream::wrappers::ReceiverStream::new(rx); diff --git a/src/client/test_utils/tokio_synced_clock.rs b/src/client/test_utils/tokio_synced_clock.rs new file mode 100644 index 0000000..58829c9 --- /dev/null +++ b/src/client/test_utils/tokio_synced_clock.rs @@ -0,0 +1,76 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! Test clock for `#[tokio::test(start_paused = true)]` tests. +//! +//! Pins wall-clock time to tokio's (possibly paused) monotonic clock, so +//! advancing tokio time advances wall time too. Clones share state, so a +//! test can hand copies to the actor and a mock telemetry source and jump +//! both by calling +//! [`inject_wall_jump`][TokioSyncedClock::inject_wall_jump] once. +//! +//! Lives under `test_utils` so the `Arc`/`RwLock`/injection machinery +//! isn't compiled into production builds. 
+ +use chrono::{DateTime, TimeDelta, Utc}; +use tokio::time::Instant; + +use crate::wall_clock_timer::Clock; + +#[derive(Clone, Debug)] +pub struct TokioSyncedClock { + inner: std::sync::Arc>, +} + +#[derive(Debug)] +struct TokioSyncedClockInner { + wall_anchor: DateTime, + mono_anchor: Instant, +} + +impl Default for TokioSyncedClock { + fn default() -> Self { + Self::new() + } +} + +impl TokioSyncedClock { + /// Creates a clock anchored to the current `Utc::now()` (and the current + /// tokio monotonic instant). Suitable when the caller doesn't care about + /// a specific starting wall-clock value. + pub fn new() -> Self { + Self::with_wall_anchor(Utc::now()) + } + + /// Creates a clock whose wall-clock time at the current tokio instant + /// is exactly `wall_anchor`. Useful when the caller needs the anchor + /// aligned to a specific boundary (e.g. a whole-second tick). + pub fn with_wall_anchor(wall_anchor: DateTime) -> Self { + Self { + inner: std::sync::Arc::new(std::sync::RwLock::new(TokioSyncedClockInner { + wall_anchor, + mono_anchor: Instant::now(), + })), + } + } + + /// Shifts wall-clock time by `offset` relative to the monotonic clock, + /// simulating an NTP jump. Visible to every clone. + pub fn inject_wall_jump(&self, offset: TimeDelta) { + let mut inner = self.inner.write().expect("clock poisoned"); + // Only `wall_anchor` moves — `mono_anchor` is intentionally left + // untouched so wall and monotonic diverge, which is what makes + // this simulate an NTP jump rather than a re-anchor of both + // clocks together. 
+ inner.wall_anchor += offset; + } +} + +impl Clock for TokioSyncedClock { + fn wall_now(&self) -> DateTime { + let inner = self.inner.read().expect("clock poisoned"); + let elapsed = Instant::now().duration_since(inner.mono_anchor); + inner.wall_anchor + + TimeDelta::from_std(elapsed).expect("tokio elapsed fits in TimeDelta (~292 years)") + } +} diff --git a/src/error.rs b/src/error.rs index 6325ea0..30848f1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -56,10 +56,10 @@ ErrorKind!( (ComponentGraphError, component_graph_error), (ComponentDataError, component_data_error), (ConnectionFailure, connection_failure), - (ChronoError, chrono_error), (DroppedUnusedFormulas, dropped_unused_formulas), (FormulaEngineError, formula_engine_error), (InvalidComponent, invalid_component), + (InvalidConfig, invalid_config), (Internal, internal), (APIServerError, api_server_error), ); diff --git a/src/lib.rs b/src/lib.rs index 1c756ee..6e11614 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,5 +34,7 @@ pub use logical_meter::{Formula, FormulaSubscriber, LogicalMeterConfig, LogicalM pub mod metric; +pub(crate) mod wall_clock_timer; + mod microgrid; pub use microgrid::{BatteryPool, Microgrid}; diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index 267ffa9..cda49fb 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -5,16 +5,16 @@ //! component data, evaluating formulas based on that data, and streaming the //! data to subscribers. 
-use chrono::{DateTime, TimeDelta, Utc}; +use chrono::{DateTime, Utc}; use frequenz_microgrid_formula_engine::FormulaEngine; use frequenz_resampling::ResamplingFunction; use std::collections::{HashMap, HashSet}; use tokio::sync::{broadcast, mpsc, oneshot}; -use tokio::time::{MissedTickBehavior, interval}; use crate::ErrorKind; use crate::client::proto::common::metrics::{Metric, metric_value_variant::MetricValueVariant}; use crate::quantity::{Current, Power, Quantity, ReactivePower, Voltage}; +use crate::wall_clock_timer::{Clock, WallClockTimer}; use crate::{ Error, MicrogridClientHandle, Sample, client::proto::common::microgrid::electrical_components::ElectricalComponentTelemetry, @@ -34,6 +34,29 @@ struct ComponentDataResampler { receiver: broadcast::Receiver, } +/// Polls the broadcast receiver once, logging `Lagged` as a warning +/// (it represents real data loss) and retrying. Returns `Some(data)` +/// with the next sample, or `None` on `Empty` / `Closed`. `Lagged` can +/// happen during a wall-clock jump if the server bursts enough samples +/// to fill the channel buffer, or under sustained back-pressure. +fn poll_telemetry( + receiver: &mut broadcast::Receiver, + component_id: u64, +) -> Option { + loop { + match receiver.try_recv() { + Ok(data) => return Some(data), + Err(tokio::sync::broadcast::error::TryRecvError::Empty) => return None, + Err(tokio::sync::broadcast::error::TryRecvError::Lagged(n)) => { + tracing::warn!( + "resampler receiver lagged {n} samples for cid={component_id}; samples discarded" + ); + } + Err(tokio::sync::broadcast::error::TryRecvError::Closed) => return None, + } + } +} + /// Used to send strongly-typed formula streams from the LogicalMeterActor back /// to the Handle. 
pub(crate) enum TypedFormulaResponseSender { @@ -90,12 +113,12 @@ pub(crate) enum Instruction { }, } -pub(super) struct LogicalMeterActor { +pub(super) struct LogicalMeterActor { instructions_rx: mpsc::Receiver, client: MicrogridClientHandle, config: LogicalMeterConfig, resampler_ts: DateTime, - resampler_timer: tokio::time::Interval, + resampler_timer: WallClockTimer, } /// Holds all active formulas, grouped by quantity type. @@ -244,47 +267,37 @@ impl Formulas { } } -/// Returns the next timestamp aligned to the epoch based on the given interval. -pub(crate) fn epoch_align(timestamp: DateTime, interval: TimeDelta) -> Option> { - let millis_since_epoch = timestamp.timestamp_millis(); - let interval_millis = interval.num_milliseconds(); - - let intervals_since_epoch = millis_since_epoch / interval_millis; - let aligned_millis_since_epoch = intervals_since_epoch * interval_millis; - - let aligned_timestamp = DateTime::from_timestamp_millis(aligned_millis_since_epoch)?; - - Some(aligned_timestamp) -} - -impl LogicalMeterActor { - pub fn try_new( +impl LogicalMeterActor { + pub(crate) fn try_new( instructions_rx: mpsc::Receiver, client: MicrogridClientHandle, config: LogicalMeterConfig, + clock: C, ) -> Result { - let now = Utc::now(); - let last_aligned_ts = epoch_align(now, config.resampling_interval).ok_or_else(|| { - Error::chrono_error("Failed to align current time to the epoch".to_string()) - })?; - let mut timer = - interval(config.resampling_interval.to_std().map_err(|e| { - Error::chrono_error(format!("Failed to convert interval to std: {e}")) - })?); - timer.set_missed_tick_behavior(MissedTickBehavior::Burst); - - // The next tick should be at the next aligned timestamp. 
- timer.reset_after( - (last_aligned_ts + config.resampling_interval - now) - .to_std() - .map_err(|e| Error::chrono_error(format!("Failed to calculate time delta: {e}")))?, - ); + if config.resampling_interval <= chrono::TimeDelta::zero() { + return Err(Error::invalid_config(format!( + "resampling_interval must be positive, got {:?}", + config.resampling_interval + ))); + } + if config.max_age_in_intervals > i32::MAX as u32 { + return Err(Error::invalid_config(format!( + "max_age_in_intervals must fit in i32, got {}", + config.max_age_in_intervals + ))); + } + let timer = WallClockTimer::try_new(config.resampling_interval, clock)?; + // Resamplers created before the first tick use `resampler_ts` as + // their start; setting it one interval before the first scheduled + // tick lines up with the original semantics (first tick produces the + // first resampled sample). + let resampler_ts = timer.next_tick_time() - config.resampling_interval; Ok(Self { instructions_rx, client, config, - resampler_ts: last_aligned_ts, + resampler_ts, resampler_timer: timer, }) } @@ -295,8 +308,29 @@ impl LogicalMeterActor { loop { tokio::select! { - _ = self.resampler_timer.tick() => { - self.resampler_ts += self.config.resampling_interval; + tick_info = self.resampler_timer.tick() => { + if tick_info.resynced { + // Wall clock jumped; the inner resamplers' `start` + // fields reference the old clock frame and can't be + // advanced through the gap (the API is + // single-output-per-tick). Drop any buffered + // telemetry from the gap and rebuild them aligned + // to one interval before the realigned current + // tick, so the resample below emits a single + // (empty-buffer → `None`) sample at the realigned + // tick — preserving the every-interval cadence + // across the jump. 
+ let realigned_current = + self.resampler_timer.next_tick_time() + - self.config.resampling_interval; + self.rebuild_resamplers_after_jump( + &mut resamplers, + realigned_current - self.config.resampling_interval, + ); + self.resampler_ts = realigned_current; + } else { + self.resampler_ts = tick_info.expected_tick_time; + } let mut resampled = match self.resample_metrics(&mut resamplers) { Ok(resampled) => resampled, @@ -357,6 +391,34 @@ impl LogicalMeterActor { } } + /// Builds an inner resampler for `metric` aligned to `start`. Used + /// by both the startup path and the post-jump rebuild path so the + /// two stay consistent as `LogicalMeterConfig` evolves. + fn build_resampler( + &self, + metric: Metric, + start: DateTime, + ) -> frequenz_resampling::Resampler> { + let function = self + .config + // Look for a specific metric override first + .resampling_overrides + .get(&metric) + .cloned() + // Then look for a configured default + .or_else(|| self.config.resampling_function.clone()) + // Finally, default to average if no default is configured + .unwrap_or(ResamplingFunction::Average); + frequenz_resampling::Resampler::new( + self.config.resampling_interval, + function, + // Validated at construction to fit in `i32`. 
+ self.config.max_age_in_intervals as i32, + start, + false, + ) + } + async fn start_resamplers( &mut self, components: &HashSet, @@ -371,24 +433,7 @@ impl LogicalMeterActor { let resampler = ComponentDataResampler { component_id: *component_id, metric, - resampler: frequenz_resampling::Resampler::new( - self.config.resampling_interval, - self.config - // Look for a specific metric override first - .resampling_overrides - .get(&metric) - .cloned() - // Then look for a configured default - .or_else(|| self.config.resampling_function.clone()) - // Finally, default to average if no default is - // configured - .unwrap_or(ResamplingFunction::Average), - // The resampler expects max age to be i32, so we need to - // cap it if the user provided a higher value. - self.config.max_age_in_intervals.min(i32::MAX as u32) as i32, - self.resampler_ts, - false, - ), + resampler: self.build_resampler(metric, self.resampler_ts), receiver: self .client .receive_electrical_component_telemetry_stream(*component_id) @@ -480,7 +525,7 @@ impl LogicalMeterActor { let mut resampled_metrics: HashMap>> = HashMap::new(); for (_, resampler) in resamplers.iter_mut() { - while let Ok(data) = resampler.receiver.try_recv() { + while let Some(data) = poll_telemetry(&mut resampler.receiver, resampler.component_id) { self.push_to_resampler(resampler, data, resampler.metric); } let resampled = resampler.resampler.resample(self.resampler_ts); @@ -499,6 +544,26 @@ impl LogicalMeterActor { Ok(resampled_metrics) } + /// Rebuilds every inner `frequenz_resampling::Resampler` with `start` + /// set to the given boundary, preserving each one's telemetry broadcast + /// receiver. Buffered telemetry from the jumped-over window is drained + /// and discarded (including `Lagged` errors from the broadcast receiver, + /// which can happen when the server bursts enough samples during the + /// jump to fill the channel). 
+ fn rebuild_resamplers_after_jump( + &self, + resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>, + start: DateTime, + ) { + for resampler in resamplers.values_mut() { + // Drain any samples that were queued during the jump window; + // they are timestamped on the old wall-clock frame and would + // pollute the freshly-aligned resampler. + while poll_telemetry(&mut resampler.receiver, resampler.component_id).is_some() {} + resampler.resampler = self.build_resampler(resampler.metric, start); + } + } + /// Cleans up resamplers that are no longer needed by any formula. fn cleanup_resamplers( &mut self, @@ -582,3 +647,300 @@ impl LogicalMeterActor { resampler.resampler.push(sample); } } + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeDelta; + use tokio_stream::{StreamExt, wrappers::BroadcastStream}; + + use crate::{ + LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, + client::test_utils::{MockComponent, MockMicrogridApiClient, TokioSyncedClock}, + logical_meter::formula::Formula, + quantity::Power, + }; + + async fn new_handle( + meter: MockComponent, + config: LogicalMeterConfig, + clock: TokioSyncedClock, + ) -> LogicalMeterHandle { + let api_client = MockMicrogridApiClient::new_with_clock( + MockComponent::grid(1).with_children(vec![meter]), + clock.clone(), + ); + LogicalMeterHandle::try_new_with_clock( + MicrogridClientHandle::new_from_client(api_client), + config, + clock, + ) + .await + .unwrap() + } + + // Pins the upstream contract that `rebuild_resamplers_after_jump` + // relies on: after rebuilding with `start = current - interval`, a + // `resample(current)` call on an empty buffer must yield exactly + // one output, with `value() == None`. 
If `frequenz_resampling` + // ever returns zero outputs for an empty window, the jump-recovery + // path flips from a graceful `None` sample to a runtime + // `ConnectionFailure("Resampling produced N values")`, so this + // assumption deserves a focused regression test rather than only + // implicit coverage from the end-to-end NTP-jump tests. + #[test] + fn test_resampler_empty_window_yields_single_none_sample() { + let interval = TimeDelta::try_seconds(1).unwrap(); + let current = chrono::DateTime::from_timestamp(1_700_000_000, 0).unwrap(); + let start = current - interval; + let mut resampler: frequenz_resampling::Resampler> = + frequenz_resampling::Resampler::new( + interval, + frequenz_resampling::ResamplingFunction::Average, + 3, + start, + false, + ); + let result = resampler.resample(current); + assert_eq!( + result.len(), + 1, + "rebuild contract: empty window must yield exactly one sample, got {}", + result.len(), + ); + assert!( + result[0].clone().value().is_none(), + "rebuild contract: empty window must yield None, got {:?}", + result[0].value(), + ); + } + + #[tokio::test] + async fn test_nonpositive_resampling_interval_rejected() { + let api_client = MockMicrogridApiClient::new(MockComponent::grid(1)); + let client = MicrogridClientHandle::new_from_client(api_client); + for bad in [TimeDelta::zero(), -TimeDelta::try_milliseconds(1).unwrap()] { + let (_tx, rx) = mpsc::channel(1); + let result = LogicalMeterActor::try_new( + rx, + client.clone(), + LogicalMeterConfig::new(bad), + TokioSyncedClock::new(), + ); + match result { + Err(e) => assert_eq!(e.kind(), crate::ErrorKind::InvalidConfig), + Ok(_) => panic!("expected error for interval {bad:?}"), + } + } + } + + #[tokio::test] + async fn test_max_age_in_intervals_overflow_rejected() { + let api_client = MockMicrogridApiClient::new(MockComponent::grid(1)); + let client = MicrogridClientHandle::new_from_client(api_client); + let (_tx, rx) = mpsc::channel(1); + let config = 
LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()) + .with_max_age_in_intervals(i32::MAX as u32 + 1); + let result = LogicalMeterActor::try_new(rx, client, config, TokioSyncedClock::new()); + match result { + Err(e) => assert_eq!(e.kind(), crate::ErrorKind::InvalidConfig), + Ok(_) => panic!("expected error for over-i32::MAX max_age_in_intervals"), + } + } + + async fn next_sample(stream: &mut BroadcastStream>) -> Option> { + loop { + match tokio::time::timeout(std::time::Duration::from_secs(10), stream.next()).await { + Ok(Some(Ok(s))) => return Some(s), + Ok(Some(Err(_))) => continue, + _ => return None, + } + } + } + + /// Anchors a `TokioSyncedClock` to the next whole-second boundary, so + /// samples emitted at `anchor + 200ms·N` from the mock land on + /// resampler-window boundaries regardless of when in real wall-time + /// the test runs. Without this, `Utc::now()`'s subsecond offset can + /// place the first resampler tick before the mock has emitted + /// anything, surfacing as a flaky `None` first sample. 
+    fn aligned_clock() -> TokioSyncedClock {
+        // Current wall time truncated to whole seconds, plus one second,
+        // with a zero sub-second part: the next whole-second boundary.
+        let anchor =
+            chrono::DateTime::from_timestamp(chrono::Utc::now().timestamp() + 1, 0).unwrap();
+        TokioSyncedClock::with_wall_anchor(anchor)
+    }
+
+    // Subscribes to the grid formula on a 1 s logical meter and checks that
+    // two consecutive samples are exactly one interval apart and that the
+    // first of them carries a value.
+    #[tokio::test(start_paused = true)]
+    async fn test_actor_emits_samples_for_subscribed_formula() {
+        let meter = MockComponent::meter(2)
+            .with_power(vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0]);
+        let lm = new_handle(
+            meter,
+            LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()),
+            aligned_clock(),
+        )
+        .await;
+        let formula: Formula = lm.grid::().unwrap();
+        let rx = formula.subscribe().await.unwrap();
+        let mut stream = BroadcastStream::new(rx);
+
+        let first = next_sample(&mut stream).await.expect("no first sample");
+        let second = next_sample(&mut stream).await.expect("no second sample");
+
+        assert_eq!(
+            second.timestamp() - first.timestamp(),
+            TimeDelta::try_seconds(1).unwrap(),
+        );
+        assert!(first.value().is_some());
+    }
+
+    // Obtains the grid formula twice from the same handle, subscribes to
+    // each, and checks that the first sample seen on both streams has the
+    // same timestamp and the same value.
+    #[tokio::test(start_paused = true)]
+    async fn test_actor_shares_subscription_across_handles() {
+        let meter = MockComponent::meter(2)
+            .with_power(vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0]);
+        let lm = new_handle(
+            meter,
+            LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()),
+            aligned_clock(),
+        )
+        .await;
+        let mut a = BroadcastStream::new(
+            lm.grid::()
+                .unwrap()
+                .subscribe()
+                .await
+                .unwrap(),
+        );
+        let mut b = BroadcastStream::new(
+            lm.grid::()
+                .unwrap()
+                .subscribe()
+                .await
+                .unwrap(),
+        );
+
+        let sa = next_sample(&mut a).await.expect("no sample on a");
+        let sb = next_sample(&mut b).await.expect("no sample on b");
+        assert_eq!(sa.timestamp(), sb.timestamp());
+        assert_eq!(
+            sa.value().map(|v| v.as_watts()),
+            sb.value().map(|v| v.as_watts()),
+        );
+    }
+
+    // Shared body for forward/backward NTP-jump recovery tests. Asserts the
+    // sample-timestamp contract across the jump in addition to values:
+    //
+    // - pre-jump cadence is exactly `interval`, values are the baseline 10 W
+    // - the first post-jump sample is the resync tick: `None`-valued and
+    //   timestamped at `last_pre + jump + interval` (holds for signed `jump`)
+    // - subsequent ticks flow at `interval` cadence with post-jump values
+    //
+    // `jump` is the signed wall-clock shift injected into the shared clock;
+    // a negative value models a backward adjustment.
+    async fn run_ntp_jump_recovery(jump: TimeDelta) {
+        let interval = TimeDelta::try_milliseconds(200).unwrap();
+        let clock = aligned_clock();
+        // Power profile fed to the mock: ten frames at the 10 W baseline,
+        // then 99 W for the remaining 190 frames.
+        let power: Vec = (0..200).map(|i| if i < 10 { 10.0 } else { 99.0 }).collect();
+        let meter = MockComponent::meter(2).with_power(power);
+
+        // The test keeps `clock` and hands a clone to `new_handle`, so the
+        // jump injected below is visible on the handle's side too.
+        let lm = new_handle(meter, LogicalMeterConfig::new(interval), clock.clone()).await;
+        let formula = lm.grid::().unwrap();
+        let mut stream = BroadcastStream::new(formula.subscribe().await.unwrap());
+
+        // Phase 1: four samples before the jump.
+        let mut pre = Vec::new();
+        for _ in 0..4 {
+            if let Some(s) = next_sample(&mut stream).await {
+                pre.push(s);
+            }
+        }
+        assert_eq!(pre.len(), 4, "expected 4 pre-jump samples");
+        for w in pre.windows(2) {
+            assert_eq!(
+                w[1].timestamp() - w[0].timestamp(),
+                interval,
+                "pre-jump cadence should be {interval:?}",
+            );
+        }
+        for s in &pre {
+            assert_eq!(
+                s.value().map(|v| v.as_watts()),
+                Some(10.0),
+                "pre-jump sample should be baseline 10.0 W, got {:?}",
+                s.value(),
+            );
+        }
+        let last_pre_ts = pre.last().unwrap().timestamp();
+
+        // Phase 2: shift the wall clock and expect exactly one `None`
+        // sample, timestamped relative to the last pre-jump sample.
+        clock.inject_wall_jump(jump);
+
+        let resync = next_sample(&mut stream)
+            .await
+            .expect("no resync sample after jump");
+        assert!(
+            resync.value().is_none(),
+            "resync tick should be None (buffered telemetry was on the old clock frame), got {:?}",
+            resync.value(),
+        );
+        assert_eq!(
+            resync.timestamp() - last_pre_ts,
+            jump + interval,
+            "resync sample should be jump + interval after the last pre-jump sample",
+        );
+
+        // Collect enough post-jump samples to see the mock's power profile
+        // roll past its baseline-10 prefix into the 99 region. Cadence and
+        // "resync was the only None" are invariants across every sample;
+        // the 99 W value only needs to appear by the end of the window.
+        let mut post = Vec::new();
+        for _ in 0..10 {
+            if let Some(s) = next_sample(&mut stream).await {
+                post.push(s);
+            }
+        }
+        assert_eq!(post.len(), 10, "expected 10 post-jump samples");
+        assert_eq!(
+            post[0].timestamp() - resync.timestamp(),
+            interval,
+            "first post-resync tick should be one interval after the resync tick",
+        );
+        for w in post.windows(2) {
+            assert_eq!(
+                w[1].timestamp() - w[0].timestamp(),
+                interval,
+                "post-jump cadence should be {interval:?}",
+            );
+        }
+        for s in &post {
+            assert!(
+                s.value().is_some(),
+                "post-resync samples should carry real values, got {:?}",
+                s.value(),
+            );
+        }
+        // Only the final sample is required to have reached the 99 W region;
+        // compared with a tolerance rather than exact equality.
+        let last = post.last().unwrap();
+        assert!(
+            last.value()
+                .map(|v| (v.as_watts() - 99.0).abs() < 0.01)
+                .unwrap_or(false),
+            "last post-jump sample should be ≈99.0 W, got {:?}",
+            last.value(),
+        );
+    }
+
+    // Realistic NTP resync: a single shared clock drives both the mock
+    // telemetry's `sample_time`s and the actor's `WallClockTimer`. A mid-run
+    // `inject_wall_jump(+30s)` appears to both sides simultaneously, like a
+    // whole-machine NTP adjustment. The WallClockTimer detects the drift
+    // between wall and monotonic on the next sleep, resyncs, and the actor
+    // rebuilds the inner resamplers. Post-jump telemetry should flow through
+    // again.
+    #[tokio::test(start_paused = true)]
+    async fn test_actor_recovers_from_whole_machine_ntp_jump() {
+        run_ntp_jump_recovery(TimeDelta::try_seconds(30).unwrap()).await;
+    }
+
+    // Symmetric to the forward-jump test: a whole-machine backward NTP
+    // adjustment should resync the timer and flow post-jump telemetry.
+ #[tokio::test(start_paused = true)] + async fn test_actor_recovers_from_whole_machine_backward_ntp_jump() { + run_ntp_jump_recovery(-TimeDelta::try_seconds(30).unwrap()).await; + } +} diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index feb7aba..25f79f0 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -29,6 +29,14 @@ impl LogicalMeterHandle { pub async fn try_new( client: MicrogridClientHandle, config: LogicalMeterConfig, + ) -> Result { + Self::try_new_with_clock(client, config, crate::wall_clock_timer::SystemClock).await + } + + pub(crate) async fn try_new_with_clock( + client: MicrogridClientHandle, + config: LogicalMeterConfig, + clock: C, ) -> Result { let (sender, receiver) = mpsc::channel(8); let graph = ComponentGraph::try_new( @@ -47,7 +55,7 @@ impl LogicalMeterHandle { Error::component_graph_error(format!("Unable to create a component graph: {e}")) })?; - let logical_meter = LogicalMeterActor::try_new(receiver, client, config)?; + let logical_meter = LogicalMeterActor::try_new(receiver, client, config, clock)?; tokio::task::spawn(async move { logical_meter.run().await; @@ -233,9 +241,11 @@ mod tests { ]), ); - LogicalMeterHandle::try_new( + let clock = api_client.clock(); + LogicalMeterHandle::try_new_with_clock( MicrogridClientHandle::new_from_client(api_client), config.unwrap_or_else(|| LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap())), + clock, ) .await .unwrap() @@ -557,7 +567,7 @@ mod tests { ) { let values = samples .iter() - .map(|res| res.value().map(|v| extractor(v))) + .map(|res| res.value().map(&extractor)) .collect::>(); samples.as_slice().windows(2).for_each(|w| { diff --git a/src/quantity/current.rs b/src/quantity/current.rs index cda4ad4..5d44bbb 100644 --- a/src/quantity/current.rs +++ b/src/quantity/current.rs @@ -82,8 +82,8 @@ mod tests { assert_eq!(s(-0.001558), "-1.558 mA"); assert_eq!(p(-0.001558, 1), "-1.6 mA"); - 
assert_eq!(s(-2030.04487), "-2030.045 A"); - assert_eq!(p(-2030.04487, 1), "-2030 A"); - assert_eq!(p(-2030.04487, 2), "-2030.04 A"); + assert_eq!(s(-2030.0449), "-2030.045 A"); + assert_eq!(p(-2030.0449, 1), "-2030 A"); + assert_eq!(p(-2030.0449, 2), "-2030.04 A"); } } diff --git a/src/quantity/energy.rs b/src/quantity/energy.rs index 1550163..c6c4fb8 100644 --- a/src/quantity/energy.rs +++ b/src/quantity/energy.rs @@ -107,8 +107,8 @@ mod tests { assert_eq!(s(0.0015508), "1.551 mWh"); assert_eq!(p(0.0015508, 5), "1.5508 mWh"); - assert_eq!(s(1030.04487), "1.03 kWh"); - assert_eq!(p(1030.04487, 1), "1 kWh"); + assert_eq!(s(1030.0449), "1.03 kWh"); + assert_eq!(p(1030.0449, 1), "1 kWh"); assert_eq!(s(2_030_022.0), "2.03 MWh"); assert_eq!(s(2_030_022_123.0), "2.03 GWh"); @@ -117,8 +117,8 @@ mod tests { assert_eq!(s(-1.558), "-1.558 Wh"); assert_eq!(p(-1.558, 1), "-1.6 Wh"); - assert_eq!(s(-1030.04487), "-1.03 kWh"); - assert_eq!(p(-1030.04487, 1), "-1 kWh"); + assert_eq!(s(-1030.0449), "-1.03 kWh"); + assert_eq!(p(-1030.0449, 1), "-1 kWh"); assert_eq!(s(-2_030_022.0), "-2.03 MWh"); assert_eq!(p(-2_030_022.0, 1), "-2 MWh"); diff --git a/src/quantity/frequency.rs b/src/quantity/frequency.rs index ef9e63a..d8ac6df 100644 --- a/src/quantity/frequency.rs +++ b/src/quantity/frequency.rs @@ -33,7 +33,7 @@ mod tests { let freq_2 = Frequency::from_megahertz(0.0012); assert_f32_eq(freq_2.as_hertz(), 1200.0); - let freq_2 = Frequency::from_gigahertz(0.000_0012); + let freq_2 = Frequency::from_gigahertz(0.000_001_2); assert_f32_eq(freq_2.as_hertz(), 1200.0); assert!(freq_1 < freq_2); diff --git a/src/wall_clock_timer.rs b/src/wall_clock_timer.rs new file mode 100644 index 0000000..852759f --- /dev/null +++ b/src/wall_clock_timer.rs @@ -0,0 +1,520 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! A minimal wall-clock-aligned timer that survives NTP jumps. +//! +//! Sleeps run on tokio's monotonic clock; tick times are expressed on the +//! 
wall clock. If the two diverge by more than one interval (detected at
+//! tick entry), the timer realigns to the wall clock and the next tick
+//! fires immediately with `TickInfo::resynced = true`.
+//!
+//! Scope: this is a deliberately minimal timer for the
+//! `LogicalMeterActor` resampling loop. Compared to richer
+//! implementations elsewhere, it is always epoch-aligned (no
+//! caller-supplied alignment anchor), it does not track gradual
+//! wall-clock drift, and it does not warn on async scheduling
+//! lateness. Add those if a second caller needs them.
+
+use chrono::{DateTime, TimeDelta, Utc};
+
+/// Abstracts wall-clock time so tests can wire it to tokio's paused clock.
+/// Production uses [`SystemClock`]; test-only implementations live in
+/// `crate::client::test_utils`.
+pub trait Clock: Send + Sync {
+    /// Returns the current wall-clock time in UTC.
+    ///
+    /// Successive calls are allowed to go backwards or leap forwards; the
+    /// timer in this module exists to cope with exactly that.
+    fn wall_now(&self) -> DateTime<Utc>;
+}
+
+/// The real system wall clock. `wall_now()` returns `chrono::Utc::now()`.
+///
+/// Not suitable for `#[tokio::test(start_paused = true)]` tests:
+/// `Utc::now()` keeps advancing in real time while tokio's monotonic
+/// clock is frozen, so the timer's drift detector immediately sees a
+/// huge wall-vs-monotonic skew and resyncs every tick. Use
+/// [`crate::client::test_utils::TokioSyncedClock`] for paused-time
+/// tests instead.
+#[derive(Debug, Clone, Copy, Default)]
+pub struct SystemClock;
+
+impl Clock for SystemClock {
+    fn wall_now(&self) -> DateTime<Utc> {
+        Utc::now()
+    }
+}
+
+/// Rounds `timestamp` down to the nearest multiple of `interval` at or
+/// before it, counted from the Unix epoch (an already-aligned timestamp is
+/// returned as is). Returns `timestamp` unchanged if `interval` is zero,
+/// negative, or either value cannot be represented in nanoseconds.
+fn align_to_epoch(timestamp: DateTime<Utc>, interval: TimeDelta) -> DateTime<Utc> {
+    // `num_nanoseconds()` yields `None` once a span leaves the
+    // i64-nanosecond range (roughly ±292 years); in that case alignment is
+    // skipped instead of being computed on a value that does not fit.
+    let Some(interval_nanos) = interval.num_nanoseconds() else {
+        return timestamp;
+    };
+    if interval_nanos <= 0 {
+        return timestamp;
+    }
+    let Some(ts_nanos) = timestamp
+        .signed_duration_since(DateTime::<Utc>::UNIX_EPOCH)
+        .num_nanoseconds()
+    else {
+        return timestamp;
+    };
+    // `div_euclid` floors toward negative infinity, so pre-epoch timestamps
+    // round *down* as well (plain `/` would round them toward zero).
+    let aligned = ts_nanos.div_euclid(interval_nanos) * interval_nanos;
+    DateTime::<Utc>::UNIX_EPOCH + TimeDelta::nanoseconds(aligned)
+}
+
+/// Returns the first scheduled tick after `now`: `now` rounded down to a
+/// multiple of `interval` since the epoch, plus one `interval`.
+///
+/// Shared by construction and resync so both use the same alignment rule.
+fn next_aligned_tick(now: DateTime<Utc>, interval: TimeDelta) -> DateTime<Utc> {
+    align_to_epoch(now, interval) + interval
+}
+
+/// Information about a single timer tick.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct TickInfo {
+    /// The wall-clock time this tick was expected to fire at. On a resync
+    /// tick this is the pre-jump expected time.
+    pub expected_tick_time: DateTime<Utc>,
+    /// `true` if a wall-clock jump was detected and the timer realigned.
+    /// Callers holding timestamp-sensitive state should rebuild it against
+    /// the timer's now-realigned `next_tick_time()`.
+    pub resynced: bool,
+}
+
+/// A wall-clock-aligned periodic timer.
+pub struct WallClockTimer<C> {
+    /// Tick period; validated as strictly positive by `try_new`.
+    interval: TimeDelta,
+    /// Wall-clock time of the next scheduled tick.
+    next_tick: DateTime<Utc>,
+    /// Source of wall-clock readings.
+    clock: C,
+    /// Wall-clock reading taken at construction or at the most recently
+    /// returned tick; paired with `last_monotonic` for drift detection.
+    last_wall: DateTime<Utc>,
+    /// Monotonic reading taken together with `last_wall`.
+    last_monotonic: tokio::time::Instant,
+}
+
+impl<C: Clock> WallClockTimer<C> {
+    /// Creates a timer whose first tick is the next epoch-aligned multiple
+    /// of `interval` after the clock's current wall time.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `InvalidConfig` error if `interval` is non-positive or
+    /// cannot be represented as a [`std::time::Duration`]. Validating both
+    /// at construction lets `tick()` convert sleep durations infallibly —
+    /// without this guarantee a conversion failure would have to silently
+    /// fall back to a zero duration, busy-looping the tick loop.
+    pub fn try_new(interval: TimeDelta, clock: C) -> Result<Self, crate::Error> {
+        if interval <= TimeDelta::zero() {
+            return Err(crate::Error::invalid_config(format!(
+                "interval must be positive, got {interval:?}",
+            )));
+        }
+        // Defensive guard: chrono documents `to_std` as failing for negative
+        // values, which the check above has already rejected.
+        if interval.to_std().is_err() {
+            return Err(crate::Error::invalid_config(format!(
+                "interval too large for std::time::Duration: {interval:?}",
+            )));
+        }
+        let now = clock.wall_now();
+        let next_tick = next_aligned_tick(now, interval);
+        Ok(Self {
+            interval,
+            next_tick,
+            clock,
+            last_wall: now,
+            last_monotonic: tokio::time::Instant::now(),
+        })
+    }
+
+    /// The wall-clock time the next tick is scheduled for.
+    ///
+    /// Note: this can move *backwards* across calls if a backward
+    /// wall-clock jump triggers a resync; successive return values are
+    /// not monotonic. Don't use them for ordering or latency math.
+    pub fn next_tick_time(&self) -> DateTime<Utc> {
+        self.next_tick
+    }
+
+    /// Waits until the next tick, resyncing on a detected wall-clock jump.
+    ///
+    /// Returns immediately when the scheduled tick time has already passed
+    /// or when a jump is detected; otherwise sleeps on tokio's clock and
+    /// re-evaluates. On a normal tick the schedule advances by exactly one
+    /// interval; on a resync it is realigned to the current wall time and
+    /// the returned `expected_tick_time` is the pre-jump schedule.
+    pub async fn tick(&mut self) -> TickInfo {
+        // Detect wall-clock jumps by comparing wall-clock elapsed against
+        // monotonic elapsed since the last observation. A real NTP jump
+        // shifts the wall clock without touching the monotonic clock, so
+        // the two diverge. A slow caller (event-loop lateness, long work
+        // between ticks) advances both clocks together, so they don't
+        // diverge — this avoids spurious resyncs that would otherwise
+        // rebuild resamplers and surface a phantom `None` sample whenever
+        // the caller takes longer than one interval to re-enter `tick()`.
+        //
+        // Strict `>`: drift of exactly 1× interval is treated as scheduling
+        // jitter, not a jump. Only strictly-greater drifts resync.
+        let threshold = self.interval;
+        loop {
+            let wall_now = self.clock.wall_now();
+            let monotonic_now = tokio::time::Instant::now();
+            let wall_elapsed = wall_now - self.last_wall;
+            // `from_std` only fails beyond `TimeDelta::MAX` (i64::MAX
+            // milliseconds, about 292 million years), hence the wording of
+            // the warning below.
+            let monotonic_elapsed = TimeDelta::from_std(
+                monotonic_now.duration_since(self.last_monotonic),
+            )
+            .unwrap_or_else(|_| {
+                tracing::warn!(
+                    "monotonic elapsed exceeds TimeDelta range (~292 million years); clamping to TimeDelta::MAX, will trigger resync",
+                );
+                TimeDelta::MAX
+            });
+            let drift = wall_elapsed - monotonic_elapsed;
+
+            if drift.abs() > threshold {
+                let expected = self.next_tick;
+                self.next_tick = next_aligned_tick(wall_now, self.interval);
+                self.last_wall = wall_now;
+                self.last_monotonic = monotonic_now;
+                let drift_secs = drift.num_nanoseconds().unwrap_or(0) as f64 / 1e9;
+                tracing::warn!(
+                    "wall clock jumped (drift={drift_secs:+.3}s); re-syncing, tick fires immediately",
+                );
+                return TickInfo {
+                    expected_tick_time: expected,
+                    resynced: true,
+                };
+            }
+
+            let to_next = self.next_tick - wall_now;
+            if to_next <= TimeDelta::zero() {
+                // Due (or overdue): fire now and schedule from the previous
+                // slot, not from `wall_now`, so the cadence stays aligned.
+                let expected = self.next_tick;
+                self.next_tick = expected + self.interval;
+                self.last_wall = wall_now;
+                self.last_monotonic = monotonic_now;
+                return TickInfo {
+                    expected_tick_time: expected,
+                    resynced: false,
+                };
+            }
+
+            // `to_next` is positive (the `<= zero` branch returned) and
+            // bounded by ~2× `interval` — a sub-threshold backward drift
+            // can stretch it past one interval, but a larger drift would
+            // have hit the resync branch. Since `try_new` validated that
+            // `interval` fits in `Duration`, the conversion shouldn't
+            // fail; falling back to one `interval` (also known to fit)
+            // keeps us out of a busy-loop and lets the next iteration
+            // re-evaluate.
+            let sleep_for = to_next.to_std().unwrap_or_else(|_| {
+                tracing::warn!(
+                    "to_next ({to_next:?}) does not fit in std::time::Duration; sleeping for one interval and retrying",
+                );
+                self.interval
+                    .to_std()
+                    .unwrap_or(std::time::Duration::from_secs(1))
+            });
+            tokio::time::sleep(sleep_for).await;
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::client::test_utils::TokioSyncedClock;
+
+    // ---------- align_to_epoch ----------
+
+    #[test]
+    fn test_align_to_epoch_rounds_down() {
+        let ts = DateTime::from_timestamp(1_000_005, 0).unwrap();
+        let aligned = align_to_epoch(ts, TimeDelta::try_seconds(10).unwrap());
+        assert_eq!(aligned.timestamp(), 1_000_000);
+    }
+
+    #[test]
+    fn test_align_to_epoch_already_aligned() {
+        let ts = DateTime::from_timestamp(1_000_000, 0).unwrap();
+        let aligned = align_to_epoch(ts, TimeDelta::try_seconds(10).unwrap());
+        assert_eq!(aligned, ts);
+    }
+
+    #[test]
+    fn test_align_to_epoch_sub_second() {
+        let ts = DateTime::from_timestamp_millis(1_000_000_750).unwrap();
+        let aligned = align_to_epoch(ts, TimeDelta::try_milliseconds(200).unwrap());
+        assert_eq!(aligned.timestamp_millis(), 1_000_000_600);
+    }
+
+    #[test]
+    fn test_align_to_epoch_rounds_down_for_pre_epoch() {
+        // Both cases exercise the negative side where truncating division
+        // (`/`) rounds toward zero and would give the wrong answer:
+        //   -5 at 10s interval: `/` → 0, `div_euclid` → -10 (correct).
+        //   -3 at 5s interval:  `/` → 0, `div_euclid` → -5 (correct).
+        let ts = DateTime::from_timestamp(-5, 0).unwrap();
+        let aligned = align_to_epoch(ts, TimeDelta::try_seconds(10).unwrap());
+        assert_eq!(aligned.timestamp(), -10);
+
+        let ts = DateTime::from_timestamp(-3, 0).unwrap();
+        let aligned = align_to_epoch(ts, TimeDelta::try_seconds(5).unwrap());
+        assert_eq!(aligned.timestamp(), -5);
+    }
+
+    #[test]
+    fn test_align_to_epoch_unrepresentable_is_identity() {
+        // ~500 years past the epoch — beyond the ~292-year i64-nanosecond
+        // range, so `num_nanoseconds()` returns None and we fall back to
+        // the input timestamp unchanged.
+        let ts = DateTime::from_timestamp(500 * 365 * 86400, 0).unwrap();
+        assert_eq!(align_to_epoch(ts, TimeDelta::try_seconds(1).unwrap()), ts);
+    }
+
+    #[test]
+    fn test_align_to_epoch_zero_or_negative_interval_is_identity() {
+        let ts = DateTime::from_timestamp_millis(1_000_000_750).unwrap();
+        assert_eq!(align_to_epoch(ts, TimeDelta::zero()), ts);
+        assert_eq!(align_to_epoch(ts, -TimeDelta::try_seconds(1).unwrap()), ts);
+    }
+
+    // ---------- timer ----------
+
+    #[tokio::test(start_paused = true)]
+    async fn test_try_new_rejects_non_positive_interval() {
+        for bad in [TimeDelta::zero(), -TimeDelta::try_milliseconds(1).unwrap()] {
+            let clock = TokioSyncedClock::new();
+            let err = WallClockTimer::try_new(bad, clock)
+                .err()
+                .unwrap_or_else(|| panic!("expected error for interval {bad:?}"));
+            assert_eq!(err.kind(), crate::ErrorKind::InvalidConfig);
+        }
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_ticks_at_interval() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_milliseconds(20).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock).unwrap();
+        let t1 = timer.tick().await.expected_tick_time;
+        let t2 = timer.tick().await.expected_tick_time;
+        let t3 = timer.tick().await.expected_tick_time;
+        assert_eq!(t2 - t1, interval);
+        assert_eq!(t3 - t2, interval);
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_detects_forward_jump() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_milliseconds(200).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+
+        let first = timer.tick().await;
+        assert!(!first.resynced);
+
+        clock.inject_wall_jump(TimeDelta::try_seconds(30).unwrap());
+
+        let after_jump = timer.tick().await;
+        assert!(after_jump.resynced, "expected resync on forward jump");
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_detects_backward_jump() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_milliseconds(200).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+
+        let _ = timer.tick().await;
+
+        clock.inject_wall_jump(-TimeDelta::try_seconds(30).unwrap());
+
+        let after_jump = timer.tick().await;
+        assert!(after_jump.resynced, "expected resync on backward jump");
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_first_tick_is_aligned_to_epoch() {
+        // Anchor 750 ms past a whole-second boundary; with a 1 s interval the
+        // first scheduled tick must land on the next whole second, not
+        // 750 ms past one.
+        let anchor = DateTime::from_timestamp_millis(1_000_000_750).unwrap();
+        let clock = TokioSyncedClock::with_wall_anchor(anchor);
+        let timer = WallClockTimer::try_new(TimeDelta::try_seconds(1).unwrap(), clock).unwrap();
+        assert_eq!(timer.next_tick_time().timestamp_millis(), 1_000_001_000);
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_subthreshold_forward_drift_does_not_resync() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_seconds(1).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        // 500 ms < 1× interval threshold.
+        clock.inject_wall_jump(TimeDelta::try_milliseconds(500).unwrap());
+        let info = timer.tick().await;
+        assert!(!info.resynced);
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_subthreshold_backward_drift_does_not_resync() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_seconds(1).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        clock.inject_wall_jump(-TimeDelta::try_milliseconds(500).unwrap());
+        let info = timer.tick().await;
+        assert!(!info.resynced);
+    }
+
+    // The threshold uses strict `>`: a jump of *exactly* one interval
+    // is treated as scheduling jitter and must NOT resync. These tests
+    // pin the boundary so a future change to `>=` would fail loudly.
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_at_exact_interval_forward_drift_does_not_resync() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_seconds(1).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        clock.inject_wall_jump(interval);
+        let info = timer.tick().await;
+        assert!(
+            !info.resynced,
+            "drift of exactly 1× interval should be treated as jitter, not a jump",
+        );
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_at_exact_interval_backward_drift_does_not_resync() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_seconds(1).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        clock.inject_wall_jump(-interval);
+        let info = timer.tick().await;
+        assert!(
+            !info.resynced,
+            "backward drift of exactly 1× interval should be treated as jitter, not a jump",
+        );
+    }
+
+    // The threshold is `1 × interval`: a jump above the interval (here
+    // 1.5× interval) should resync.
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_detects_forward_jump_just_over_interval() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_seconds(1).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        clock.inject_wall_jump(TimeDelta::try_milliseconds(1500).unwrap());
+        let info = timer.tick().await;
+        assert!(
+            info.resynced,
+            "expected resync on forward jump > 1× interval"
+        );
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_detects_backward_jump_just_over_interval() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_seconds(1).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        clock.inject_wall_jump(-TimeDelta::try_milliseconds(1500).unwrap());
+        let info = timer.tick().await;
+        assert!(
+            info.resynced,
+            "expected resync on backward jump > 1× interval"
+        );
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_resumes_normal_cadence_after_forward_jump() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_milliseconds(200).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        clock.inject_wall_jump(TimeDelta::try_seconds(30).unwrap());
+        assert!(timer.tick().await.resynced);
+
+        let a = timer.tick().await;
+        let b = timer.tick().await;
+        assert!(!a.resynced && !b.resynced);
+        assert_eq!(b.expected_tick_time - a.expected_tick_time, interval);
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_resumes_normal_cadence_after_backward_jump() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_milliseconds(200).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        clock.inject_wall_jump(-TimeDelta::try_seconds(30).unwrap());
+        assert!(timer.tick().await.resynced);
+
+        let a = timer.tick().await;
+        let b = timer.tick().await;
+        assert!(!a.resynced && !b.resynced);
+        assert_eq!(b.expected_tick_time - a.expected_tick_time, interval);
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_resync_expected_tick_time_is_prejump_schedule() {
+        // On a resync tick, `TickInfo::expected_tick_time` holds the
+        // scheduled pre-jump tick time (not the realigned post-jump time),
+        // so callers reading just this field see the tick that *should*
+        // have fired.
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_seconds(1).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        let scheduled_next = timer.next_tick_time();
+        clock.inject_wall_jump(TimeDelta::try_seconds(30).unwrap());
+        let info = timer.tick().await;
+        assert!(info.resynced);
+        assert_eq!(info.expected_tick_time, scheduled_next);
+    }
+
+    // A caller that takes longer than one interval to re-enter `tick()`
+    // (e.g. event-loop lateness, slow downstream work) advances both the
+    // wall and monotonic clocks by the same amount. The drift detector
+    // must not mistake this for a wall-clock jump.
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_does_not_resync_on_late_caller() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_milliseconds(200).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock).unwrap();
+        let _ = timer.tick().await;
+
+        // Burn well over one interval of monotonic time without injecting
+        // any wall jump — both clocks advance together.
+        tokio::time::sleep(std::time::Duration::from_millis(700)).await;
+
+        let info = timer.tick().await;
+        assert!(
+            !info.resynced,
+            "late caller (no wall jump) must not look like a wall-clock jump",
+        );
+    }
+
+    // Ten consecutive sub-threshold drifts, each +750 ms on a 1 s interval,
+    // should all be absorbed without a single resync. The cumulative shift
+    // is 7.5 s, but no individual observation exceeds the threshold.
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_absorbs_many_subthreshold_drifts() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_seconds(1).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        for _ in 0..10 {
+            clock.inject_wall_jump(TimeDelta::try_milliseconds(750).unwrap());
+            let info = timer.tick().await;
+            assert!(!info.resynced, "sub-threshold drift should not resync");
+        }
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn test_timer_next_tick_time_realigns_after_jump() {
+        let clock = TokioSyncedClock::new();
+        let interval = TimeDelta::try_seconds(1).unwrap();
+        let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
+        let _ = timer.tick().await;
+
+        let before_next = timer.next_tick_time();
+        clock.inject_wall_jump(TimeDelta::try_seconds(30).unwrap());
+        let _ = timer.tick().await;
+        let after_next = timer.next_tick_time();
+
+        // The realigned next_tick_time should be ~30 s past where it was
+        // before (give or take the alignment offset within one interval).
+        let shift = (after_next - before_next).num_seconds();
+        assert!(
+            (29..=31).contains(&shift),
+            "expected ~30s shift, got {shift}s"
+        );
+    }
+}