diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5d90c3a..cf4d988 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,3 +10,9 @@ - `BatteryPool::telemetry_snapshots()` exposes the same per-component health partition (`BatteryPoolSnapshot`). - The pool telemetry trackers and snapshot types (`BatteryPoolSnapshot`, `PvPoolSnapshot`, `InverterBatteryGroup`, `InverterBatteryGroupStatus`) are now public and re-exported from the crate root. + +## Bug Fixes + +- The pool, group, and component telemetry trackers no longer leak their tasks (while logging at error level every tick) once their consumers are gone; normal shutdown is now logged at debug. + +- The client now evicts ended per-component telemetry streams from its cache, so a pool recreated on the same client receives telemetry again instead of silently getting none. diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index b42fea7..d2abe3a 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -82,6 +82,29 @@ impl MicrogridClientActor { } Some(StreamStatus::Ended(component_id)) => { components_to_retry.remove(&component_id); + match component_streams.get(&component_id) { + // A subscriber arrived between the stream + // task's no-receivers check and this message; + // the task is gone, so restart the stream for + // the new subscriber. + Some(tx) if tx.receiver_count() > 0 => { + let tx = tx.clone(); + start_electrical_component_telemetry_stream( + &mut self.client, + component_id, + tx, + stream_status_tx.clone(), + ) + .await; + } + // Drop the cached sender: nothing writes to it + // anymore, so handing out further subscriptions + // on it would never yield data. The next + // subscription starts a fresh stream instead. + _ => { + component_streams.remove(&component_id); + } + } } None => { tracing::error!("MicrogridClientActor: Stream status channel closed, exiting."); diff --git a/src/client/microgrid_client_handle.rs b/src/client/microgrid_client_handle.rs index b3721ef..2b68466 100644 --- a/src/client/microgrid_client_handle.rs +++ b/src/client/microgrid_client_handle.rs @@ -405,4 +405,32 @@ mod tests { ] ); } + + #[tokio::test(start_paused = true)] + async fn test_resubscribing_after_all_receivers_dropped_restarts_stream() { + let handle = new_client_handle(); + + let mut telemetry_rx = handle + .receive_electrical_component_telemetry_stream(2) + .await + .unwrap(); + telemetry_rx.recv().await.unwrap(); + drop(telemetry_rx); + + // Let the stream task notice that all receivers are gone (it checks + // before each sample) and the actor process its Ended status, which + // must evict the cached sender. + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // A fresh subscription must get a live stream again, not a + // subscription on the dead cached channel. + let mut telemetry_rx = handle + .receive_electrical_component_telemetry_stream(2) + .await + .unwrap(); + tokio::time::timeout(std::time::Duration::from_secs(10), telemetry_rx.recv()) + .await + .expect("no telemetry after resubscribing; stale stream cache?") + .unwrap(); + } } diff --git a/src/lib.rs b/src/lib.rs index 44a61dd..29d60ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,8 +37,9 @@ pub(crate) mod wall_clock_timer; mod microgrid; pub use microgrid::{ - BatteryPool, BatteryPoolSnapshot, BatteryPoolTelemetryTracker, InverterBatteryGroup, - InverterBatteryGroupStatus, Microgrid, PvPool, PvPoolSnapshot, PvPoolTelemetryTracker, + BatteryPool, BatteryPoolSnapshot, BatteryPoolTelemetryTracker, ComponentHealthPartition, + InverterBatteryGroup, InverterBatteryGroupStatus, Microgrid, PvPool, PvPoolSnapshot, + PvPoolTelemetryTracker, }; #[cfg(any(test, feature = "test-utils"))] diff --git a/src/microgrid.rs b/src/microgrid.rs index bf37427..ad6e16d 100644 --- a/src/microgrid.rs +++ b/src/microgrid.rs @@ -4,6 +4,12 @@ //! High-level interface for the Microgrid API. mod bounds_aggregation; +mod pool_bounds_tracker; +mod pool_broadcast; +mod pool_validation; + +#[cfg(test)] +mod test_support; mod battery_bounds_tracker; mod battery_pool; @@ -17,6 +23,7 @@ pub(crate) mod telemetry_tracker; pub use telemetry_tracker::battery_pool_telemetry_tracker::{ BatteryPoolSnapshot, BatteryPoolTelemetryTracker, InverterBatteryGroup, }; +pub use telemetry_tracker::component_partition::ComponentHealthPartition; pub use telemetry_tracker::inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus; pub use telemetry_tracker::pv_pool_telemetry_tracker::{PvPoolSnapshot, PvPoolTelemetryTracker}; diff --git a/src/microgrid/battery_bounds_tracker.rs b/src/microgrid/battery_bounds_tracker.rs index 179a2ef..499076d 100644 --- a/src/microgrid/battery_bounds_tracker.rs +++ b/src/microgrid/battery_bounds_tracker.rs @@ -19,91 +19,38 @@ //! aggregated bounds are intersected. //! * Groups within a pool are in parallel — their bounds are added together. -use std::marker::PhantomData; - -use tokio::sync::broadcast; - use crate::bounds::{combine_parallel_sets, intersect_bounds_sets}; use crate::client::proto::common::metrics::Bounds as PbBounds; use crate::microgrid::bounds_aggregation::aggregate_parallel; use crate::microgrid::telemetry_tracker::battery_pool_telemetry_tracker::BatteryPoolSnapshot; use crate::{Bounds, metric::Metric}; -/// Tracks and aggregates power bounds for a battery pool. +/// Aggregates the power bounds of a battery pool following the physical +/// topology of its inverter-battery groups (see the module docs). /// /// `InverterM` is the metric used to read bounds from inverters (e.g. /// `AcPowerActive`); `BatteryM` is the metric used to read bounds from /// batteries (e.g. `DcPower`). Both must share the same `QuantityType` so /// their bounds can be intersected and summed. -pub(crate) struct BatteryPoolBoundsTracker { - pool_status_rx: broadcast::Receiver, - pool_bounds_tx: broadcast::Sender>>, - _marker: PhantomData<(InverterM, BatteryM)>, -} - -impl BatteryPoolBoundsTracker +pub(crate) fn compute_pool_bounds( + status: &BatteryPoolSnapshot, +) -> Vec> where InverterM: Metric, BatteryM: Metric, Bounds: From, { - pub(crate) fn new( - pool_status_rx: broadcast::Receiver, - pool_bounds_tx: broadcast::Sender>>, - ) -> Self { - Self { - pool_status_rx, - pool_bounds_tx, - _marker: PhantomData, - } - } - - pub(crate) async fn run(mut self) { - loop { - match self.pool_status_rx.recv().await { - Ok(pool_status) => { - let bounds = Self::compute_pool_bounds(&pool_status); - if self.pool_bounds_tx.send(bounds).is_err() { - tracing::debug!( - "No receivers for {}/{} bounds tracker; shutting down.", - InverterM::str_name(), - BatteryM::str_name(), - ); - break; - } - } - Err(broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!( - "{}/{} bounds tracker lagged by {n} pool status updates.", - InverterM::str_name(), - BatteryM::str_name(), - ); - } - Err(broadcast::error::RecvError::Closed) => { - tracing::error!( - "Pool status channel closed; {}/{} bounds tracker shutting down.", - InverterM::str_name(), - BatteryM::str_name(), - ); - break; - } - } - } - } - - fn compute_pool_bounds(status: &BatteryPoolSnapshot) -> Vec> { - status - .groups() - .values() - .map(|group| { - let inverter_bounds = aggregate_parallel::(&group.healthy_inverters); - let battery_bounds = aggregate_parallel::(&group.healthy_batteries); - intersect_bounds_sets(&inverter_bounds, &battery_bounds) - }) - .fold(Vec::new(), |acc, group_bounds| { - combine_parallel_sets(&acc, &group_bounds) - }) - } + status + .groups() + .values() + .map(|group| { + let inverter_bounds = aggregate_parallel::(&group.inverters.healthy); + let battery_bounds = aggregate_parallel::(&group.batteries.healthy); + intersect_bounds_sets(&inverter_bounds, &battery_bounds) + }) + .fold(Vec::new(), |acc, group_bounds| { + combine_parallel_sets(&acc, &group_bounds) + }) } #[cfg(test)] @@ -119,30 +66,12 @@ mod tests { use crate::microgrid::telemetry_tracker::battery_pool_telemetry_tracker::{ BatteryPoolSnapshot, InverterBatteryGroup, }; + use crate::microgrid::telemetry_tracker::component_partition::ComponentHealthPartition; use crate::microgrid::telemetry_tracker::inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus; use crate::quantity::Power; - use super::BatteryPoolBoundsTracker; - - fn telem_with_power_bounds( - id: u64, - bounds: Vec<(Option, Option)>, - ) -> ElectricalComponentTelemetry { - ElectricalComponentTelemetry { - electrical_component_id: id, - metric_samples: vec![MetricSample { - sample_time: None, - metric: MetricPb::AcPowerActive as i32, - value: None, - bounds: bounds - .into_iter() - .map(|(lower, upper)| PbBounds { lower, upper }) - .collect(), - ..Default::default() - }], - ..Default::default() - } - } + use super::compute_pool_bounds; + use crate::microgrid::test_support::telem_with_power_bounds; fn group(inverter_ids: &[u64], battery_ids: &[u64]) -> InverterBatteryGroup { InverterBatteryGroup::new( @@ -177,16 +106,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters, - healthy_batteries, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: healthy_inverters, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: healthy_batteries, + unhealthy: HashMap::new(), + }, }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert_eq!( bounds, vec![ @@ -224,16 +155,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters, - healthy_batteries, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: healthy_inverters, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: healthy_batteries, + unhealthy: HashMap::new(), + }, }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert_eq!( bounds, vec![Bounds::new( @@ -273,26 +206,32 @@ mod tests { ( g1, InverterBatteryGroupStatus { - healthy_inverters: h_inv_1, - healthy_batteries: h_bat_1, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: h_inv_1, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: h_bat_1, + unhealthy: HashMap::new(), + }, }, ), ( g2, InverterBatteryGroupStatus { - healthy_inverters: h_inv_2, - healthy_batteries: h_bat_2, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: h_inv_2, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: h_bat_2, + unhealthy: HashMap::new(), + }, }, ), ]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert_eq!( bounds, vec![Bounds::new( @@ -305,9 +244,7 @@ mod tests { #[test] fn empty_pool_yields_empty_bounds() { let snapshot = status(vec![]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!(bounds.is_empty()); } @@ -331,16 +268,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters, - healthy_batteries, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: healthy_inverters, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: healthy_batteries, + unhealthy: HashMap::new(), + }, }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!( bounds.is_empty(), "group with no inverter bounds must not contribute any bounds" @@ -365,16 +304,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters, - healthy_batteries, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: healthy_inverters, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: healthy_batteries, + unhealthy: HashMap::new(), + }, }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!( bounds.is_empty(), "group with no battery bounds must not contribute any bounds" @@ -400,16 +341,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters: HashMap::new(), - healthy_batteries, - unhealthy_inverters, - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: HashMap::new(), + unhealthy: unhealthy_inverters, + }, + batteries: ComponentHealthPartition { + healthy: healthy_batteries, + unhealthy: HashMap::new(), + }, }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!( bounds.is_empty(), "group with no healthy inverters must not contribute any bounds" @@ -434,16 +377,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters, - healthy_batteries: HashMap::new(), - unhealthy_inverters: HashMap::new(), - unhealthy_batteries, + inverters: ComponentHealthPartition { + healthy: healthy_inverters, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: HashMap::new(), + unhealthy: unhealthy_batteries, + }, }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!( bounds.is_empty(), "group with no healthy batteries must not contribute any bounds" @@ -479,18 +424,20 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters: h_inv, - healthy_batteries: h_bat, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: h_inv, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: h_bat, + unhealthy: HashMap::new(), + }, }, )]); // Inverter side has no active-power bounds → group produces no // bounds, so the pool bounds are empty. - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!(bounds.is_empty()); } } diff --git a/src/microgrid/battery_pool.rs b/src/microgrid/battery_pool.rs index 41a4d68..4f2c89a 100644 --- a/src/microgrid/battery_pool.rs +++ b/src/microgrid/battery_pool.rs @@ -15,8 +15,12 @@ use crate::{ proto::common::microgrid::electrical_components::ElectricalComponentStateCode, }, metric, + metric::Metric, microgrid::{ - battery_bounds_tracker::BatteryPoolBoundsTracker, + battery_bounds_tracker, + pool_bounds_tracker::PoolBoundsTracker, + pool_broadcast::try_reuse, + pool_validation::validate_pool_ids, telemetry_tracker::battery_pool_telemetry_tracker::{ BatteryPoolSnapshot, BatteryPoolTelemetryTracker, }, @@ -48,19 +52,11 @@ impl BatteryPool { snapshot_tx: None, bounds_tx: None, }; - if let Some(ids) = &this.component_ids { - if ids.is_empty() { - let e = "component_ids cannot be an empty set".to_string(); - tracing::error!("{e}"); - return Err(Error::invalid_component(e)); - } - // Validate that all provided IDs correspond to batteries in the graph. - if !ids.is_subset(&this.get_all_battery_ids()) { - let e = format!("All component_ids {:?} must be batteries.", ids); - tracing::error!("{e}"); - return Err(Error::invalid_component(e)); - } - } + validate_pool_ids( + &this.component_ids, + &this.get_all_battery_ids(), + "batteries", + )?; Ok(this) } @@ -94,20 +90,21 @@ impl BatteryPool { /// receivers; otherwise starts a new one (which also starts or reuses the /// underlying telemetry tracker). pub fn power_bounds(&mut self) -> broadcast::Receiver>> { - if let Some(tx) = self - .bounds_tx - .as_ref() - .and_then(broadcast::WeakSender::upgrade) - && tx.receiver_count() > 0 - { - return tx.subscribe(); + if let Some(rx) = try_reuse(&self.bounds_tx) { + return rx; } let snapshot_rx = self.telemetry_snapshots(); let (tx, rx) = broadcast::channel(100); self.bounds_tx = Some(tx.downgrade()); - let tracker = BatteryPoolBoundsTracker::::new( + let tracker = PoolBoundsTracker::new( snapshot_rx, tx, + battery_bounds_tracker::compute_pool_bounds::, + format!( + "{}/{}", + metric::AcPowerActive::str_name(), + metric::DcPower::str_name() + ), ); tokio::spawn(tracker.run()); rx @@ -120,13 +117,8 @@ impl BatteryPool { /// Reuses the running tracker if one exists and still has active receivers /// (including any held by a bounds tracker); otherwise starts a new one. pub fn telemetry_snapshots(&mut self) -> broadcast::Receiver { - if let Some(tx) = self - .snapshot_tx - .as_ref() - .and_then(broadcast::WeakSender::upgrade) - && tx.receiver_count() > 0 - { - return tx.subscribe(); + if let Some(rx) = try_reuse(&self.snapshot_tx) { + return rx; } let (tx, rx) = broadcast::channel(100); self.snapshot_tx = Some(tx.downgrade()); diff --git a/src/microgrid/pool_bounds_tracker.rs b/src/microgrid/pool_bounds_tracker.rs new file mode 100644 index 0000000..9468570 --- /dev/null +++ b/src/microgrid/pool_bounds_tracker.rs @@ -0,0 +1,79 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! A generic bounds tracker for a pool of microgrid components. +//! +//! Subscribes to a pool snapshot stream and, for each update, computes a +//! pool-level set of bounds with a caller-supplied function and broadcasts it. +//! The aggregation logic (which differs per pool type — see +//! [`pv_bounds_tracker`](super::pv_bounds_tracker) and +//! [`battery_bounds_tracker`](super::battery_bounds_tracker)) is injected as a +//! plain function, so this loop is shared across pool types. + +use tokio::sync::broadcast; + +use crate::{Bounds, quantity::Quantity}; + +/// Tracks and aggregates power bounds for a pool. +/// +/// `S` is the pool snapshot type and `Q` the quantity the bounds are expressed +/// in. `compute` maps a snapshot to the aggregated pool bounds; `label` +/// identifies the tracker in log messages. +pub(crate) struct PoolBoundsTracker { + pool_status_rx: broadcast::Receiver, + pool_bounds_tx: broadcast::Sender>>, + compute: fn(&S) -> Vec>, + label: String, +} + +impl PoolBoundsTracker +where + S: Clone, + Q: Quantity, +{ + pub(crate) fn new( + pool_status_rx: broadcast::Receiver, + pool_bounds_tx: broadcast::Sender>>, + compute: fn(&S) -> Vec>, + label: String, + ) -> Self { + Self { + pool_status_rx, + pool_bounds_tx, + compute, + label, + } + } + + pub(crate) async fn run(mut self) { + loop { + match self.pool_status_rx.recv().await { + Ok(pool_status) => { + let bounds = (self.compute)(&pool_status); + if self.pool_bounds_tx.send(bounds).is_err() { + tracing::debug!( + "No receivers for {} bounds tracker; shutting down.", + self.label, + ); + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!( + "{} bounds tracker lagged by {n} pool status updates.", + self.label, + ); + } + Err(broadcast::error::RecvError::Closed) => { + // The telemetry tracker upstream has shut down — a normal + // teardown of the whole pool, not an error here. + tracing::debug!( + "Pool status channel closed; {} bounds tracker shutting down.", + self.label, + ); + break; + } + } + } + } +} diff --git a/src/microgrid/pool_broadcast.rs b/src/microgrid/pool_broadcast.rs new file mode 100644 index 0000000..fe3e3c4 --- /dev/null +++ b/src/microgrid/pool_broadcast.rs @@ -0,0 +1,25 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! Shared helper for reusing a pool's broadcast producer task. +//! +//! Both [`BatteryPool`](super::BatteryPool) and [`PvPool`](super::PvPool) hand +//! out receivers for a long-lived tracker task whose [`broadcast::Sender`] they +//! hold only as a [`broadcast::WeakSender`]. A new subscription should reuse the +//! running task while it still has live receivers, and start a fresh one +//! otherwise. + +use tokio::sync::broadcast; + +/// Returns a receiver for the broadcast referenced by `weak` if its sender is +/// still alive and has at least one receiver, signalling that the producer task +/// is still running and worth reusing. Returns `None` when the caller must +/// start a new producer. +pub(super) fn try_reuse( + weak: &Option>, +) -> Option> { + weak.as_ref() + .and_then(broadcast::WeakSender::upgrade) + .filter(|tx| tx.receiver_count() > 0) + .map(|tx| tx.subscribe()) +} diff --git a/src/microgrid/pool_validation.rs b/src/microgrid/pool_validation.rs new file mode 100644 index 0000000..4cc88a1 --- /dev/null +++ b/src/microgrid/pool_validation.rs @@ -0,0 +1,34 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! Shared validation for pool component-ID selections. + +use std::collections::BTreeSet; + +use crate::Error; + +/// Validates the `component_ids` a pool was constructed with against the set of +/// `all_matching` IDs of the right kind in the component graph. +/// +/// `Some(ids)` must be a non-empty subset of `all_matching`; `None` ("all +/// components of this kind") is accepted as-is. `noun` names the component kind +/// in error messages (e.g. `"batteries"`). +pub(super) fn validate_pool_ids( + component_ids: &Option>, + all_matching: &BTreeSet, + noun: &str, +) -> Result<(), Error> { + if let Some(ids) = component_ids { + if ids.is_empty() { + let e = "component_ids cannot be an empty set".to_string(); + tracing::error!("{e}"); + return Err(Error::invalid_component(e)); + } + if !ids.is_subset(all_matching) { + let e = format!("All component_ids {ids:?} must be {noun}."); + tracing::error!("{e}"); + return Err(Error::invalid_component(e)); + } + } + Ok(()) +} diff --git a/src/microgrid/pv_bounds_tracker.rs b/src/microgrid/pv_bounds_tracker.rs index d918107..08dc912 100644 --- a/src/microgrid/pv_bounds_tracker.rs +++ b/src/microgrid/pv_bounds_tracker.rs @@ -10,76 +10,22 @@ //! PV inverters in a pool are wired in parallel, so their bounds are simply //! added together. -use std::marker::PhantomData; - -use tokio::sync::broadcast; - use crate::client::proto::common::metrics::Bounds as PbBounds; use crate::microgrid::bounds_aggregation::aggregate_parallel; use crate::microgrid::telemetry_tracker::pv_pool_telemetry_tracker::PvPoolSnapshot; use crate::{Bounds, metric::Metric}; -/// Tracks and aggregates power bounds for a PV pool. +/// Aggregates the bounds of every healthy PV inverter in the pool. The +/// inverters are wired in parallel, so their bounds combine in parallel. /// /// `M` is the metric used to read bounds from the PV inverters (e.g. /// `AcPowerActive`). -pub(crate) struct PvPoolBoundsTracker { - pool_status_rx: broadcast::Receiver, - pool_bounds_tx: broadcast::Sender>>, - _marker: PhantomData, -} - -impl PvPoolBoundsTracker +pub(crate) fn compute_pool_bounds(status: &PvPoolSnapshot) -> Vec> where M: Metric, Bounds: From, { - pub(crate) fn new( - pool_status_rx: broadcast::Receiver, - pool_bounds_tx: broadcast::Sender>>, - ) -> Self { - Self { - pool_status_rx, - pool_bounds_tx, - _marker: PhantomData, - } - } - - pub(crate) async fn run(mut self) { - loop { - match self.pool_status_rx.recv().await { - Ok(pool_status) => { - let bounds = Self::compute_pool_bounds(&pool_status); - if self.pool_bounds_tx.send(bounds).is_err() { - tracing::debug!( - "No receivers for {} PV bounds tracker; shutting down.", - M::str_name(), - ); - break; - } - } - Err(broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!( - "{} PV bounds tracker lagged by {n} pool status updates.", - M::str_name(), - ); - } - Err(broadcast::error::RecvError::Closed) => { - tracing::error!( - "Pool status channel closed; {} PV bounds tracker shutting down.", - M::str_name(), - ); - break; - } - } - } - } - - /// Aggregates the bounds of every healthy PV inverter in the pool. The - /// inverters are wired in parallel, so their bounds combine in parallel. - fn compute_pool_bounds(status: &PvPoolSnapshot) -> Vec> { - aggregate_parallel::(&status.healthy_inverters) - } + aggregate_parallel::(&status.inverters.healthy) } #[cfg(test)] @@ -92,30 +38,12 @@ mod tests { }; use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentTelemetry; use crate::metric::AcPowerActive; + use crate::microgrid::telemetry_tracker::component_partition::ComponentHealthPartition; use crate::microgrid::telemetry_tracker::pv_pool_telemetry_tracker::PvPoolSnapshot; use crate::quantity::Power; - use super::PvPoolBoundsTracker; - - fn telem_with_power_bounds( - id: u64, - bounds: Vec<(Option, Option)>, - ) -> ElectricalComponentTelemetry { - ElectricalComponentTelemetry { - electrical_component_id: id, - metric_samples: vec![MetricSample { - sample_time: None, - metric: MetricPb::AcPowerActive as i32, - value: None, - bounds: bounds - .into_iter() - .map(|(lower, upper)| PbBounds { lower, upper }) - .collect(), - ..Default::default() - }], - ..Default::default() - } - } + use super::compute_pool_bounds; + use crate::microgrid::test_support::telem_with_power_bounds; /// Builds a snapshot whose healthy set holds the given telemetry, keyed by /// component ID, and an empty unhealthy set. @@ -125,8 +53,10 @@ mod tests { .map(|t| (t.electrical_component_id, t)) .collect(); PvPoolSnapshot { - healthy_inverters: healthy, - unhealthy_inverters: HashMap::new(), + inverters: ComponentHealthPartition { + healthy, + unhealthy: HashMap::new(), + }, } } @@ -136,7 +66,7 @@ mod tests { 10, vec![(Some(-1000.0), Some(0.0))], )]); - let bounds = PvPoolBoundsTracker::::compute_pool_bounds(&snap); + let bounds = compute_pool_bounds::(&snap); assert_eq!( bounds, vec![Bounds::new( @@ -152,7 +82,7 @@ mod tests { telem_with_power_bounds(10, vec![(Some(-1000.0), Some(0.0))]), telem_with_power_bounds(11, vec![(Some(-2000.0), Some(0.0))]), ]); - let bounds = PvPoolBoundsTracker::::compute_pool_bounds(&snap); + let bounds = compute_pool_bounds::(&snap); assert_eq!( bounds, vec![Bounds::new( @@ -165,7 +95,7 @@ mod tests { #[test] fn empty_pool_yields_empty_bounds() { let snap = healthy_snapshot(vec![]); - let bounds = PvPoolBoundsTracker::::compute_pool_bounds(&snap); + let bounds = compute_pool_bounds::(&snap); assert!(bounds.is_empty()); } @@ -189,11 +119,10 @@ mod tests { )), ); let snap = PvPoolSnapshot { - healthy_inverters: healthy, - unhealthy_inverters: unhealthy, + inverters: ComponentHealthPartition { healthy, unhealthy }, }; - let bounds = PvPoolBoundsTracker::::compute_pool_bounds(&snap); + let bounds = compute_pool_bounds::(&snap); assert_eq!( bounds, vec![Bounds::new( @@ -222,7 +151,7 @@ mod tests { ..Default::default() }; let snap = healthy_snapshot(vec![other]); - let bounds = PvPoolBoundsTracker::::compute_pool_bounds(&snap); + let bounds = compute_pool_bounds::(&snap); assert!(bounds.is_empty()); } } diff --git a/src/microgrid/pv_pool.rs b/src/microgrid/pv_pool.rs index 8d59f4f..d29393d 100644 --- a/src/microgrid/pv_pool.rs +++ b/src/microgrid/pv_pool.rs @@ -21,8 +21,12 @@ use crate::{ Bounds, Error, Formula, LogicalMeterHandle, MicrogridClientHandle, client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode, metric, + metric::Metric, microgrid::{ - pv_bounds_tracker::PvPoolBoundsTracker, + pool_bounds_tracker::PoolBoundsTracker, + pool_broadcast::try_reuse, + pool_validation::validate_pool_ids, + pv_bounds_tracker, telemetry_tracker::pv_pool_telemetry_tracker::{PvPoolSnapshot, PvPoolTelemetryTracker}, }, quantity::Power, @@ -98,20 +102,11 @@ impl PvPool { snapshot_tx: None, bounds_tx: None, }; - if let Some(ids) = &this.component_ids { - if ids.is_empty() { - let e = "component_ids cannot be an empty set".to_string(); - tracing::error!("{e}"); - return Err(Error::invalid_component(e)); - } - // Validate that all provided IDs correspond to PV inverters in the - // graph. - if !ids.is_subset(&this.get_all_pv_inverter_ids()) { - let e = format!("All component_ids {:?} must be PV inverters.", ids); - tracing::error!("{e}"); - return Err(Error::invalid_component(e)); - } - } + validate_pool_ids( + &this.component_ids, + &this.get_all_pv_inverter_ids(), + "PV inverters", + )?; Ok(this) } @@ -145,18 +140,18 @@ impl PvPool { /// receivers; otherwise starts a new one (which also starts or reuses the /// underlying telemetry tracker). pub fn power_bounds(&mut self) -> broadcast::Receiver>> { - if let Some(tx) = self - .bounds_tx - .as_ref() - .and_then(broadcast::WeakSender::upgrade) - && tx.receiver_count() > 0 - { - return tx.subscribe(); + if let Some(rx) = try_reuse(&self.bounds_tx) { + return rx; } let snapshot_rx = self.telemetry_snapshots(); let (tx, rx) = broadcast::channel(100); self.bounds_tx = Some(tx.downgrade()); - let tracker = PvPoolBoundsTracker::::new(snapshot_rx, tx); + let tracker = PoolBoundsTracker::new( + snapshot_rx, + tx, + pv_bounds_tracker::compute_pool_bounds::, + format!("{} PV", metric::AcPowerActive::str_name()), + ); tokio::spawn(tracker.run()); rx } @@ -168,13 +163,8 @@ impl PvPool { /// Reuses the running tracker if one exists and still has active receivers /// (including any held by a bounds tracker); otherwise starts a new one. pub fn telemetry_snapshots(&mut self) -> broadcast::Receiver { - if let Some(tx) = self - .snapshot_tx - .as_ref() - .and_then(broadcast::WeakSender::upgrade) - && tx.receiver_count() > 0 - { - return tx.subscribe(); + if let Some(rx) = try_reuse(&self.snapshot_tx) { + return rx; } let (tx, rx) = broadcast::channel(100); self.snapshot_tx = Some(tx.downgrade()); @@ -201,26 +191,9 @@ impl PvPool { mod tests { use std::collections::BTreeSet; - use chrono::TimeDelta; - use super::PvPool; - use crate::{ - LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, - client::test_utils::{MockComponent, MockMicrogridApiClient}, - }; - - /// Builds client and logical-meter handles backed by the given mock graph. - async fn handles(graph: MockComponent) -> (MicrogridClientHandle, LogicalMeterHandle) { - let api = MockMicrogridApiClient::new(graph); - let client = MicrogridClientHandle::new_from_client(api); - let lm = LogicalMeterHandle::try_new( - client.clone(), - LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()), - ) - .await - .unwrap(); - (client, lm) - } + use crate::client::test_utils::MockComponent; + use crate::microgrid::test_support::handles; /// grid → meter → [pv meter → pv_inverter(4), pv_inverter(5)], /// [battery meter → battery_inverter(7) → battery(8)] diff --git a/src/microgrid/telemetry_tracker.rs b/src/microgrid/telemetry_tracker.rs index 62d184c..e39fd74 100644 --- a/src/microgrid/telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker.rs @@ -8,6 +8,7 @@ //! the latest telemetry sample for each. pub(crate) mod battery_pool_telemetry_tracker; +pub(crate) mod component_partition; pub(crate) mod component_telemetry_tracker; pub(crate) mod inverter_battery_group_telemetry_tracker; pub(crate) mod pv_pool_telemetry_tracker; diff --git a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs index 0e8e593..911283e 100644 --- a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs @@ -88,7 +88,11 @@ impl BatteryPoolTelemetryTracker { while let Some(battery_id) = unvisited_batteries.iter().next().cloned() { let group_inverters = graph - .predecessors(battery_id)? + .predecessors(battery_id) + .map_err(|e| { + tracing::error!("Failed to query predecessors of battery {battery_id}: {e}"); + e + })? .filter(|c| c.category() == crate::client::ElectricalComponentCategory::Inverter) .map(|c| c.id) .collect::>(); @@ -102,7 +106,13 @@ impl BatteryPoolTelemetryTracker { let mut group_batteries = BTreeSet::new(); for inverter_id in &group_inverters { let connected_batteries = graph - .successors(*inverter_id)? + .successors(*inverter_id) + .map_err(|e| { + tracing::error!( + "Failed to query successors of inverter {inverter_id}: {e}" + ); + e + })? .map(|c| c.id) .collect::>(); @@ -129,7 +139,13 @@ impl BatteryPoolTelemetryTracker { // Ensure that group batteries are only connect to group inverters for battery_id in &group_batteries { let connected_inverters = graph - .predecessors(*battery_id)? + .predecessors(*battery_id) + .map_err(|e| { + tracing::error!( + "Failed to query predecessors of battery {battery_id}: {e}" + ); + e + })? .filter(|c| { c.category() == crate::client::ElectricalComponentCategory::Inverter }) @@ -152,10 +168,13 @@ impl BatteryPoolTelemetryTracker { Ok(groups) } - pub async fn run(self) -> Result<(), Error> { + pub async fn run(self) { let mut inverter_battery_group_data = HashMap::new(); - let inverter_battery_group_ids = self.get_inverter_battery_groups()?; + // Errors are logged at source inside `get_inverter_battery_groups`. + let Ok(inverter_battery_group_ids) = self.get_inverter_battery_groups() else { + return; + }; let (component_status_tx, mut component_status_rx) = tokio::sync::mpsc::channel(100); for inverter_battery_group in inverter_battery_group_ids { @@ -185,21 +204,31 @@ impl BatteryPoolTelemetryTracker { inverter_battery_group_data.insert(group_ids, status); } // Every group tracker has exited and dropped its sender, - // so no further updates will arrive. The `interval.tick()` - // arm never disables, so the `select!` `else` can never - // run; break here instead. + // so no further updates will ever arrive. The `_ = + // interval.tick()` arm below is a catch-all that never + // disables, so the `select!` `else` branch can never run; + // break here instead. None => break, } }, _ = interval.tick() => { + // The unchanged-skip below means a stable partition never + // reaches `send()`, whose failure is otherwise the only + // signal that every receiver has dropped. Check for that + // here so the tracker still shuts down instead of leaking. + if self.component_pool_status_tx.receiver_count() == 0 { + break; + } if last_sent_status.as_ref() == Some(&inverter_battery_group_data) { continue; // Skip sending if the status hasn't changed } - if let Err(e) = self.component_pool_status_tx.send( - BatteryPoolSnapshot(inverter_battery_group_data.clone()) - ) + if self + .component_pool_status_tx + .send(BatteryPoolSnapshot(inverter_battery_group_data.clone())) + .is_err() { - tracing::error!("Failed to send pool snapshot: {}", e); + // All receivers dropped between the check above and here; + // a normal shutdown, recorded by the terminal log below. break; } last_sent_status = Some(inverter_battery_group_data.clone()); @@ -207,14 +236,12 @@ impl BatteryPoolTelemetryTracker { } } - let err = format!( - "BatteryPoolTelemetryTracker (component IDs {:?}) stopped receiving group telemetry updates.", + // Reaching here means every group tracker exited or every receiver + // dropped — a normal shutdown, not an error. + tracing::debug!( + "BatteryPoolTelemetryTracker (component IDs {:?}) stopped: all group trackers or receivers are gone.", self.component_ids ); - - tracing::error!("{}", err); - - Err(Error::component_data_error(err)) } } @@ -222,23 +249,13 @@ impl BatteryPoolTelemetryTracker { mod tests { use std::collections::HashMap; - use chrono::TimeDelta; - use super::BatteryPoolSnapshot; - use crate::{ - LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, - client::{ - proto::common::microgrid::electrical_components::ElectricalComponentStateCode, - test_utils::{MockComponent, MockMicrogridApiClient}, - }, - microgrid::{ - battery_pool::BatteryPool, - telemetry_tracker::{ - battery_pool_telemetry_tracker::InverterBatteryGroup, - inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus, - }, - }, - }; + use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode; + use crate::client::test_utils::MockComponent; + use crate::microgrid::battery_pool::BatteryPool; + use crate::microgrid::telemetry_tracker::battery_pool_telemetry_tracker::InverterBatteryGroup; + use crate::microgrid::telemetry_tracker::inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus; + use crate::microgrid::test_support::{handles, last_snapshot}; impl BatteryPoolSnapshot { pub(crate) fn from_groups( @@ -248,33 +265,10 @@ mod tests { } } async fn new_pool(graph: MockComponent) -> BatteryPool { - let api = MockMicrogridApiClient::new(graph); - let client = MicrogridClientHandle::new_from_client(api); - let lm = LogicalMeterHandle::try_new( - client.clone(), - LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()), - ) - .await - .unwrap(); + let (client, lm) = handles(graph).await; BatteryPool::try_new(None, client, lm).unwrap() } - /// Drains `rx` for up to `steps` * 100ms of simulated time, returning the - /// last snapshot seen. - async fn last_snapshot( - rx: &mut tokio::sync::broadcast::Receiver, - steps: u32, - ) -> BatteryPoolSnapshot { - let mut last = None; - for _ in 0..steps { - tokio::time::advance(std::time::Duration::from_millis(100)).await; - while let Ok(snap) = rx.try_recv() { - last = Some(snap); - } - } - last.expect("no snapshot received") - } - #[tokio::test(start_paused = true)] async fn single_group_reaches_healthy_state() { // grid → meter → battery_inverter(3) → battery(4) @@ -303,10 +297,10 @@ mod tests { let (group, status) = groups.iter().next().unwrap(); assert_eq!(group.inverter_ids, [3].into()); assert_eq!(group.battery_ids, [4].into()); - assert!(status.healthy_inverters.contains_key(&3)); - assert!(status.healthy_batteries.contains_key(&4)); - assert!(status.unhealthy_inverters.is_empty()); - assert!(status.unhealthy_batteries.is_empty()); + assert!(status.inverters.healthy.contains_key(&3)); + assert!(status.batteries.healthy.contains_key(&4)); + assert!(status.inverters.unhealthy.is_empty()); + assert!(status.batteries.unhealthy.is_empty()); } #[tokio::test(start_paused = true)] @@ -348,8 +342,8 @@ mod tests { assert_eq!(all_batteries, [4, 6].into()); for status in groups.values() { - assert!(status.unhealthy_inverters.is_empty()); - assert!(status.unhealthy_batteries.is_empty()); + assert!(status.inverters.unhealthy.is_empty()); + assert!(status.batteries.unhealthy.is_empty()); } } @@ -407,7 +401,7 @@ mod tests { let healthy = last_snapshot(&mut rx, 10).await; let (_, status) = healthy.groups().iter().next().unwrap(); assert!( - status.healthy_inverters.contains_key(&3) && status.healthy_batteries.contains_key(&4), + status.inverters.healthy.contains_key(&3) && status.batteries.healthy.contains_key(&4), "expected components to go healthy after initial samples, got {:?}", status ); @@ -420,17 +414,17 @@ mod tests { let (_, status) = unhealthy.groups().iter().next().unwrap(); assert!( - status.healthy_inverters.is_empty(), + status.inverters.healthy.is_empty(), "inverter should be unhealthy after data stops, got healthy set {:?}", - status.healthy_inverters.keys() + status.inverters.healthy.keys() ); assert!( - status.healthy_batteries.is_empty(), + status.batteries.healthy.is_empty(), "battery should be unhealthy after data stops, got healthy set {:?}", - status.healthy_batteries.keys() + status.batteries.healthy.keys() ); - assert!(status.unhealthy_inverters.contains_key(&3)); - assert!(status.unhealthy_batteries.contains_key(&4)); + assert!(status.inverters.unhealthy.contains_key(&3)); + assert!(status.batteries.unhealthy.contains_key(&4)); } #[tokio::test(start_paused = true)] @@ -455,15 +449,15 @@ mod tests { let (_, status) = snap.groups().iter().next().unwrap(); assert!( - status.healthy_inverters.contains_key(&3), + status.inverters.healthy.contains_key(&3), "inverter with Ready state should be healthy" ); assert!( - !status.healthy_batteries.contains_key(&4), + !status.batteries.healthy.contains_key(&4), "battery with Error state should not be in healthy set" ); assert!( - status.unhealthy_batteries.contains_key(&4), + status.batteries.unhealthy.contains_key(&4), "battery with Error state should be in unhealthy set, got {:?}", status ); diff --git a/src/microgrid/telemetry_tracker/component_partition.rs b/src/microgrid/telemetry_tracker/component_partition.rs new file mode 100644 index 0000000..7e9f9b0 --- /dev/null +++ b/src/microgrid/telemetry_tracker/component_partition.rs @@ -0,0 +1,38 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! A set of components partitioned by health status. + +use std::collections::HashMap; + +use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentTelemetry; + +/// A set of components partitioned by health status and annotated with the +/// latest telemetry sample for each. +/// +/// `healthy` holds the most recent [`ElectricalComponentTelemetry`] observed +/// for each healthy component. `unhealthy` holds the last telemetry observed +/// before the component became unhealthy, or `None` if no sample has been +/// received yet. Consumers can use the telemetry (including per-metric bounds) +/// directly without subscribing to the raw streams again. +#[derive(Clone, Debug, Default, PartialEq)] +pub struct ComponentHealthPartition { + pub healthy: HashMap, + pub unhealthy: HashMap>, +} + +impl ComponentHealthPartition { + /// Records `data` as the latest telemetry for the now-healthy component + /// `id`, removing it from the unhealthy set. + pub(crate) fn mark_healthy(&mut self, id: u64, data: ElectricalComponentTelemetry) { + self.healthy.insert(id, data); + self.unhealthy.remove(&id); + } + + /// Records component `id` as unhealthy, carrying its last telemetry sample + /// if any, and removing it from the healthy set. + pub(crate) fn mark_unhealthy(&mut self, id: u64, data: Option) { + self.unhealthy.insert(id, data); + self.healthy.remove(&id); + } +} diff --git a/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs index 3caa84f..d0954da 100644 --- a/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs @@ -79,14 +79,25 @@ impl ComponentTelemetryTracker { // Reset the interval timer on receiving valid data interval.reset(); let status = self.state_from_data(data); - if let Err(e) = self.component_status_tx.send(status).await { - tracing::error!("Failed to send component status: {}", e); + if self.component_status_tx.send(status).await.is_err() { + // The pool tracker dropped its receiver; there is + // nothing left to report to, so stop tracking + // instead of looping and logging forever. + tracing::debug!( + "Component {} telemetry tracker stopping: pool tracker dropped its receiver.", + self.component_id + ); + break; } } Err(broadcast::error::RecvError::Lagged(_)) => { continue; } Err(broadcast::error::RecvError::Closed) => { + tracing::debug!( + "Component {} telemetry tracker stopping: telemetry stream closed.", + self.component_id + ); drop(self.component_status_tx); break; } @@ -95,8 +106,13 @@ impl ComponentTelemetryTracker { _ = interval.tick() => { // If we reach here, it means no data was received within the tolerance period let status = ComponentHealthStatus::Unhealthy(self.component_id, None); - if let Err(e) = self.component_status_tx.send(status).await { - tracing::error!("Failed to send component status: {}", e); + if self.component_status_tx.send(status).await.is_err() { + // The pool tracker dropped its receiver; stop tracking. + tracing::debug!( + "Component {} telemetry tracker stopping: pool tracker dropped its receiver.", + self.component_id + ); + break; } } } diff --git a/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs index 164254c..789986b 100644 --- a/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs @@ -7,21 +7,17 @@ //! components into healthy and unhealthy sets, each annotated with the //! latest telemetry sample. -use std::{ - collections::{HashMap, HashSet}, - time::Duration, -}; +use std::{collections::HashSet, time::Duration}; use tokio::select; use crate::{ - Error, MicrogridClientHandle, - client::proto::common::microgrid::electrical_components::{ - ElectricalComponentStateCode, ElectricalComponentTelemetry, - }, + MicrogridClientHandle, + client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode, microgrid::telemetry_tracker::battery_pool_telemetry_tracker::InverterBatteryGroup, }; +use super::component_partition::ComponentHealthPartition; use super::component_telemetry_tracker::{ComponentHealthStatus, ComponentTelemetryTracker}; /// A telemetry tracker for an inverter-battery group, which consists of a set @@ -44,19 +40,12 @@ pub(crate) struct InverterBatteryGroupTelemetryTracker { } /// A snapshot of an inverter-battery group's components, partitioned by health -/// status and annotated with the latest telemetry sample for each component. -/// -/// The `healthy_*` maps hold the most recent [`ElectricalComponentTelemetry`] -/// observed for each healthy component. The `unhealthy_*` maps hold the last -/// telemetry observed before the component became unhealthy, or `None` if no -/// sample has been received yet. Consumers can use the telemetry (including -/// per-metric bounds) directly without subscribing to the raw streams again. -#[derive(Clone, Debug, PartialEq)] +/// status and annotated with the latest telemetry sample for each component +/// (see [`ComponentHealthPartition`]). +#[derive(Clone, Debug, Default, PartialEq)] pub struct InverterBatteryGroupStatus { - pub healthy_inverters: HashMap, - pub healthy_batteries: HashMap, - pub unhealthy_inverters: HashMap>, - pub unhealthy_batteries: HashMap>, + pub inverters: ComponentHealthPartition, + pub batteries: ComponentHealthPartition, } impl InverterBatteryGroupTelemetryTracker { @@ -76,19 +65,26 @@ impl InverterBatteryGroupTelemetryTracker { } } - pub async fn run(self) -> Result<(), Error> { - let mut healthy_inverters = HashMap::new(); - let mut unhealthy_inverters = HashMap::new(); - let mut healthy_batteries = HashMap::new(); - let mut unhealthy_batteries = HashMap::new(); + pub async fn run(self) { + let mut inverters = ComponentHealthPartition::default(); + let mut batteries = ComponentHealthPartition::default(); let (inverter_status_tx, mut inverter_status_rx) = tokio::sync::mpsc::channel(100); for &inverter_id in &self.inverter_battery_group.inverter_ids { - let component_data_stream = self + let component_data_stream = match self .client .receive_electrical_component_telemetry_stream(inverter_id) - .await?; + .await + { + Ok(stream) => stream, + Err(e) => { + tracing::error!( + "Internal error opening telemetry stream for inverter {inverter_id}: {e}; inverter-battery group tracker aborting.", + ); + return; + } + }; let tracker = ComponentTelemetryTracker::new( inverter_id, self.missing_data_tolerance, @@ -101,16 +97,25 @@ impl InverterBatteryGroupTelemetryTracker { tracker.run().await; }); // Initially mark the component as unhealthy until we see data. - unhealthy_inverters.insert(inverter_id, None); + inverters.mark_unhealthy(inverter_id, None); } let (battery_status_tx, mut battery_status_rx) = tokio::sync::mpsc::channel(100); for &battery_id in &self.inverter_battery_group.battery_ids { - let component_data_stream = self + let component_data_stream = match self .client .receive_electrical_component_telemetry_stream(battery_id) - .await?; + .await + { + Ok(stream) => stream, + Err(e) => { + tracing::error!( + "Internal error opening telemetry stream for battery {battery_id}: {e}; inverter-battery group tracker aborting.", + ); + return; + } + }; let tracker = ComponentTelemetryTracker::new( battery_id, self.missing_data_tolerance, @@ -123,7 +128,7 @@ impl InverterBatteryGroupTelemetryTracker { tracker.run().await; }); // Initially mark the component as unhealthy until we see data. - unhealthy_batteries.insert(battery_id, None); + batteries.mark_unhealthy(battery_id, None); } // Drop the original senders in the main task to allow the component @@ -136,59 +141,61 @@ impl InverterBatteryGroupTelemetryTracker { select! { inverter_status = inverter_status_rx.recv() => { let Some(inverter_status) = inverter_status else { - let e = String::from("Inverter telemetry tracker stopped receiving status updates."); - tracing::error!("{}", e); - return Err(Error::component_data_error(e)); + // Every inverter component tracker has exited and dropped + // its sender — a normal shutdown, not an error. + tracing::debug!( + "Inverter-battery group tracker (inverters {:?}) stopping: all inverter component trackers have exited.", + self.inverter_battery_group.inverter_ids + ); + return; }; match inverter_status { ComponentHealthStatus::Healthy(component_id, data) => { - healthy_inverters.insert(component_id, data); - unhealthy_inverters.remove(&component_id); + inverters.mark_healthy(component_id, data); } ComponentHealthStatus::Unhealthy(component_id, data) => { - unhealthy_inverters.insert(component_id, data); - healthy_inverters.remove(&component_id); + inverters.mark_unhealthy(component_id, data); } } }, battery_status = battery_status_rx.recv() => { - let Some(battery_status) = battery_status else { - let e = String::from( - "Battery telemetry tracker stopped receiving status updates." + let Some(battery_status) = battery_status else { + // Every battery component tracker has exited and dropped + // its sender — a normal shutdown, not an error. + tracing::debug!( + "Inverter-battery group tracker (batteries {:?}) stopping: all battery component trackers have exited.", + self.inverter_battery_group.battery_ids ); - tracing::error!("{}", e); - return Err(Error::component_data_error(e)); + return; }; match battery_status { ComponentHealthStatus::Healthy(component_id, data) => { - healthy_batteries.insert(component_id, data); - unhealthy_batteries.remove(&component_id); + batteries.mark_healthy(component_id, data); } ComponentHealthStatus::Unhealthy(component_id, data) => { - unhealthy_batteries.insert(component_id, data); - healthy_batteries.remove(&component_id); + batteries.mark_unhealthy(component_id, data); } } } } - if let Err(e) = self + if self .status_tx .send(( self.inverter_battery_group.clone(), InverterBatteryGroupStatus { - healthy_inverters: healthy_inverters.clone(), - healthy_batteries: healthy_batteries.clone(), - unhealthy_inverters: unhealthy_inverters.clone(), - unhealthy_batteries: unhealthy_batteries.clone(), + inverters: inverters.clone(), + batteries: batteries.clone(), }, )) .await + .is_err() { - tracing::error!("Failed to send inverter-battery group status: {}", e); - return Err(Error::component_data_error(format!( - "Failed to send inverter-battery group status: {}", - e - ))); + // The pool tracker dropped its receiver — a normal shutdown. + tracing::debug!( + "Inverter-battery group tracker {:?} stopping: the pool tracker dropped its receiver.", + self.inverter_battery_group + ); + return; } } } diff --git a/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs index ba66441..68b4576 100644 --- a/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs @@ -8,33 +8,26 @@ //! sets, whenever any inverter's telemetry or health classification changes. use std::{ - collections::{BTreeSet, HashMap, HashSet}, + collections::{BTreeSet, HashSet}, time::Duration, }; use tokio::sync::{broadcast, mpsc}; use crate::{ - Error, MicrogridClientHandle, - client::proto::common::microgrid::electrical_components::{ - ElectricalComponentStateCode, ElectricalComponentTelemetry, - }, + MicrogridClientHandle, + client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode, }; +use super::component_partition::ComponentHealthPartition; use super::component_telemetry_tracker::{ComponentHealthStatus, ComponentTelemetryTracker}; /// A snapshot of a PV pool's inverters, partitioned by health status and -/// annotated with the latest telemetry sample for each. -/// -/// `healthy_inverters` holds the most recent [`ElectricalComponentTelemetry`] -/// observed for each healthy inverter. `unhealthy_inverters` holds the last -/// telemetry observed before the inverter became unhealthy, or `None` if no -/// sample has been received yet. Consumers can use the telemetry (including -/// per-metric bounds) directly without subscribing to the raw streams again. +/// annotated with the latest telemetry sample for each (see +/// [`ComponentHealthPartition`]). #[derive(Clone, Debug, PartialEq)] pub struct PvPoolSnapshot { - pub healthy_inverters: HashMap, - pub unhealthy_inverters: HashMap>, + pub inverters: ComponentHealthPartition, } /// A tracker that watches every PV inverter in the pool and emits a @@ -66,23 +59,29 @@ impl PvPoolTelemetryTracker { } } - pub async fn run(self) -> Result<(), Error> { + pub async fn run(self) { if self.component_ids.is_empty() { - let e = "No component IDs provided for PvPoolTelemetryTracker".to_string(); - tracing::error!("{}", e); - return Err(Error::component_data_error(e)); + tracing::error!("No component IDs provided for PvPoolTelemetryTracker"); + return; } - let mut healthy_inverters: HashMap = HashMap::new(); - let mut unhealthy_inverters: HashMap> = - HashMap::new(); + let mut inverters = ComponentHealthPartition::default(); let (status_tx, mut status_rx) = mpsc::channel(100); for &inverter_id in &self.component_ids { - let component_data_stream = self + let component_data_stream = match self .client .receive_electrical_component_telemetry_stream(inverter_id) - .await?; + .await + { + Ok(stream) => stream, + Err(e) => { + tracing::error!( + "Internal error opening telemetry stream for inverter {inverter_id}: {e}; PV pool telemetry tracker aborting.", + ); + return; + } + }; let tracker = ComponentTelemetryTracker::new( inverter_id, self.missing_data_tolerance, @@ -95,7 +94,7 @@ impl PvPoolTelemetryTracker { tracker.run().await; }); // Initially mark the inverter as unhealthy until we see data. - unhealthy_inverters.insert(inverter_id, None); + inverters.mark_unhealthy(inverter_id, None); } // Drop the original sender so the channel closes once every component @@ -110,35 +109,40 @@ impl PvPoolTelemetryTracker { maybe_status = status_rx.recv() => { match maybe_status { Some(ComponentHealthStatus::Healthy(id, data)) => { - healthy_inverters.insert(id, data); - unhealthy_inverters.remove(&id); + inverters.mark_healthy(id, data); } Some(ComponentHealthStatus::Unhealthy(id, data)) => { - unhealthy_inverters.insert(id, data); - healthy_inverters.remove(&id); + inverters.mark_unhealthy(id, data); } // Every component tracker has exited and dropped its - // sender, so no further updates will arrive. The - // `interval.tick()` arm never disables, so the `select!` - // `else` can never run; break here instead. + // sender, so no further updates will ever arrive. The + // `_ = interval.tick()` arm below is a catch-all that + // never disables, so the `select!` `else` branch can + // never run; break here instead. None => break, } }, _ = interval.tick() => { - // Skip sending if the partitioning hasn't changed. - let unchanged = last_sent.as_ref().is_some_and(|s| { - s.healthy_inverters == healthy_inverters - && s.unhealthy_inverters == unhealthy_inverters - }); + // The unchanged-skip below means a stable partition never + // reaches `send()`, whose failure is otherwise the only + // signal that every receiver has dropped. Check for that + // here so the tracker still shuts down instead of leaking. + if self.component_pool_status_tx.receiver_count() == 0 { + break; + } + // Skip sending if the partitioning hasn't changed. Comparing + // the whole partition (not field by field) means a future + // field can't silently escape change detection. + let unchanged = last_sent.as_ref().is_some_and(|s| s.inverters == inverters); if unchanged { continue; } let snapshot = PvPoolSnapshot { - healthy_inverters: healthy_inverters.clone(), - unhealthy_inverters: unhealthy_inverters.clone(), + inverters: inverters.clone(), }; - if let Err(e) = self.component_pool_status_tx.send(snapshot.clone()) { - tracing::error!("Failed to send PV pool snapshot: {}", e); + if self.component_pool_status_tx.send(snapshot.clone()).is_err() { + // All receivers dropped between the check above and here; + // a normal shutdown, recorded by the terminal log below. break; } last_sent = Some(snapshot); @@ -146,57 +150,27 @@ impl PvPoolTelemetryTracker { } } - let err = format!( - "PvPoolTelemetryTracker (component IDs {:?}) stopped receiving inverter telemetry updates.", + // Reaching here means every component tracker exited or every receiver + // dropped — a normal shutdown, not an error. + tracing::debug!( + "PvPoolTelemetryTracker (component IDs {:?}) stopped: all component trackers or receivers are gone.", self.component_ids ); - tracing::error!("{}", err); - Err(Error::component_data_error(err)) } } #[cfg(test)] mod tests { - use chrono::TimeDelta; - - use super::PvPoolSnapshot; - use crate::{ - LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, - client::{ - proto::common::microgrid::electrical_components::ElectricalComponentStateCode, - test_utils::{MockComponent, MockMicrogridApiClient}, - }, - microgrid::pv_pool::PvPool, - }; + use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode; + use crate::client::test_utils::MockComponent; + use crate::microgrid::pv_pool::PvPool; + use crate::microgrid::test_support::{handles, last_snapshot}; async fn new_pool(graph: MockComponent) -> PvPool { - let api = MockMicrogridApiClient::new(graph); - let client = MicrogridClientHandle::new_from_client(api); - let lm = LogicalMeterHandle::try_new( - client.clone(), - LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()), - ) - .await - .unwrap(); + let (client, lm) = handles(graph).await; PvPool::try_new(None, client, lm).unwrap() } - /// Drains `rx` for up to `steps` * 100ms of simulated time, returning the - /// last snapshot seen. - async fn last_snapshot( - rx: &mut tokio::sync::broadcast::Receiver, - steps: u32, - ) -> PvPoolSnapshot { - let mut last = None; - for _ in 0..steps { - tokio::time::advance(std::time::Duration::from_millis(100)).await; - while let Ok(snap) = rx.try_recv() { - last = Some(snap); - } - } - last.expect("no snapshot received") - } - #[tokio::test(start_paused = true)] async fn single_inverter_reaches_healthy_state() { // grid → meter → pv_inverter(3) @@ -210,8 +184,8 @@ mod tests { let mut rx = pool.telemetry_snapshots(); let snap = last_snapshot(&mut rx, 10).await; - assert!(snap.healthy_inverters.contains_key(&3)); - assert!(snap.unhealthy_inverters.is_empty()); + assert!(snap.inverters.healthy.contains_key(&3)); + assert!(snap.inverters.unhealthy.is_empty()); } #[tokio::test(start_paused = true)] @@ -228,9 +202,9 @@ mod tests { let mut rx = pool.telemetry_snapshots(); let snap = last_snapshot(&mut rx, 10).await; - assert!(snap.healthy_inverters.contains_key(&3)); - assert!(snap.healthy_inverters.contains_key(&4)); - assert!(snap.unhealthy_inverters.is_empty()); + assert!(snap.inverters.healthy.contains_key(&3)); + assert!(snap.inverters.healthy.contains_key(&4)); + assert!(snap.inverters.unhealthy.is_empty()); } #[tokio::test(start_paused = true)] @@ -274,7 +248,7 @@ mod tests { // First confirm the inverter reaches a healthy state. let healthy = last_snapshot(&mut rx, 10).await; assert!( - healthy.healthy_inverters.contains_key(&3), + healthy.inverters.healthy.contains_key(&3), "expected inverter to go healthy after initial samples, got {:?}", healthy ); @@ -285,11 +259,11 @@ mod tests { let unhealthy = last_snapshot(&mut rx, 5).await; assert!( - unhealthy.healthy_inverters.is_empty(), + unhealthy.inverters.healthy.is_empty(), "inverter should be unhealthy after data stops, got healthy set {:?}", - unhealthy.healthy_inverters.keys() + unhealthy.inverters.healthy.keys() ); - assert!(unhealthy.unhealthy_inverters.contains_key(&3)); + assert!(unhealthy.inverters.unhealthy.contains_key(&3)); } #[tokio::test(start_paused = true)] @@ -309,11 +283,11 @@ mod tests { let snap = last_snapshot(&mut rx, 10).await; assert!( - !snap.healthy_inverters.contains_key(&3), + !snap.inverters.healthy.contains_key(&3), "inverter with Error state should not be in healthy set" ); assert!( - snap.unhealthy_inverters.contains_key(&3), + snap.inverters.unhealthy.contains_key(&3), "inverter with Error state should be in unhealthy set, got {:?}", snap ); diff --git a/src/microgrid/test_support.rs b/src/microgrid/test_support.rs new file mode 100644 index 0000000..e453cd0 --- /dev/null +++ b/src/microgrid/test_support.rs @@ -0,0 +1,62 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! Shared test helpers for the pool modules. + +use chrono::TimeDelta; + +use crate::client::proto::common::metrics::{Bounds as PbBounds, Metric as MetricPb, MetricSample}; +use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentTelemetry; +use crate::client::test_utils::{MockComponent, MockMicrogridApiClient}; +use crate::{LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle}; + +/// Builds an [`ElectricalComponentTelemetry`] for `id` carrying a single +/// active-power sample whose `bounds` are the given `(lower, upper)` pairs. +pub(crate) fn telem_with_power_bounds( + id: u64, + bounds: Vec<(Option, Option)>, +) -> ElectricalComponentTelemetry { + ElectricalComponentTelemetry { + electrical_component_id: id, + metric_samples: vec![MetricSample { + sample_time: None, + metric: MetricPb::AcPowerActive as i32, + value: None, + bounds: bounds + .into_iter() + .map(|(lower, upper)| PbBounds { lower, upper }) + .collect(), + ..Default::default() + }], + ..Default::default() + } +} + +/// Builds client and logical-meter handles backed by the given mock graph. +pub(crate) async fn handles(graph: MockComponent) -> (MicrogridClientHandle, LogicalMeterHandle) { + let api = MockMicrogridApiClient::new(graph); + let client = MicrogridClientHandle::new_from_client(api); + let lm = LogicalMeterHandle::try_new( + client.clone(), + LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()), + ) + .await + .unwrap(); + (client, lm) +} + +/// Drains `rx` for up to `steps` * 100ms of simulated time, returning the last +/// value seen. Panics if no value arrives. +pub(crate) async fn last_snapshot( + rx: &mut tokio::sync::broadcast::Receiver, + steps: u32, +) -> T { + let mut last = None; + for _ in 0..steps { + tokio::time::advance(std::time::Duration::from_millis(100)).await; + while let Ok(snap) = rx.try_recv() { + last = Some(snap); + } + } + last.expect("no snapshot received") +}