Make LogicalMeterActor resilient to wall-clock time jumps #33
Conversation
Adds a WallClockTimer that schedules ticks against the wall clock while
sleeping on tokio's monotonic clock. On entry to each tick the timer
compares wall-elapsed against monotonic-elapsed since the previous
observation; if they diverge by more than one interval (in either
direction) it realigns to the wall clock and returns
TickInfo { resynced: true } immediately, so callers can rebuild any
timestamp-keyed state.
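A minimal sketch of the caller contract described above, assuming the `TickInfo { resynced }` shape from the description; the loop and names are illustrative stand-ins, not the crate's actual code:

```rust
// Hedged sketch of the caller contract. `TickInfo` follows the description
// above; a plain iterator stands in for the real async timer.
struct TickInfo {
    resynced: bool,
}

fn handle_ticks(ticks: impl IntoIterator<Item = TickInfo>) -> Vec<&'static str> {
    let mut actions = Vec::new();
    for tick in ticks {
        if tick.resynced {
            // Wall clock jumped: timestamp-keyed state is stale, rebuild it.
            actions.push("rebuild");
        } else {
            actions.push("resample");
        }
    }
    actions
}

fn main() {
    let ticks = vec![
        TickInfo { resynced: false },
        TickInfo { resynced: true },
        TickInfo { resynced: false },
    ];
    assert_eq!(handle_ticks(ticks), vec!["resample", "rebuild", "resample"]);
}
```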
Why wall-vs-monotonic, rather than just sleeping until a wall-clock
deadline: a slow caller (event-loop lateness, long downstream work)
advances both clocks by the same amount, so the difference stays
bounded and the timer doesn't mistake catch-up for a jump. A real
NTP-style jump shifts only the wall clock, so the difference exceeds
the threshold and resync fires.
Why strict `>` (not `>=`) on the threshold: a drift of exactly one
interval is treated as scheduling jitter, not a jump. Pinned by
exact-boundary tests.
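The divergence check and the strict-`>` boundary can be sketched as follows, using signed millisecond elapsed times in place of the crate's actual duration types (the function name and signature are assumptions, not the real API):

```rust
// Hedged sketch of the wall-vs-monotonic divergence check described above.
fn needs_resync(wall_elapsed_ms: i64, mono_elapsed_ms: i64, interval_ms: i64) -> bool {
    // A slow caller advances both clocks equally, so the difference stays
    // bounded; an NTP-style jump shifts only the wall clock.
    let drift = wall_elapsed_ms - mono_elapsed_ms;
    // Strict `>`: drift of exactly one interval is jitter, not a jump.
    drift.abs() > interval_ms
}

fn main() {
    assert!(!needs_resync(1_000, 1_000, 1_000)); // on time
    assert!(!needs_resync(2_000, 1_000, 1_000)); // exactly one interval: jitter
    assert!(needs_resync(31_000, 1_000, 1_000)); // forward jump
    assert!(needs_resync(-29_000, 1_000, 1_000)); // backward jump
}
```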
Why a custom timer instead of tokio::time::interval: the latter sleeps
to a monotonic deadline, so it can't realign when the wall clock jumps
— which is exactly the failure mode this change is meant to survive.
The Clock trait plus the test-only TokioSyncedClock helper let
paused-time tests drive both telemetry timestamps and timer wall-now
from a single shared anchor, simulating a whole-machine NTP jump.
Also adds the InvalidConfig ErrorKind variant for the construction-time
validations (non-positive interval, interval too large to represent as a
std::time::Duration).
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Replaces tokio::time::interval with the new WallClockTimer in the
resampling loop. On a resync tick:
- rebuild every inner frequenz_resampling::Resampler with its `start`
aligned one interval before the realigned current tick, draining
any buffered telemetry from the jumped-over window (the buffered
samples are timestamped on the old wall-clock frame and would
pollute the freshly-aligned resampler);
- emit a single `None` sample at the realigned tick, preserving the
every-interval cadence across the jump;
- real values resume on the next tick.
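The resync steps above can be sketched as one function; `Resampler` here is a minimal stand-in struct, not `frequenz_resampling`'s actual API, and the field and function names are assumptions:

```rust
// Hedged sketch of the post-jump rebuild described above.
struct Resampler {
    start_ms: i64,           // alignment frame for output timestamps
    buffer: Vec<(i64, f64)>, // buffered (timestamp_ms, value) telemetry
}

fn rebuild_after_jump(r: &mut Resampler, realigned_tick_ms: i64, interval_ms: i64) -> Option<f64> {
    // Align `start` one interval before the realigned current tick.
    r.start_ms = realigned_tick_ms - interval_ms;
    // Drain telemetry stamped on the old wall-clock frame; it would pollute
    // the freshly aligned resampler.
    r.buffer.clear();
    // Emit a single `None` at the realigned tick, preserving the cadence.
    None
}

fn main() {
    let mut r = Resampler { start_ms: 0, buffer: vec![(500, 1.0), (1500, 2.0)] };
    let emitted = rebuild_after_jump(&mut r, 60_000, 1_000);
    assert_eq!(emitted, None);
    assert_eq!(r.start_ms, 59_000);
    assert!(r.buffer.is_empty());
}
```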
A backward jump produces a sample timestamped in the past relative to
the previous one — this is documented in the release notes; consumers
that assume monotonically increasing Sample timestamps must handle it.
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The variant has no remaining constructors: the only TimeDelta arithmetic that previously surfaced ChronoError now lives inside WallClockTimer, and resampling-interval misconfiguration is reported as InvalidConfig. This is a breaking public API change for downstream code that matches exhaustively on ErrorKind; called out in RELEASE_NOTES.md Upgrading. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Force-pushed 53311fa to 61fa5e0.
Pull request overview
Adds wall-clock jump resilience to logical meter resampling by introducing a wall-clock–aligned timer and resampler rebuild behavior when NTP-style jumps are detected.
Changes:
- Added an internal `WallClockTimer` abstraction with pluggable wall-clock `Clock` implementations (system + tokio-synced test clock).
- Updated `LogicalMeterActor` to drive resampling via `WallClockTimer` and rebuild inner resamplers on detected wall-clock jumps (emitting a single `None` at the resync tick).
- Updated mock/test utilities and release notes to support and document the new behavior.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/wall_clock_timer.rs | New wall-clock aligned timer + unit tests for alignment and jump detection. |
| src/logical_meter/logical_meter_handle.rs | Adds try_new_with_clock() for injecting a wall clock (tests) and wires actor construction accordingly. |
| src/logical_meter/logical_meter_actor.rs | Replaces tokio interval scheduling with WallClockTimer, adds resampler rebuild/drain logic, and new jump-recovery tests. |
| src/lib.rs | Registers internal wall_clock_timer module. |
| src/error.rs | Removes ChronoError error kind (no longer needed by actor init). |
| src/client/test_utils/tokio_synced_clock.rs | Adds a tokio-paused-time-friendly wall clock for deterministic jump tests. |
| src/client/test_utils.rs | Updates mock telemetry timestamps to use the shared test clock and supports injecting whole-machine wall jumps. |
| RELEASE_NOTES.md | Documents the new wall-clock jump resilience behavior. |
Force-pushed 4b0e32d to 95dc8a6.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
```rust
tokio::time::sleep(to_next.to_std().unwrap_or(std::time::Duration::ZERO)).await;
slept_this_call = true;
}
```
tokio::time::sleep(to_next.to_std().unwrap_or(Duration::ZERO)) can turn a conversion failure into a zero-duration sleep, which will busy-loop the tick() loop. This can happen when to_next (and/or interval) is too large to fit into std::time::Duration (e.g., very large resampling intervals), and it will peg a core. Prefer validating the interval at construction (e.g., require interval.to_std() succeeds) and/or handle the Err case by returning an error instead of sleeping for zero.
```rust
        "resampling_interval must be positive, got {:?}",
        config.resampling_interval
    )));
}
```
resampling_interval is now only validated as positive, but not that it fits into std::time::Duration. WallClockTimer::tick() ultimately calls TimeDelta::to_std() for sleeping, and if that conversion fails (interval too large) it falls back to a zero sleep, causing a tight loop. Consider rejecting intervals that can’t be represented as std::time::Duration here (return ErrorKind::InvalidConfig) so misconfiguration can’t trigger a CPU spin.
```rust
    }
}
if config.resampling_interval.to_std().is_err() {
    return Err(Error::invalid_config(format!(
        "resampling_interval must fit into std::time::Duration, got {:?}",
        config.resampling_interval
    )));
}
```
```rust
ErrorKind!(
    (ComponentGraphError, component_graph_error),
    (ComponentDataError, component_data_error),
    (ConnectionFailure, connection_failure),
    (ChronoError, chrono_error),
    (DroppedUnusedFormulas, dropped_unused_formulas),
    (FormulaEngineError, formula_engine_error),
    (InvalidComponent, invalid_component),
    (InvalidConfig, invalid_config),
    (Internal, internal),
    (APIServerError, api_server_error),
);
```
Removing the ChronoError variant from the public ErrorKind enum is a breaking change for downstream users that pattern-match on ErrorKind. If this isn’t intended to be a semver-major change, consider keeping ChronoError (even if unused) and introducing InvalidConfig in addition, or document the breaking change prominently (e.g., in RELEASE_NOTES.md Upgrading section + version bump).
My understanding, which might still be wrong, is that the problem is: samples are wall-clock stamped. I'm not sure if that's when they were collected or when they arrive, but this is all on the same machine so there won't be much difference. It might be simpler if the resampler is asked for the "last sample", which is the current wall-clock time, floored to some point (e.g. whole seconds) to make sure different clients are aligned, minus the resampling interval, and no clock is kept at all. The client wakes up based on the monotonic clock, asks for the last sample, and it doesn't matter about shifting clocks. While simpler, there may be use cases this would break that I am unaware of; I don't know if people try to get historical data, for example, or multiple samples. This isn't the approach of the PR, though. This is just for other confused readers, as it took me a while to get my head around this and the problem has popped up a few times.
Interesting. Not sure about the resampler in this repo, but on the SDK (this is based on the SDK timer AFAIK) that's not good enough, because you could stop receiving data, so asking for the last sample might give you a very old sample (and the resampler will still send resampled data anyway, worst case with value = None, but we want it to keep ticking). So it's not very reliable for that scenario.
It wouldn't, because it would be outside of the time window. It wouldn't go back searching for data, just use things it already has within the window.
So if you don't consider old data, how do you know whether there is no data because you stopped receiving data or because you jumped into the future? How do you decide which timestamp to use to emit those samples to keep the continuous ticking? I have the feeling it's not that simple, but again, I went deep into this a long time ago so I don't have it fresh. Since I was already asking AI to do a PR review, I asked it to help me consider this simpler approach. The reply is long, but it seems to align with my intuition that for continuous aligned ticking it might not be that trivial.

AI analysis:

That proposal makes sense as a description of why time jumps break the current resampling windows, but it does not look like a complete replacement for the PR/SDK approach.

Short version: the proposed "just wake up on the monotonic clock and resample the latest wall-clock bucket" can work for a different API, but not cleanly for the current one, because the current logical meter needs a stream with one tick per interval, aligned to wall-clock boundaries as closely as possible, including when there is no input data.

Why the simpler idea is not a drop-in replacement:
If you compute Without that:
So "no clock is kept at all" is not really true once you define correct behavior.
Example:
Now what? Possible behaviors are all problematic:
So clock shifts do still matter; this proposal just postpones the policy decision.
If wall clock jumps forward by 30 seconds, or the actor is blocked for 30 seconds, then "give me the latest completed bucket" only gives you one bucket. That means you either:
The current code expects exactly one resampled output per tick. You can see that in
Those are different. With the proposal, you wake on a monotonic cadence and only then ask "what was the latest completed wall-clock bucket?". That means the sample may be produced noticeably after the bucket boundary. The current timer tries to do better: it sleeps on monotonic time but schedules ticks against wall-clock boundaries. So if the requirement is "tick every interval, as aligned to the wall clock as possible", the simpler proposal is weaker.
The current resampler is stateful. It has a start/alignment and is advanced over time. The actor rebuilds it after a jump because that internal alignment frame becomes stale. So the simpler idea would fit better with a different resampler API like:
But that is not the current design. With the current design, backward jumps and re-asking old buckets are especially awkward.
Your intuition here is correct. If no telemetry arrives for a while, the logical meter still needs to emit one output per interval, typically A "latest bucket" query can do that only if you are still waking reliably once per interval and tracking which buckets have already been emitted. Again, that brings back timer/state logic. What the proposal gets right The proposal is right about the core bug mechanism:
That part is valid.

What it misses: the problem is not only "which bucket is latest now?", but also:
Those are exactly the reasons the PR/SDK use a wall-clock-aligned timer plus explicit resync behavior.

Bottom line: the proposal is feasible only if you accept a different contract:
It is not equivalent to:
For the current logical meter semantics, the simpler proposal has real flaws, and your intuition is not wrong.
llucax left a comment:
OK, this is a first AI-assisted round (I asked it to compare to the SDK implementation, and did a quick check on the findings). To be honest, to do an in-depth round myself will probably take too much time, as it's been a while since I did the SDK implementation and I'm not familiar with rust. We can see if it is necessary after this round.
Some extra comments about commits:
- `f7e9d09` (Add NTP-jump-resilient wall clock timer) and `abfa4b4` (Use WallClockTimer in LogicalMeterActor for time-jump resilience) are both substantial functional commits, but both have empty commit bodies. Given how much behavior changed here, a short explanation of the design tradeoffs in the commit messages would make the history much easier to follow later.
- `56ebc5e` (Remove ChronoError variant from ErrorKind) removes the public `ErrorKind::ChronoError` variant, but the commit message does not explain why that removal is safe / intended. Since that is upgrade-significant, the rationale should ideally be visible in history too.
There is no simple way. I did say there may be hidden requirements I was unaware of - this just gets you samples from the last sample period and not much else. If we want to monitor the stream and do some stats and e.g. give reasons there were no samples in a window I would probably have that separately - but it would depend on exactly what monitoring we wanted to perform.
I'm not sure what continuous ticking you want; I was just solving the problem of "samples in the last window". If there are other requirements, I am unaware of them and they won't be taken into account. This probably isn't the place for this discussion, so if you want to continue it we should write out the requirements and then see if there is anything simpler. On the AI comments: most assume more requirements than the last window, for example the ability to fetch historical windows. That could be added, but again, without clear requirements thinking about it is fruitless. I added the comment because the PR was hard to understand (for me) given I didn't understand the design, and didn't understand the reason for the design, and I thought others might be in the same boat.
Force-pushed 8dde103 to 1af1b40.
Extracts the broadcast-receiver drain loop into a free poll_telemetry function. Both call sites (the regular resample loop and the post-jump drain in rebuild_resamplers_after_jump) shared the same Empty / Lagged / Closed handling — pulling it into one helper keeps the two paths from drifting apart. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Both `start_resamplers` and `rebuild_resamplers_after_jump` constructed their inner `frequenz_resampling::Resampler` with the same lookup chain (metric override → configured default → Average) and the same `max_age_in_intervals` clamp. Pulling that into `build_resampler` keeps the two paths consistent as `LogicalMeterConfig` evolves — important now that the rebuild path is correctness-critical for jump recovery. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Lagged means the receiver fell behind enough that the channel dropped samples — a real data-loss event, not just an internal scheduling detail. It can fire during a wall-clock jump (the server may burst samples to refill the buffer) or under sustained back-pressure; either way operators want to see it without a debug filter. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
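The shared drain behavior from the last three commit messages can be sketched as one helper. `PollError` below mirrors the shape of tokio's broadcast `TryRecvError` (Empty / Lagged / Closed) but is a local stand-in, and `poll_telemetry`'s signature is an assumption, not the crate's actual code:

```rust
// Hedged sketch of a shared telemetry drain helper.
enum PollError {
    Empty,
    Lagged(u64),
    Closed,
}

/// Drains all currently available samples into `out`. Returns false once the
/// sender side is gone, so both call sites stop identically.
fn poll_telemetry<T>(
    mut try_recv: impl FnMut() -> Result<T, PollError>,
    out: &mut Vec<T>,
) -> bool {
    loop {
        match try_recv() {
            Ok(sample) => out.push(sample),
            Err(PollError::Empty) => return true,
            Err(PollError::Lagged(n)) => {
                // Real data loss, not a scheduling detail: surface at warn level.
                eprintln!("telemetry receiver lagged, dropped {n} samples");
            }
            Err(PollError::Closed) => return false,
        }
    }
}

fn main() {
    // Events are popped from the back: Ok(1), Lagged(3), Ok(2), Empty.
    let mut events = vec![Err(PollError::Empty), Ok(2), Err(PollError::Lagged(3)), Ok(1)];
    let mut out = Vec::new();
    let alive = poll_telemetry(|| events.pop().unwrap(), &mut out);
    assert!(alive);
    assert_eq!(out, vec![1, 2]);
}
```

Centralizing the match means a future change to, say, the Lagged policy lands in both the regular resample loop and the post-jump drain at once.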
`WallClockTimer::try_new` already rejects this internally, but
surfacing the check at the actor boundary gives a slightly more
domain-specific error message ("resampling_interval must be positive")
and pins the contract that callers see InvalidConfig — not whatever
the timer happens to report — for misconfigured intervals.
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The inner `frequenz_resampling::Resampler` takes its max-age as `i32`, so the actor previously had to clamp `config.max_age_in_intervals` (`u32`) with `.min(i32::MAX as u32) as i32`. That silently truncated user configuration without telling them their effective retention window had been altered. Validate at construction instead and surface InvalidConfig, so an out-of-range value fails loudly. The clamp in build_resampler is no longer needed and the cast becomes a plain `as i32`. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
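The fail-loud check described above amounts to a checked `u32` to `i32` conversion at construction time; the function name and error shape here are illustrative, not the crate's API:

```rust
// Hedged sketch of validating max_age_in_intervals at construction instead
// of silently clamping it to i32::MAX in build_resampler.
fn validate_max_age(max_age_in_intervals: u32) -> Result<i32, String> {
    i32::try_from(max_age_in_intervals).map_err(|_| {
        format!("max_age_in_intervals must fit into i32, got {max_age_in_intervals}")
    })
}

fn main() {
    assert_eq!(validate_max_age(3), Ok(3));
    // Values above i32::MAX now fail loudly instead of being truncated.
    assert!(validate_max_age(u32::MAX).is_err());
}
```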
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Force-pushed 1af1b40 to c7e476f.
`new()` slept up to ~1 s of wall-clock time aligning to the next whole second so the mock's telemetry timestamps would land on the resampler's interval boundaries. The anchor is now built directly from `next_sec_secs` and handed to `TokioSyncedClock::with_wall_anchor`, so the boundary is reported at construction without waiting for real time to reach it. The sleep was incidentally syncing the mock's clock with a later `TokioSyncedClock::new()` (anchored to `Utc::now()`) that `new_logical_meter_handle` was passing to `LogicalMeterHandle`; expose the mock's clock via a new `clock()` accessor and share it with the actor so the two sides remain aligned without depending on real wall time. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Address every lint clippy reports under `--all-targets`:

- `inconsistent_digit_grouping` in a frequency test (regroup the literal at the standard 3-digit boundary).
- `excessive_precision` in current/energy formatting tests (truncate the f32 literals to a representable mantissa; the formatter still rounds to the asserted output).
- `useless_vec` in two `bounds` test fixtures (use array literals).
- `redundant_closure` in a `logical_meter_handle` test helper (pass the function reference directly).
- `clone_on_copy` on `Option<Timestamp>` and a stray `into_iter()` in `test_utils`'s mock telemetry stream.
- `collapsible_if` in `receive_electrical_component_telemetry_stream` (fold the two guards into a single let-chain, edition 2024).
- `type_complexity` on `MockComponent::metrics` (factor out a `MockMetricRow` typedef).

`cargo fmt --check` was already clean, no formatting changes.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
llucax left a comment:
Approving from the logical part, not sure if anyone else with more rust-fu should approve too, I leave that to you to decide.
Adds a new internal `WallClockTimer` that schedules resampler ticks against the wall clock while sleeping on tokio's monotonic clock. When the two diverge by more than one resampling interval in either direction, the timer realigns and the actor rebuilds its inner resamplers against the new clock frame.

Subscribers see a single `None` sample at the resync tick (preserving the every-interval cadence) and real values resume on the next tick: no catch-up burst, no duplicate samples.