From 242b90d31144265a4de8e50d3b46c04c80359845 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 12:33:24 +0000 Subject: [PATCH 01/12] Stop ComponentTelemetryTracker leaking after its receiver drops The run loop logged "Failed to send component status" on every sample and every missing-data tick once the pool tracker dropped its mpsc receiver, but never exited, leaking the task and its broadcast subscription for the life of the process (scaling with each pool recreation). Break the loop on send failure, matching the RecvError::Closed arm, so the tracker shuts down when there is nothing left to report to. Fixes #43. Signed-off-by: Sahas Subramanian --- .../component_telemetry_tracker.rs | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs index 3caa84f..d0954da 100644 --- a/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs @@ -79,14 +79,25 @@ impl ComponentTelemetryTracker { // Reset the interval timer on receiving valid data interval.reset(); let status = self.state_from_data(data); - if let Err(e) = self.component_status_tx.send(status).await { - tracing::error!("Failed to send component status: {}", e); + if self.component_status_tx.send(status).await.is_err() { + // The pool tracker dropped its receiver; there is + // nothing left to report to, so stop tracking + // instead of looping and logging forever. + tracing::debug!( + "Component {} telemetry tracker stopping: pool tracker dropped its receiver.", + self.component_id + ); + break; } } Err(broadcast::error::RecvError::Lagged(_)) => { continue; } Err(broadcast::error::RecvError::Closed) => { + tracing::debug!( + "Component {} telemetry tracker stopping: telemetry stream closed.", + self.component_id + ); drop(self.component_status_tx); break; } @@ -95,8 +106,13 @@ impl ComponentTelemetryTracker { _ = interval.tick() => { // If we reach here, it means no data was received within the tolerance period let status = ComponentHealthStatus::Unhealthy(self.component_id, None); - if let Err(e) = self.component_status_tx.send(status).await { - tracing::error!("Failed to send component status: {}", e); + if self.component_status_tx.send(status).await.is_err() { + // The pool tracker dropped its receiver; stop tracking. + tracing::debug!( + "Component {} telemetry tracker stopping: pool tracker dropped its receiver.", + self.component_id + ); + break; } } } From 7c68309c122de927a837dd82ee4c00dd39f66f0c Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 12:34:29 +0000 Subject: [PATCH 02/12] Shut down pool trackers when all receivers drop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The tick arm skips `send()` whenever the partitioning is unchanged, but a failed `send()` is the only thing that tells the tracker every receiver has gone. With a stable partition (e.g. all components silent, each re-emitting the same unhealthy status) the snapshot never changes, so the skip path is taken every tick and the dropped receivers are never noticed — the tracker and its component trackers leak until process exit. Check `receiver_count()` each tick, before the unchanged-skip, and break when it reaches zero. Applies to both the PV and battery pool telemetry trackers. Signed-off-by: Sahas Subramanian --- .../telemetry_tracker/battery_pool_telemetry_tracker.rs | 7 +++++++ .../telemetry_tracker/pv_pool_telemetry_tracker.rs | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs index 0e8e593..1cc6443 100644 --- a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs @@ -192,6 +192,13 @@ impl BatteryPoolTelemetryTracker { } }, _ = interval.tick() => { + // The unchanged-skip below means a stable partition never + // reaches `send()`, whose failure is otherwise the only + // signal that every receiver has dropped. Check for that + // here so the tracker still shuts down instead of leaking. + if self.component_pool_status_tx.receiver_count() == 0 { + break; + } if last_sent_status.as_ref() == Some(&inverter_battery_group_data) { continue; // Skip sending if the status hasn't changed } diff --git a/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs index ba66441..6a93ecf 100644 --- a/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs @@ -125,6 +125,13 @@ impl PvPoolTelemetryTracker { } }, _ = interval.tick() => { + // The unchanged-skip below means a stable partition never + // reaches `send()`, whose failure is otherwise the only + // signal that every receiver has dropped. Check for that + // here so the tracker still shuts down instead of leaking. + if self.component_pool_status_tx.receiver_count() == 0 { + break; + } // Skip sending if the partitioning hasn't changed. let unchanged = last_sent.as_ref().is_some_and(|s| { s.healthy_inverters == healthy_inverters From 3c75f24df83655a55f52c67d03df0e7d97a9b6a4 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 12:34:58 +0000 Subject: [PATCH 03/12] Stop returning Result from pool tracker run loops Both pool telemetry trackers now treat every exit from `run` as a normal shutdown: the loop ends when every component tracker has gone or every receiver has dropped, neither of which is an error. The trailing `error!` + `Err` return became a `debug!`, and the in-loop send-failure no longer logs at error. With shutdown no longer fallible, `run` has nothing left to report, so drop its `Result` return (the call sites only `tokio::spawn` it and discard the result). The remaining startup failures -- an empty component set and opening a component's telemetry stream -- now log at their source before the task exits, instead of being propagated into a discarded `JoinHandle`. Signed-off-by: Sahas Subramanian --- .../battery_pool_telemetry_tracker.rs | 56 +++++++++++++------ .../pv_pool_telemetry_tracker.rs | 42 ++++++++------ 2 files changed, 64 insertions(+), 34 deletions(-) diff --git a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs index 1cc6443..81bb7ab 100644 --- a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs @@ -88,7 +88,11 @@ impl BatteryPoolTelemetryTracker { while let Some(battery_id) = unvisited_batteries.iter().next().cloned() { let group_inverters = graph - .predecessors(battery_id)? + .predecessors(battery_id) + .map_err(|e| { + tracing::error!("Failed to query predecessors of battery {battery_id}: {e}"); + e + })? .filter(|c| c.category() == crate::client::ElectricalComponentCategory::Inverter) .map(|c| c.id) .collect::>(); @@ -102,7 +106,13 @@ impl BatteryPoolTelemetryTracker { let mut group_batteries = BTreeSet::new(); for inverter_id in &group_inverters { let connected_batteries = graph - .successors(*inverter_id)? + .successors(*inverter_id) + .map_err(|e| { + tracing::error!( + "Failed to query successors of inverter {inverter_id}: {e}" + ); + e + })? .map(|c| c.id) .collect::>(); @@ -129,7 +139,13 @@ impl BatteryPoolTelemetryTracker { // Ensure that group batteries are only connect to group inverters for battery_id in &group_batteries { let connected_inverters = graph - .predecessors(*battery_id)? + .predecessors(*battery_id) + .map_err(|e| { + tracing::error!( + "Failed to query predecessors of battery {battery_id}: {e}" + ); + e + })? .filter(|c| { c.category() == crate::client::ElectricalComponentCategory::Inverter }) @@ -152,10 +168,13 @@ impl BatteryPoolTelemetryTracker { Ok(groups) } - pub async fn run(self) -> Result<(), Error> { + pub async fn run(self) { let mut inverter_battery_group_data = HashMap::new(); - let inverter_battery_group_ids = self.get_inverter_battery_groups()?; + // Errors are logged at source inside `get_inverter_battery_groups`. + let Ok(inverter_battery_group_ids) = self.get_inverter_battery_groups() else { + return; + }; let (component_status_tx, mut component_status_rx) = tokio::sync::mpsc::channel(100); for inverter_battery_group in inverter_battery_group_ids { @@ -185,9 +204,10 @@ impl BatteryPoolTelemetryTracker { inverter_battery_group_data.insert(group_ids, status); } // Every group tracker has exited and dropped its sender, - // so no further updates will arrive. The `interval.tick()` - // arm never disables, so the `select!` `else` can never - // run; break here instead. + // so no further updates will ever arrive. The `_ = + // interval.tick()` arm below is a catch-all that never + // disables, so the `select!` `else` branch can never run; + // break here instead. None => break, } }, @@ -202,11 +222,13 @@ impl BatteryPoolTelemetryTracker { if last_sent_status.as_ref() == Some(&inverter_battery_group_data) { continue; // Skip sending if the status hasn't changed } - if let Err(e) = self.component_pool_status_tx.send( - BatteryPoolSnapshot(inverter_battery_group_data.clone()) - ) + if self + .component_pool_status_tx + .send(BatteryPoolSnapshot(inverter_battery_group_data.clone())) + .is_err() { - tracing::error!("Failed to send pool snapshot: {}", e); + // All receivers dropped between the check above and here; + // a normal shutdown, recorded by the terminal log below. break; } last_sent_status = Some(inverter_battery_group_data.clone()); @@ -214,14 +236,12 @@ impl BatteryPoolTelemetryTracker { } } - let err = format!( - "BatteryPoolTelemetryTracker (component IDs {:?}) stopped receiving group telemetry updates.", + // Reaching here means every group tracker exited or every receiver + // dropped — a normal shutdown, not an error. + tracing::debug!( + "BatteryPoolTelemetryTracker (component IDs {:?}) stopped: all group trackers or receivers are gone.", self.component_ids ); - - tracing::error!("{}", err); - - Err(Error::component_data_error(err)) } } diff --git a/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs index 6a93ecf..9ff815a 100644 --- a/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs @@ -15,7 +15,7 @@ use std::{ use tokio::sync::{broadcast, mpsc}; use crate::{ - Error, MicrogridClientHandle, + MicrogridClientHandle, client::proto::common::microgrid::electrical_components::{ ElectricalComponentStateCode, ElectricalComponentTelemetry, }, @@ -66,11 +66,10 @@ impl PvPoolTelemetryTracker { } } - pub async fn run(self) -> Result<(), Error> { + pub async fn run(self) { if self.component_ids.is_empty() { - let e = "No component IDs provided for PvPoolTelemetryTracker".to_string(); - tracing::error!("{}", e); - return Err(Error::component_data_error(e)); + tracing::error!("No component IDs provided for PvPoolTelemetryTracker"); + return; } let mut healthy_inverters: HashMap = HashMap::new(); @@ -79,10 +78,19 @@ impl PvPoolTelemetryTracker { let (status_tx, mut status_rx) = mpsc::channel(100); for &inverter_id in &self.component_ids { - let component_data_stream = self + let component_data_stream = match self .client .receive_electrical_component_telemetry_stream(inverter_id) - .await?; + .await + { + Ok(stream) => stream, + Err(e) => { + tracing::error!( + "Internal error opening telemetry stream for inverter {inverter_id}: {e}; PV pool telemetry tracker aborting.", + ); + return; + } + }; let tracker = ComponentTelemetryTracker::new( inverter_id, self.missing_data_tolerance, @@ -118,9 +126,10 @@ impl PvPoolTelemetryTracker { healthy_inverters.remove(&id); } // Every component tracker has exited and dropped its - // sender, so no further updates will arrive. The - // `interval.tick()` arm never disables, so the `select!` - // `else` can never run; break here instead. + // sender, so no further updates will ever arrive. The + // `_ = interval.tick()` arm below is a catch-all that + // never disables, so the `select!` `else` branch can + // never run; break here instead. None => break, } }, @@ -144,8 +153,9 @@ impl PvPoolTelemetryTracker { healthy_inverters: healthy_inverters.clone(), unhealthy_inverters: unhealthy_inverters.clone(), }; - if let Err(e) = self.component_pool_status_tx.send(snapshot.clone()) { - tracing::error!("Failed to send PV pool snapshot: {}", e); + if self.component_pool_status_tx.send(snapshot.clone()).is_err() { + // All receivers dropped between the check above and here; + // a normal shutdown, recorded by the terminal log below. break; } last_sent = Some(snapshot); @@ -153,12 +163,12 @@ impl PvPoolTelemetryTracker { } } - let err = format!( - "PvPoolTelemetryTracker (component IDs {:?}) stopped receiving inverter telemetry updates.", + // Reaching here means every component tracker exited or every receiver + // dropped — a normal shutdown, not an error. + tracing::debug!( + "PvPoolTelemetryTracker (component IDs {:?}) stopped: all component trackers or receivers are gone.", self.component_ids ); - tracing::error!("{}", err); - Err(Error::component_data_error(err)) } } From 951c6efc6988dd68cfbaad2486a85f480d6e44c6 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 12:33:32 +0000 Subject: [PATCH 04/12] Treat inverter-battery group tracker shutdown as normal The group tracker logged at error and returned `Err` on three paths that are all ordinary shutdown: its inverter or battery component trackers all exiting (`recv` yields `None`), and the pool tracker dropping its receiver (the send to it fails). The last one fired on every `BatteryPool` teardown, so each group tracker spammed "Failed to send inverter-battery group status" at error level. Log these at debug and just return, matching how the pool telemetry trackers already treat their own shutdown. With shutdown no longer an error, `run` has nothing left to fail with, so drop its `Result` return; the remaining startup failures -- opening a component's telemetry stream -- now log at their source before the task exits, instead of being propagated into a discarded `JoinHandle`. Signed-off-by: Sahas Subramanian --- ...nverter_battery_group_telemetry_tracker.rs | 66 +++++++++++++------ 1 file changed, 46 insertions(+), 20 deletions(-) diff --git a/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs index 164254c..868f940 100644 --- a/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs @@ -15,7 +15,7 @@ use std::{ use tokio::select; use crate::{ - Error, MicrogridClientHandle, + MicrogridClientHandle, client::proto::common::microgrid::electrical_components::{ ElectricalComponentStateCode, ElectricalComponentTelemetry, }, @@ -76,7 +76,7 @@ impl InverterBatteryGroupTelemetryTracker { } } - pub async fn run(self) -> Result<(), Error> { + pub async fn run(self) { let mut healthy_inverters = HashMap::new(); let mut unhealthy_inverters = HashMap::new(); let mut healthy_batteries = HashMap::new(); @@ -85,10 +85,19 @@ impl InverterBatteryGroupTelemetryTracker { let (inverter_status_tx, mut inverter_status_rx) = tokio::sync::mpsc::channel(100); for &inverter_id in &self.inverter_battery_group.inverter_ids { - let component_data_stream = self + let component_data_stream = match self .client .receive_electrical_component_telemetry_stream(inverter_id) - .await?; + .await + { + Ok(stream) => stream, + Err(e) => { + tracing::error!( + "Internal error opening telemetry stream for inverter {inverter_id}: {e}; inverter-battery group tracker aborting.", + ); + return; + } + }; let tracker = ComponentTelemetryTracker::new( inverter_id, self.missing_data_tolerance, @@ -107,10 +116,19 @@ impl InverterBatteryGroupTelemetryTracker { let (battery_status_tx, mut battery_status_rx) = tokio::sync::mpsc::channel(100); for &battery_id in &self.inverter_battery_group.battery_ids { - let component_data_stream = self + let component_data_stream = match self .client .receive_electrical_component_telemetry_stream(battery_id) - .await?; + .await + { + Ok(stream) => stream, + Err(e) => { + tracing::error!( + "Internal error opening telemetry stream for battery {battery_id}: {e}; inverter-battery group tracker aborting.", + ); + return; + } + }; let tracker = ComponentTelemetryTracker::new( battery_id, self.missing_data_tolerance, @@ -136,9 +154,13 @@ impl InverterBatteryGroupTelemetryTracker { select! { inverter_status = inverter_status_rx.recv() => { let Some(inverter_status) = inverter_status else { - let e = String::from("Inverter telemetry tracker stopped receiving status updates."); - tracing::error!("{}", e); - return Err(Error::component_data_error(e)); + // Every inverter component tracker has exited and dropped + // its sender — a normal shutdown, not an error. + tracing::debug!( + "Inverter-battery group tracker (inverters {:?}) stopping: all inverter component trackers have exited.", + self.inverter_battery_group.inverter_ids + ); + return; }; match inverter_status { ComponentHealthStatus::Healthy(component_id, data) => { @@ -152,12 +174,14 @@ impl InverterBatteryGroupTelemetryTracker { } }, battery_status = battery_status_rx.recv() => { - let Some(battery_status) = battery_status else { - let e = String::from( - "Battery telemetry tracker stopped receiving status updates." + let Some(battery_status) = battery_status else { + // Every battery component tracker has exited and dropped + // its sender — a normal shutdown, not an error. + tracing::debug!( + "Inverter-battery group tracker (batteries {:?}) stopping: all battery component trackers have exited.", + self.inverter_battery_group.battery_ids ); - tracing::error!("{}", e); - return Err(Error::component_data_error(e)); + return; }; match battery_status { ComponentHealthStatus::Healthy(component_id, data) => { @@ -171,7 +195,7 @@ impl InverterBatteryGroupTelemetryTracker { } } } - if let Err(e) = self + if self .status_tx .send(( self.inverter_battery_group.clone(), @@ -183,12 +207,14 @@ impl InverterBatteryGroupTelemetryTracker { }, )) .await + .is_err() { - tracing::error!("Failed to send inverter-battery group status: {}", e); - return Err(Error::component_data_error(format!( - "Failed to send inverter-battery group status: {}", - e - ))); + // The pool tracker dropped its receiver — a normal shutdown. + tracing::debug!( + "Inverter-battery group tracker {:?} stopping: the pool tracker dropped its receiver.", + self.inverter_battery_group + ); + return; } } } From 649cbc952526f506496094fc5eb77a31bc765a17 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 12:33:38 +0000 Subject: [PATCH 05/12] Log bounds tracker upstream-closed shutdown at debug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `RecvError::Closed` means the upstream telemetry tracker dropped its sender, which happens on a normal teardown of the pool — the bounds tracker has nothing left to aggregate. Log it at debug rather than error, matching the no-receivers path just above. Signed-off-by: Sahas Subramanian --- src/microgrid/battery_bounds_tracker.rs | 4 +++- src/microgrid/pv_bounds_tracker.rs | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/microgrid/battery_bounds_tracker.rs b/src/microgrid/battery_bounds_tracker.rs index 179a2ef..3074c96 100644 --- a/src/microgrid/battery_bounds_tracker.rs +++ b/src/microgrid/battery_bounds_tracker.rs @@ -80,7 +80,9 @@ where ); } Err(broadcast::error::RecvError::Closed) => { - tracing::error!( + // The telemetry tracker upstream has shut down — a normal + // teardown of the whole pool, not an error here. + tracing::debug!( "Pool status channel closed; {}/{} bounds tracker shutting down.", InverterM::str_name(), BatteryM::str_name(), diff --git a/src/microgrid/pv_bounds_tracker.rs b/src/microgrid/pv_bounds_tracker.rs index d918107..9d0a141 100644 --- a/src/microgrid/pv_bounds_tracker.rs +++ b/src/microgrid/pv_bounds_tracker.rs @@ -65,7 +65,9 @@ where ); } Err(broadcast::error::RecvError::Closed) => { - tracing::error!( + // The telemetry tracker upstream has shut down — a normal + // teardown of the whole pool, not an error here. + tracing::debug!( "Pool status channel closed; {} PV bounds tracker shutting down.", M::str_name(), ); From 2c246a950aa7a742af81949dde672621da1c1eac Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 14:13:26 +0000 Subject: [PATCH 06/12] Evict ended telemetry streams from the client actor cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The per-component stream task exits once it sees no receivers and reports `StreamStatus::Ended`, but the actor only removed the component from the retry map — the now-dead `broadcast::Sender` stayed cached in `component_streams`. Any later subscription for that component was handed `tx.subscribe()` on that dead sender without a new tonic stream being started, so it never received telemetry. With the telemetry trackers now exiting cleanly instead of leaking (which used to hold receivers forever and mask this), every drop-pool/rebuild cycle hit it: the rebuilt pool saw its components as permanently silent. On `Ended`, drop the cached sender so the next subscription starts a fresh stream. If a subscriber arrived in the window between the stream task's no-receivers check and the actor processing `Ended`, the entry still has receivers — restart the stream for them instead of evicting it, since the old task is already gone either way. Fixes #42. Signed-off-by: Sahas Subramanian --- src/client/microgrid_client_actor.rs | 23 ++++++++++++++++++++++ src/client/microgrid_client_handle.rs | 28 +++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index b42fea7..d2abe3a 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -82,6 +82,29 @@ impl MicrogridClientActor { } Some(StreamStatus::Ended(component_id)) => { components_to_retry.remove(&component_id); + match component_streams.get(&component_id) { + // A subscriber arrived between the stream + // task's no-receivers check and this message; + // the task is gone, so restart the stream for + // the new subscriber. + Some(tx) if tx.receiver_count() > 0 => { + let tx = tx.clone(); + start_electrical_component_telemetry_stream( + &mut self.client, + component_id, + tx, + stream_status_tx.clone(), + ) + .await; + } + // Drop the cached sender: nothing writes to it + // anymore, so handing out further subscriptions + // on it would never yield data. The next + // subscription starts a fresh stream instead. + _ => { + component_streams.remove(&component_id); + } + } } None => { tracing::error!("MicrogridClientActor: Stream status channel closed, exiting."); diff --git a/src/client/microgrid_client_handle.rs b/src/client/microgrid_client_handle.rs index b3721ef..2b68466 100644 --- a/src/client/microgrid_client_handle.rs +++ b/src/client/microgrid_client_handle.rs @@ -405,4 +405,32 @@ mod tests { ] ); } + + #[tokio::test(start_paused = true)] + async fn test_resubscribing_after_all_receivers_dropped_restarts_stream() { + let handle = new_client_handle(); + + let mut telemetry_rx = handle + .receive_electrical_component_telemetry_stream(2) + .await + .unwrap(); + telemetry_rx.recv().await.unwrap(); + drop(telemetry_rx); + + // Let the stream task notice that all receivers are gone (it checks + // before each sample) and the actor process its Ended status, which + // must evict the cached sender. + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // A fresh subscription must get a live stream again, not a + // subscription on the dead cached channel. + let mut telemetry_rx = handle + .receive_electrical_component_telemetry_stream(2) + .await + .unwrap(); + tokio::time::timeout(std::time::Duration::from_secs(10), telemetry_rx.recv()) + .await + .expect("no telemetry after resubscribing; stale stream cache?") + .unwrap(); + } } From 606e4e1493f2ce6abce4f23d61e84f214ed8a783 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Fri, 12 Jun 2026 08:31:21 +0000 Subject: [PATCH 07/12] Update release notes Signed-off-by: Sahas Subramanian --- RELEASE_NOTES.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5d90c3a..cf4d988 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,3 +10,9 @@ - `BatteryPool::telemetry_snapshots()` exposes the same per-component health partition (`BatteryPoolSnapshot`). - The pool telemetry trackers and snapshot types (`BatteryPoolSnapshot`, `PvPoolSnapshot`, `InverterBatteryGroup`, `InverterBatteryGroupStatus`) are now public and re-exported from the crate root. + +## Bug Fixes + +- The pool, group, and component telemetry trackers no longer leak their tasks (while logging at error level every tick) once their consumers are gone; normal shutdown is now logged at debug. + +- The client now evicts ended per-component telemetry streams from its cache, so a pool recreated on the same client receives telemetry again instead of silently getting none. From ea92d7c2459965ce76087b65435db58b0bab011a Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 09:18:29 +0000 Subject: [PATCH 08/12] Extract a shared pool broadcast reuse helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `power_bounds` and `telemetry_snapshots` on both `BatteryPool` and `PvPool` repeated the same weak-sender reuse gate — upgrade the stored `WeakSender`, keep it only if it still has receivers, and subscribe — in four places. Pull it into a single `try_reuse` helper so the reuse policy lives in one spot. Signed-off-by: Sahas Subramanian --- src/microgrid.rs | 1 + src/microgrid/battery_pool.rs | 19 +++++-------------- src/microgrid/pool_broadcast.rs | 25 +++++++++++++++++++++++++ src/microgrid/pv_pool.rs | 19 +++++-------------- 4 files changed, 36 insertions(+), 28 deletions(-) create mode 100644 src/microgrid/pool_broadcast.rs diff --git a/src/microgrid.rs b/src/microgrid.rs index bf37427..f319e62 100644 --- a/src/microgrid.rs +++ b/src/microgrid.rs @@ -4,6 +4,7 @@ //! High-level interface for the Microgrid API. mod bounds_aggregation; +mod pool_broadcast; mod battery_bounds_tracker; mod battery_pool; diff --git a/src/microgrid/battery_pool.rs b/src/microgrid/battery_pool.rs index 41a4d68..af4bd5d 100644 --- a/src/microgrid/battery_pool.rs +++ b/src/microgrid/battery_pool.rs @@ -17,6 +17,7 @@ use crate::{ metric, microgrid::{ battery_bounds_tracker::BatteryPoolBoundsTracker, + pool_broadcast::try_reuse, telemetry_tracker::battery_pool_telemetry_tracker::{ BatteryPoolSnapshot, BatteryPoolTelemetryTracker, }, @@ -94,13 +95,8 @@ impl BatteryPool { /// receivers; otherwise starts a new one (which also starts or reuses the /// underlying telemetry tracker). pub fn power_bounds(&mut self) -> broadcast::Receiver>> { - if let Some(tx) = self - .bounds_tx - .as_ref() - .and_then(broadcast::WeakSender::upgrade) - && tx.receiver_count() > 0 - { - return tx.subscribe(); + if let Some(rx) = try_reuse(&self.bounds_tx) { + return rx; } let snapshot_rx = self.telemetry_snapshots(); let (tx, rx) = broadcast::channel(100); @@ -120,13 +116,8 @@ impl BatteryPool { /// Reuses the running tracker if one exists and still has active receivers /// (including any held by a bounds tracker); otherwise starts a new one. pub fn telemetry_snapshots(&mut self) -> broadcast::Receiver { - if let Some(tx) = self - .snapshot_tx - .as_ref() - .and_then(broadcast::WeakSender::upgrade) - && tx.receiver_count() > 0 - { - return tx.subscribe(); + if let Some(rx) = try_reuse(&self.snapshot_tx) { + return rx; } let (tx, rx) = broadcast::channel(100); self.snapshot_tx = Some(tx.downgrade()); diff --git a/src/microgrid/pool_broadcast.rs b/src/microgrid/pool_broadcast.rs new file mode 100644 index 0000000..fe3e3c4 --- /dev/null +++ b/src/microgrid/pool_broadcast.rs @@ -0,0 +1,25 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! Shared helper for reusing a pool's broadcast producer task. +//! +//! Both [`BatteryPool`](super::BatteryPool) and [`PvPool`](super::PvPool) hand +//! out receivers for a long-lived tracker task whose [`broadcast::Sender`] they +//! hold only as a [`broadcast::WeakSender`]. A new subscription should reuse the +//! running task while it still has live receivers, and start a fresh one +//! otherwise. + +use tokio::sync::broadcast; + +/// Returns a receiver for the broadcast referenced by `weak` if its sender is +/// still alive and has at least one receiver, signalling that the producer task +/// is still running and worth reusing. Returns `None` when the caller must +/// start a new producer. +pub(super) fn try_reuse( + weak: &Option>, +) -> Option> { + weak.as_ref() + .and_then(broadcast::WeakSender::upgrade) + .filter(|tx| tx.receiver_count() > 0) + .map(|tx| tx.subscribe()) +} diff --git a/src/microgrid/pv_pool.rs b/src/microgrid/pv_pool.rs index 8d59f4f..31adc1e 100644 --- a/src/microgrid/pv_pool.rs +++ b/src/microgrid/pv_pool.rs @@ -22,6 +22,7 @@ use crate::{ client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode, metric, microgrid::{ + pool_broadcast::try_reuse, pv_bounds_tracker::PvPoolBoundsTracker, telemetry_tracker::pv_pool_telemetry_tracker::{PvPoolSnapshot, PvPoolTelemetryTracker}, }, @@ -145,13 +146,8 @@ impl PvPool { /// receivers; otherwise starts a new one (which also starts or reuses the /// underlying telemetry tracker). pub fn power_bounds(&mut self) -> broadcast::Receiver>> { - if let Some(tx) = self - .bounds_tx - .as_ref() - .and_then(broadcast::WeakSender::upgrade) - && tx.receiver_count() > 0 - { - return tx.subscribe(); + if let Some(rx) = try_reuse(&self.bounds_tx) { + return rx; } let snapshot_rx = self.telemetry_snapshots(); let (tx, rx) = broadcast::channel(100); @@ -168,13 +164,8 @@ impl PvPool { /// Reuses the running tracker if one exists and still has active receivers /// (including any held by a bounds tracker); otherwise starts a new one. pub fn telemetry_snapshots(&mut self) -> broadcast::Receiver { - if let Some(tx) = self - .snapshot_tx - .as_ref() - .and_then(broadcast::WeakSender::upgrade) - && tx.receiver_count() > 0 - { - return tx.subscribe(); + if let Some(rx) = try_reuse(&self.snapshot_tx) { + return rx; } let (tx, rx) = broadcast::channel(100); self.snapshot_tx = Some(tx.downgrade()); From b03adbb791910fe546f336d83cc1a6147c7c29a1 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 09:21:55 +0000 Subject: [PATCH 09/12] Extract a generic pool bounds tracker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `PvPoolBoundsTracker` and `BatteryPoolBoundsTracker` carried byte-for-byte identical `run` loops — receive a snapshot, compute its bounds, broadcast them, and handle lag/closure — differing only in the per-pool aggregation and the log label. Replace both with a single generic `PoolBoundsTracker` that takes the aggregation as a plain `fn(&S) -> Vec>` and a label string. The pool-specific aggregation stays in `pv_bounds_tracker` and `battery_bounds_tracker` as `compute_pool_bounds` free functions (with their existing tests), now passed into the shared tracker. Signed-off-by: Sahas Subramanian --- src/microgrid.rs | 1 + src/microgrid/battery_bounds_tracker.rs | 107 ++++++------------------ src/microgrid/battery_pool.rs | 12 ++- src/microgrid/pool_bounds_tracker.rs | 79 +++++++++++++++++ src/microgrid/pv_bounds_tracker.rs | 76 +++-------------- src/microgrid/pv_pool.rs | 11 ++- 6 files changed, 135 insertions(+), 151 deletions(-) create mode 100644 src/microgrid/pool_bounds_tracker.rs diff --git a/src/microgrid.rs b/src/microgrid.rs index f319e62..0ef5e79 100644 --- a/src/microgrid.rs +++ b/src/microgrid.rs @@ -4,6 +4,7 @@ //! High-level interface for the Microgrid API. mod bounds_aggregation; +mod pool_bounds_tracker; mod pool_broadcast; mod battery_bounds_tracker; diff --git a/src/microgrid/battery_bounds_tracker.rs b/src/microgrid/battery_bounds_tracker.rs index 3074c96..80e4b51 100644 --- a/src/microgrid/battery_bounds_tracker.rs +++ b/src/microgrid/battery_bounds_tracker.rs @@ -19,93 +19,38 @@ //! aggregated bounds are intersected. //! * Groups within a pool are in parallel — their bounds are added together. -use std::marker::PhantomData; - -use tokio::sync::broadcast; - use crate::bounds::{combine_parallel_sets, intersect_bounds_sets}; use crate::client::proto::common::metrics::Bounds as PbBounds; use crate::microgrid::bounds_aggregation::aggregate_parallel; use crate::microgrid::telemetry_tracker::battery_pool_telemetry_tracker::BatteryPoolSnapshot; use crate::{Bounds, metric::Metric}; -/// Tracks and aggregates power bounds for a battery pool. +/// Aggregates the power bounds of a battery pool following the physical +/// topology of its inverter-battery groups (see the module docs). /// /// `InverterM` is the metric used to read bounds from inverters (e.g. /// `AcPowerActive`); `BatteryM` is the metric used to read bounds from /// batteries (e.g. `DcPower`). Both must share the same `QuantityType` so /// their bounds can be intersected and summed. -pub(crate) struct BatteryPoolBoundsTracker { - pool_status_rx: broadcast::Receiver, - pool_bounds_tx: broadcast::Sender>>, - _marker: PhantomData<(InverterM, BatteryM)>, -} - -impl BatteryPoolBoundsTracker +pub(crate) fn compute_pool_bounds( + status: &BatteryPoolSnapshot, +) -> Vec> where InverterM: Metric, BatteryM: Metric, Bounds: From, { - pub(crate) fn new( - pool_status_rx: broadcast::Receiver, - pool_bounds_tx: broadcast::Sender>>, - ) -> Self { - Self { - pool_status_rx, - pool_bounds_tx, - _marker: PhantomData, - } - } - - pub(crate) async fn run(mut self) { - loop { - match self.pool_status_rx.recv().await { - Ok(pool_status) => { - let bounds = Self::compute_pool_bounds(&pool_status); - if self.pool_bounds_tx.send(bounds).is_err() { - tracing::debug!( - "No receivers for {}/{} bounds tracker; shutting down.", - InverterM::str_name(), - BatteryM::str_name(), - ); - break; - } - } - Err(broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!( - "{}/{} bounds tracker lagged by {n} pool status updates.", - InverterM::str_name(), - BatteryM::str_name(), - ); - } - Err(broadcast::error::RecvError::Closed) => { - // The telemetry tracker upstream has shut down — a normal - // teardown of the whole pool, not an error here. - tracing::debug!( - "Pool status channel closed; {}/{} bounds tracker shutting down.", - InverterM::str_name(), - BatteryM::str_name(), - ); - break; - } - } - } - } - - fn compute_pool_bounds(status: &BatteryPoolSnapshot) -> Vec> { - status - .groups() - .values() - .map(|group| { - let inverter_bounds = aggregate_parallel::(&group.healthy_inverters); - let battery_bounds = aggregate_parallel::(&group.healthy_batteries); - intersect_bounds_sets(&inverter_bounds, &battery_bounds) - }) - .fold(Vec::new(), |acc, group_bounds| { - combine_parallel_sets(&acc, &group_bounds) - }) - } + status + .groups() + .values() + .map(|group| { + let inverter_bounds = aggregate_parallel::(&group.healthy_inverters); + let battery_bounds = aggregate_parallel::(&group.healthy_batteries); + intersect_bounds_sets(&inverter_bounds, &battery_bounds) + }) + .fold(Vec::new(), |acc, group_bounds| { + combine_parallel_sets(&acc, &group_bounds) + }) } #[cfg(test)] @@ -124,7 +69,7 @@ mod tests { use crate::microgrid::telemetry_tracker::inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus; use crate::quantity::Power; - use super::BatteryPoolBoundsTracker; + use super::compute_pool_bounds; fn telem_with_power_bounds( id: u64, @@ -186,7 +131,7 @@ mod tests { }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + let bounds = compute_pool_bounds::( &snapshot, ); assert_eq!( @@ -233,7 +178,7 @@ mod tests { }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + let bounds = compute_pool_bounds::( &snapshot, ); assert_eq!( @@ -292,7 +237,7 @@ mod tests { ), ]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + let bounds = compute_pool_bounds::( &snapshot, ); assert_eq!( @@ -307,7 +252,7 @@ mod tests { #[test] fn empty_pool_yields_empty_bounds() { let snapshot = status(vec![]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + let bounds = compute_pool_bounds::( &snapshot, ); assert!(bounds.is_empty()); @@ -340,7 +285,7 @@ mod tests { }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + let bounds = compute_pool_bounds::( &snapshot, ); assert!( @@ -374,7 +319,7 @@ mod tests { }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + let bounds = compute_pool_bounds::( &snapshot, ); assert!( @@ -409,7 +354,7 @@ mod tests { }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + let bounds = compute_pool_bounds::( &snapshot, ); assert!( @@ -443,7 +388,7 @@ mod tests { }, )]); - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + let bounds = compute_pool_bounds::( &snapshot, ); assert!( @@ -490,7 +435,7 @@ mod tests { // Inverter side has no active-power bounds → group produces no // bounds, so the pool bounds are empty. - let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + let bounds = compute_pool_bounds::( &snapshot, ); assert!(bounds.is_empty()); diff --git a/src/microgrid/battery_pool.rs b/src/microgrid/battery_pool.rs index af4bd5d..8acfcbd 100644 --- a/src/microgrid/battery_pool.rs +++ b/src/microgrid/battery_pool.rs @@ -15,8 +15,10 @@ use crate::{ proto::common::microgrid::electrical_components::ElectricalComponentStateCode, }, metric, + metric::Metric, microgrid::{ - battery_bounds_tracker::BatteryPoolBoundsTracker, + battery_bounds_tracker, + pool_bounds_tracker::PoolBoundsTracker, pool_broadcast::try_reuse, telemetry_tracker::battery_pool_telemetry_tracker::{ BatteryPoolSnapshot, BatteryPoolTelemetryTracker, @@ -101,9 +103,15 @@ impl BatteryPool { let snapshot_rx = self.telemetry_snapshots(); let (tx, rx) = broadcast::channel(100); self.bounds_tx = Some(tx.downgrade()); - let tracker = BatteryPoolBoundsTracker::::new( + let tracker = PoolBoundsTracker::new( snapshot_rx, tx, + battery_bounds_tracker::compute_pool_bounds::, + format!( + "{}/{}", + metric::AcPowerActive::str_name(), + metric::DcPower::str_name() + ), ); tokio::spawn(tracker.run()); rx diff --git a/src/microgrid/pool_bounds_tracker.rs b/src/microgrid/pool_bounds_tracker.rs new file mode 100644 index 0000000..9468570 --- /dev/null +++ b/src/microgrid/pool_bounds_tracker.rs @@ -0,0 +1,79 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! A generic bounds tracker for a pool of microgrid components. +//! +//! Subscribes to a pool snapshot stream and, for each update, computes a +//! pool-level set of bounds with a caller-supplied function and broadcasts it. +//! The aggregation logic (which differs per pool type — see +//! [`pv_bounds_tracker`](super::pv_bounds_tracker) and +//! [`battery_bounds_tracker`](super::battery_bounds_tracker)) is injected as a +//! plain function, so this loop is shared across pool types. + +use tokio::sync::broadcast; + +use crate::{Bounds, quantity::Quantity}; + +/// Tracks and aggregates power bounds for a pool. +/// +/// `S` is the pool snapshot type and `Q` the quantity the bounds are expressed +/// in. `compute` maps a snapshot to the aggregated pool bounds; `label` +/// identifies the tracker in log messages. +pub(crate) struct PoolBoundsTracker { + pool_status_rx: broadcast::Receiver, + pool_bounds_tx: broadcast::Sender>>, + compute: fn(&S) -> Vec>, + label: String, +} + +impl PoolBoundsTracker +where + S: Clone, + Q: Quantity, +{ + pub(crate) fn new( + pool_status_rx: broadcast::Receiver, + pool_bounds_tx: broadcast::Sender>>, + compute: fn(&S) -> Vec>, + label: String, + ) -> Self { + Self { + pool_status_rx, + pool_bounds_tx, + compute, + label, + } + } + + pub(crate) async fn run(mut self) { + loop { + match self.pool_status_rx.recv().await { + Ok(pool_status) => { + let bounds = (self.compute)(&pool_status); + if self.pool_bounds_tx.send(bounds).is_err() { + tracing::debug!( + "No receivers for {} bounds tracker; shutting down.", + self.label, + ); + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!( + "{} bounds tracker lagged by {n} pool status updates.", + self.label, + ); + } + Err(broadcast::error::RecvError::Closed) => { + // The telemetry tracker upstream has shut down — a normal + // teardown of the whole pool, not an error here. + tracing::debug!( + "Pool status channel closed; {} bounds tracker shutting down.", + self.label, + ); + break; + } + } + } + } +} diff --git a/src/microgrid/pv_bounds_tracker.rs b/src/microgrid/pv_bounds_tracker.rs index 9d0a141..d146024 100644 --- a/src/microgrid/pv_bounds_tracker.rs +++ b/src/microgrid/pv_bounds_tracker.rs @@ -10,78 +10,22 @@ //! PV inverters in a pool are wired in parallel, so their bounds are simply //! added together. -use std::marker::PhantomData; - -use tokio::sync::broadcast; - use crate::client::proto::common::metrics::Bounds as PbBounds; use crate::microgrid::bounds_aggregation::aggregate_parallel; use crate::microgrid::telemetry_tracker::pv_pool_telemetry_tracker::PvPoolSnapshot; use crate::{Bounds, metric::Metric}; -/// Tracks and aggregates power bounds for a PV pool. +/// Aggregates the bounds of every healthy PV inverter in the pool. The +/// inverters are wired in parallel, so their bounds combine in parallel. /// /// `M` is the metric used to read bounds from the PV inverters (e.g. /// `AcPowerActive`). -pub(crate) struct PvPoolBoundsTracker { - pool_status_rx: broadcast::Receiver, - pool_bounds_tx: broadcast::Sender>>, - _marker: PhantomData, -} - -impl PvPoolBoundsTracker +pub(crate) fn compute_pool_bounds(status: &PvPoolSnapshot) -> Vec> where M: Metric, Bounds: From, { - pub(crate) fn new( - pool_status_rx: broadcast::Receiver, - pool_bounds_tx: broadcast::Sender>>, - ) -> Self { - Self { - pool_status_rx, - pool_bounds_tx, - _marker: PhantomData, - } - } - - pub(crate) async fn run(mut self) { - loop { - match self.pool_status_rx.recv().await { - Ok(pool_status) => { - let bounds = Self::compute_pool_bounds(&pool_status); - if self.pool_bounds_tx.send(bounds).is_err() { - tracing::debug!( - "No receivers for {} PV bounds tracker; shutting down.", - M::str_name(), - ); - break; - } - } - Err(broadcast::error::RecvError::Lagged(n)) => { - tracing::warn!( - "{} PV bounds tracker lagged by {n} pool status updates.", - M::str_name(), - ); - } - Err(broadcast::error::RecvError::Closed) => { - // The telemetry tracker upstream has shut down — a normal - // teardown of the whole pool, not an error here. - tracing::debug!( - "Pool status channel closed; {} PV bounds tracker shutting down.", - M::str_name(), - ); - break; - } - } - } - } - - /// Aggregates the bounds of every healthy PV inverter in the pool. The - /// inverters are wired in parallel, so their bounds combine in parallel. - fn compute_pool_bounds(status: &PvPoolSnapshot) -> Vec> { - aggregate_parallel::(&status.healthy_inverters) - } + aggregate_parallel::(&status.healthy_inverters) } #[cfg(test)] @@ -97,7 +41,7 @@ mod tests { use crate::microgrid::telemetry_tracker::pv_pool_telemetry_tracker::PvPoolSnapshot; use crate::quantity::Power; - use super::PvPoolBoundsTracker; + use super::compute_pool_bounds; fn telem_with_power_bounds( id: u64, @@ -138,7 +82,7 @@ mod tests { 10, vec![(Some(-1000.0), Some(0.0))], )]); - let bounds = PvPoolBoundsTracker::::compute_pool_bounds(&snap); + let bounds = compute_pool_bounds::(&snap); assert_eq!( bounds, vec![Bounds::new( @@ -154,7 +98,7 @@ mod tests { telem_with_power_bounds(10, vec![(Some(-1000.0), Some(0.0))]), telem_with_power_bounds(11, vec![(Some(-2000.0), Some(0.0))]), ]); - let bounds = PvPoolBoundsTracker::::compute_pool_bounds(&snap); + let bounds = compute_pool_bounds::(&snap); assert_eq!( bounds, vec![Bounds::new( @@ -167,7 +111,7 @@ mod tests { #[test] fn empty_pool_yields_empty_bounds() { let snap = healthy_snapshot(vec![]); - let bounds = PvPoolBoundsTracker::::compute_pool_bounds(&snap); + let bounds = compute_pool_bounds::(&snap); assert!(bounds.is_empty()); } @@ -195,7 +139,7 @@ mod tests { unhealthy_inverters: unhealthy, }; - let bounds = PvPoolBoundsTracker::::compute_pool_bounds(&snap); + let bounds = compute_pool_bounds::(&snap); assert_eq!( bounds, vec![Bounds::new( @@ -224,7 +168,7 @@ mod tests { ..Default::default() }; let snap = healthy_snapshot(vec![other]); - let bounds = PvPoolBoundsTracker::::compute_pool_bounds(&snap); + let bounds = compute_pool_bounds::(&snap); assert!(bounds.is_empty()); } } diff --git a/src/microgrid/pv_pool.rs b/src/microgrid/pv_pool.rs index 31adc1e..b5eec20 100644 --- a/src/microgrid/pv_pool.rs +++ b/src/microgrid/pv_pool.rs @@ -21,9 +21,11 @@ use crate::{ Bounds, Error, Formula, LogicalMeterHandle, MicrogridClientHandle, client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode, metric, + metric::Metric, microgrid::{ + pool_bounds_tracker::PoolBoundsTracker, pool_broadcast::try_reuse, - pv_bounds_tracker::PvPoolBoundsTracker, + pv_bounds_tracker, telemetry_tracker::pv_pool_telemetry_tracker::{PvPoolSnapshot, PvPoolTelemetryTracker}, }, quantity::Power, @@ -152,7 +154,12 @@ impl PvPool { let snapshot_rx = self.telemetry_snapshots(); let (tx, rx) = broadcast::channel(100); self.bounds_tx = Some(tx.downgrade()); - let tracker = PvPoolBoundsTracker::::new(snapshot_rx, tx); + let tracker = PoolBoundsTracker::new( + snapshot_rx, + tx, + pv_bounds_tracker::compute_pool_bounds::, + format!("{} PV", metric::AcPowerActive::str_name()), + ); tokio::spawn(tracker.run()); rx } From c9a7bf9b26dde3604e5d9be858e842ecddd0392a Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 14:19:59 +0000 Subject: [PATCH 10/12] Share one health-partition type across pool snapshots MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `PvPoolSnapshot` and `InverterBatteryGroupStatus` each carried their own pair(s) of `healthy`/`unhealthy` telemetry maps, and both telemetry trackers open-coded the same insert-here / remove-there bookkeeping on every status update. Introduce `ComponentHealthPartition` — a `{ healthy, unhealthy }` pair with `mark_healthy`/`mark_unhealthy` helpers — and rebuild both snapshot types on it (`PvPoolSnapshot { inverters }`, `InverterBatteryGroupStatus { inverters, batteries }`). The trackers now mutate partitions through the shared helpers, and the PV tracker's unchanged-check compares whole partitions rather than field by field, so a future field can't silently escape change detection. Signed-off-by: Sahas Subramanian --- src/lib.rs | 5 +- src/microgrid.rs | 1 + src/microgrid/battery_bounds_tracker.rs | 149 ++++++++++-------- src/microgrid/pv_bounds_tracker.rs | 12 +- src/microgrid/telemetry_tracker.rs | 1 + .../battery_pool_telemetry_tracker.rs | 32 ++-- .../telemetry_tracker/component_partition.rs | 38 +++++ ...nverter_battery_group_telemetry_tracker.rs | 55 +++---- .../pv_pool_telemetry_tracker.rs | 65 +++----- 9 files changed, 194 insertions(+), 164 deletions(-) create mode 100644 src/microgrid/telemetry_tracker/component_partition.rs diff --git a/src/lib.rs b/src/lib.rs index 44a61dd..29d60ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,8 +37,9 @@ pub(crate) mod wall_clock_timer; mod microgrid; pub use microgrid::{ - BatteryPool, BatteryPoolSnapshot, BatteryPoolTelemetryTracker, InverterBatteryGroup, - InverterBatteryGroupStatus, Microgrid, PvPool, PvPoolSnapshot, PvPoolTelemetryTracker, + BatteryPool, BatteryPoolSnapshot, BatteryPoolTelemetryTracker, ComponentHealthPartition, + InverterBatteryGroup, InverterBatteryGroupStatus, Microgrid, PvPool, PvPoolSnapshot, + PvPoolTelemetryTracker, }; #[cfg(any(test, feature = "test-utils"))] diff --git a/src/microgrid.rs b/src/microgrid.rs index 0ef5e79..615403e 100644 --- a/src/microgrid.rs +++ b/src/microgrid.rs @@ -19,6 +19,7 @@ pub(crate) mod telemetry_tracker; pub use telemetry_tracker::battery_pool_telemetry_tracker::{ BatteryPoolSnapshot, BatteryPoolTelemetryTracker, InverterBatteryGroup, }; +pub use telemetry_tracker::component_partition::ComponentHealthPartition; pub use telemetry_tracker::inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus; pub use telemetry_tracker::pv_pool_telemetry_tracker::{PvPoolSnapshot, PvPoolTelemetryTracker}; diff --git a/src/microgrid/battery_bounds_tracker.rs b/src/microgrid/battery_bounds_tracker.rs index 80e4b51..7d78960 100644 --- a/src/microgrid/battery_bounds_tracker.rs +++ b/src/microgrid/battery_bounds_tracker.rs @@ -44,8 +44,8 @@ where .groups() .values() .map(|group| { - let inverter_bounds = aggregate_parallel::(&group.healthy_inverters); - let battery_bounds = aggregate_parallel::(&group.healthy_batteries); + let inverter_bounds = aggregate_parallel::(&group.inverters.healthy); + let battery_bounds = aggregate_parallel::(&group.batteries.healthy); intersect_bounds_sets(&inverter_bounds, &battery_bounds) }) .fold(Vec::new(), |acc, group_bounds| { @@ -66,6 +66,7 @@ mod tests { use crate::microgrid::telemetry_tracker::battery_pool_telemetry_tracker::{ BatteryPoolSnapshot, InverterBatteryGroup, }; + use crate::microgrid::telemetry_tracker::component_partition::ComponentHealthPartition; use crate::microgrid::telemetry_tracker::inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus; use crate::quantity::Power; @@ -124,16 +125,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters, - healthy_batteries, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: healthy_inverters, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: healthy_batteries, + unhealthy: HashMap::new(), + }, }, )]); - let bounds = compute_pool_bounds::( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert_eq!( bounds, vec![ @@ -171,16 +174,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters, - healthy_batteries, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: healthy_inverters, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: healthy_batteries, + unhealthy: HashMap::new(), + }, }, )]); - let bounds = compute_pool_bounds::( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert_eq!( bounds, vec![Bounds::new( @@ -220,26 +225,32 @@ mod tests { ( g1, InverterBatteryGroupStatus { - healthy_inverters: h_inv_1, - healthy_batteries: h_bat_1, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: h_inv_1, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: h_bat_1, + unhealthy: HashMap::new(), + }, }, ), ( g2, InverterBatteryGroupStatus { - healthy_inverters: h_inv_2, - healthy_batteries: h_bat_2, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: h_inv_2, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: h_bat_2, + unhealthy: HashMap::new(), + }, }, ), ]); - let bounds = compute_pool_bounds::( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert_eq!( bounds, vec![Bounds::new( @@ -252,9 +263,7 @@ mod tests { #[test] fn empty_pool_yields_empty_bounds() { let snapshot = status(vec![]); - let bounds = compute_pool_bounds::( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!(bounds.is_empty()); } @@ -278,16 +287,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters, - healthy_batteries, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: healthy_inverters, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: healthy_batteries, + unhealthy: HashMap::new(), + }, }, )]); - let bounds = compute_pool_bounds::( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!( bounds.is_empty(), "group with no inverter bounds must not contribute any bounds" @@ -312,16 +323,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters, - healthy_batteries, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: healthy_inverters, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: healthy_batteries, + unhealthy: HashMap::new(), + }, }, )]); - let bounds = compute_pool_bounds::( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!( bounds.is_empty(), "group with no battery bounds must not contribute any bounds" @@ -347,16 +360,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters: HashMap::new(), - healthy_batteries, - unhealthy_inverters, - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: HashMap::new(), + unhealthy: unhealthy_inverters, + }, + batteries: ComponentHealthPartition { + healthy: healthy_batteries, + unhealthy: HashMap::new(), + }, }, )]); - let bounds = compute_pool_bounds::( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!( bounds.is_empty(), "group with no healthy inverters must not contribute any bounds" @@ -381,16 +396,18 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters, - healthy_batteries: HashMap::new(), - unhealthy_inverters: HashMap::new(), - unhealthy_batteries, + inverters: ComponentHealthPartition { + healthy: healthy_inverters, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: HashMap::new(), + unhealthy: unhealthy_batteries, + }, }, )]); - let bounds = compute_pool_bounds::( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!( bounds.is_empty(), "group with no healthy batteries must not contribute any bounds" @@ -426,18 +443,20 @@ mod tests { let snapshot = status(vec![( g, InverterBatteryGroupStatus { - healthy_inverters: h_inv, - healthy_batteries: h_bat, - unhealthy_inverters: HashMap::new(), - unhealthy_batteries: HashMap::new(), + inverters: ComponentHealthPartition { + healthy: h_inv, + unhealthy: HashMap::new(), + }, + batteries: ComponentHealthPartition { + healthy: h_bat, + unhealthy: HashMap::new(), + }, }, )]); // Inverter side has no active-power bounds → group produces no // bounds, so the pool bounds are empty. - let bounds = compute_pool_bounds::( - &snapshot, - ); + let bounds = compute_pool_bounds::(&snapshot); assert!(bounds.is_empty()); } } diff --git a/src/microgrid/pv_bounds_tracker.rs b/src/microgrid/pv_bounds_tracker.rs index d146024..a32adb2 100644 --- a/src/microgrid/pv_bounds_tracker.rs +++ b/src/microgrid/pv_bounds_tracker.rs @@ -25,7 +25,7 @@ where M: Metric, Bounds: From, { - aggregate_parallel::(&status.healthy_inverters) + aggregate_parallel::(&status.inverters.healthy) } #[cfg(test)] @@ -38,6 +38,7 @@ mod tests { }; use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentTelemetry; use crate::metric::AcPowerActive; + use crate::microgrid::telemetry_tracker::component_partition::ComponentHealthPartition; use crate::microgrid::telemetry_tracker::pv_pool_telemetry_tracker::PvPoolSnapshot; use crate::quantity::Power; @@ -71,8 +72,10 @@ mod tests { .map(|t| (t.electrical_component_id, t)) .collect(); PvPoolSnapshot { - healthy_inverters: healthy, - unhealthy_inverters: HashMap::new(), + inverters: ComponentHealthPartition { + healthy, + unhealthy: HashMap::new(), + }, } } @@ -135,8 +138,7 @@ mod tests { )), ); let snap = PvPoolSnapshot { - healthy_inverters: healthy, - unhealthy_inverters: unhealthy, + inverters: ComponentHealthPartition { healthy, unhealthy }, }; let bounds = compute_pool_bounds::(&snap); diff --git a/src/microgrid/telemetry_tracker.rs b/src/microgrid/telemetry_tracker.rs index 62d184c..e39fd74 100644 --- a/src/microgrid/telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker.rs @@ -8,6 +8,7 @@ //! the latest telemetry sample for each. pub(crate) mod battery_pool_telemetry_tracker; +pub(crate) mod component_partition; pub(crate) mod component_telemetry_tracker; pub(crate) mod inverter_battery_group_telemetry_tracker; pub(crate) mod pv_pool_telemetry_tracker; diff --git a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs index 81bb7ab..824c8cb 100644 --- a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs @@ -330,10 +330,10 @@ mod tests { let (group, status) = groups.iter().next().unwrap(); assert_eq!(group.inverter_ids, [3].into()); assert_eq!(group.battery_ids, [4].into()); - assert!(status.healthy_inverters.contains_key(&3)); - assert!(status.healthy_batteries.contains_key(&4)); - assert!(status.unhealthy_inverters.is_empty()); - assert!(status.unhealthy_batteries.is_empty()); + assert!(status.inverters.healthy.contains_key(&3)); + assert!(status.batteries.healthy.contains_key(&4)); + assert!(status.inverters.unhealthy.is_empty()); + assert!(status.batteries.unhealthy.is_empty()); } #[tokio::test(start_paused = true)] @@ -375,8 +375,8 @@ mod tests { assert_eq!(all_batteries, [4, 6].into()); for status in groups.values() { - assert!(status.unhealthy_inverters.is_empty()); - assert!(status.unhealthy_batteries.is_empty()); + assert!(status.inverters.unhealthy.is_empty()); + assert!(status.batteries.unhealthy.is_empty()); } } @@ -434,7 +434,7 @@ mod tests { let healthy = last_snapshot(&mut rx, 10).await; let (_, status) = healthy.groups().iter().next().unwrap(); assert!( - status.healthy_inverters.contains_key(&3) && status.healthy_batteries.contains_key(&4), + status.inverters.healthy.contains_key(&3) && status.batteries.healthy.contains_key(&4), "expected components to go healthy after initial samples, got {:?}", status ); @@ -447,17 +447,17 @@ mod tests { let (_, status) = unhealthy.groups().iter().next().unwrap(); assert!( - status.healthy_inverters.is_empty(), + status.inverters.healthy.is_empty(), "inverter should be unhealthy after data stops, got healthy set {:?}", - status.healthy_inverters.keys() + status.inverters.healthy.keys() ); assert!( - status.healthy_batteries.is_empty(), + status.batteries.healthy.is_empty(), "battery should be unhealthy after data stops, got healthy set {:?}", - status.healthy_batteries.keys() + status.batteries.healthy.keys() ); - assert!(status.unhealthy_inverters.contains_key(&3)); - assert!(status.unhealthy_batteries.contains_key(&4)); + assert!(status.inverters.unhealthy.contains_key(&3)); + assert!(status.batteries.unhealthy.contains_key(&4)); } #[tokio::test(start_paused = true)] @@ -482,15 +482,15 @@ mod tests { let (_, status) = snap.groups().iter().next().unwrap(); assert!( - status.healthy_inverters.contains_key(&3), + status.inverters.healthy.contains_key(&3), "inverter with Ready state should be healthy" ); assert!( - !status.healthy_batteries.contains_key(&4), + !status.batteries.healthy.contains_key(&4), "battery with Error state should not be in healthy set" ); assert!( - status.unhealthy_batteries.contains_key(&4), + status.batteries.unhealthy.contains_key(&4), "battery with Error state should be in unhealthy set, got {:?}", status ); diff --git a/src/microgrid/telemetry_tracker/component_partition.rs b/src/microgrid/telemetry_tracker/component_partition.rs new file mode 100644 index 0000000..7e9f9b0 --- /dev/null +++ b/src/microgrid/telemetry_tracker/component_partition.rs @@ -0,0 +1,38 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! A set of components partitioned by health status. + +use std::collections::HashMap; + +use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentTelemetry; + +/// A set of components partitioned by health status and annotated with the +/// latest telemetry sample for each. +/// +/// `healthy` holds the most recent [`ElectricalComponentTelemetry`] observed +/// for each healthy component. `unhealthy` holds the last telemetry observed +/// before the component became unhealthy, or `None` if no sample has been +/// received yet. Consumers can use the telemetry (including per-metric bounds) +/// directly without subscribing to the raw streams again. +#[derive(Clone, Debug, Default, PartialEq)] +pub struct ComponentHealthPartition { + pub healthy: HashMap, + pub unhealthy: HashMap>, +} + +impl ComponentHealthPartition { + /// Records `data` as the latest telemetry for the now-healthy component + /// `id`, removing it from the unhealthy set. + pub(crate) fn mark_healthy(&mut self, id: u64, data: ElectricalComponentTelemetry) { + self.healthy.insert(id, data); + self.unhealthy.remove(&id); + } + + /// Records component `id` as unhealthy, carrying its last telemetry sample + /// if any, and removing it from the healthy set. + pub(crate) fn mark_unhealthy(&mut self, id: u64, data: Option) { + self.unhealthy.insert(id, data); + self.healthy.remove(&id); + } +} diff --git a/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs index 868f940..789986b 100644 --- a/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs @@ -7,21 +7,17 @@ //! components into healthy and unhealthy sets, each annotated with the //! latest telemetry sample. -use std::{ - collections::{HashMap, HashSet}, - time::Duration, -}; +use std::{collections::HashSet, time::Duration}; use tokio::select; use crate::{ MicrogridClientHandle, - client::proto::common::microgrid::electrical_components::{ - ElectricalComponentStateCode, ElectricalComponentTelemetry, - }, + client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode, microgrid::telemetry_tracker::battery_pool_telemetry_tracker::InverterBatteryGroup, }; +use super::component_partition::ComponentHealthPartition; use super::component_telemetry_tracker::{ComponentHealthStatus, ComponentTelemetryTracker}; /// A telemetry tracker for an inverter-battery group, which consists of a set @@ -44,19 +40,12 @@ pub(crate) struct InverterBatteryGroupTelemetryTracker { } /// A snapshot of an inverter-battery group's components, partitioned by health -/// status and annotated with the latest telemetry sample for each component. -/// -/// The `healthy_*` maps hold the most recent [`ElectricalComponentTelemetry`] -/// observed for each healthy component. The `unhealthy_*` maps hold the last -/// telemetry observed before the component became unhealthy, or `None` if no -/// sample has been received yet. Consumers can use the telemetry (including -/// per-metric bounds) directly without subscribing to the raw streams again. -#[derive(Clone, Debug, PartialEq)] +/// status and annotated with the latest telemetry sample for each component +/// (see [`ComponentHealthPartition`]). +#[derive(Clone, Debug, Default, PartialEq)] pub struct InverterBatteryGroupStatus { - pub healthy_inverters: HashMap, - pub healthy_batteries: HashMap, - pub unhealthy_inverters: HashMap>, - pub unhealthy_batteries: HashMap>, + pub inverters: ComponentHealthPartition, + pub batteries: ComponentHealthPartition, } impl InverterBatteryGroupTelemetryTracker { @@ -77,10 +66,8 @@ impl InverterBatteryGroupTelemetryTracker { } pub async fn run(self) { - let mut healthy_inverters = HashMap::new(); - let mut unhealthy_inverters = HashMap::new(); - let mut healthy_batteries = HashMap::new(); - let mut unhealthy_batteries = HashMap::new(); + let mut inverters = ComponentHealthPartition::default(); + let mut batteries = ComponentHealthPartition::default(); let (inverter_status_tx, mut inverter_status_rx) = tokio::sync::mpsc::channel(100); @@ -110,7 +97,7 @@ impl InverterBatteryGroupTelemetryTracker { tracker.run().await; }); // Initially mark the component as unhealthy until we see data. - unhealthy_inverters.insert(inverter_id, None); + inverters.mark_unhealthy(inverter_id, None); } let (battery_status_tx, mut battery_status_rx) = tokio::sync::mpsc::channel(100); @@ -141,7 +128,7 @@ impl InverterBatteryGroupTelemetryTracker { tracker.run().await; }); // Initially mark the component as unhealthy until we see data. - unhealthy_batteries.insert(battery_id, None); + batteries.mark_unhealthy(battery_id, None); } // Drop the original senders in the main task to allow the component @@ -164,12 +151,10 @@ impl InverterBatteryGroupTelemetryTracker { }; match inverter_status { ComponentHealthStatus::Healthy(component_id, data) => { - healthy_inverters.insert(component_id, data); - unhealthy_inverters.remove(&component_id); + inverters.mark_healthy(component_id, data); } ComponentHealthStatus::Unhealthy(component_id, data) => { - unhealthy_inverters.insert(component_id, data); - healthy_inverters.remove(&component_id); + inverters.mark_unhealthy(component_id, data); } } }, @@ -185,12 +170,10 @@ impl InverterBatteryGroupTelemetryTracker { }; match battery_status { ComponentHealthStatus::Healthy(component_id, data) => { - healthy_batteries.insert(component_id, data); - unhealthy_batteries.remove(&component_id); + batteries.mark_healthy(component_id, data); } ComponentHealthStatus::Unhealthy(component_id, data) => { - unhealthy_batteries.insert(component_id, data); - healthy_batteries.remove(&component_id); + batteries.mark_unhealthy(component_id, data); } } } @@ -200,10 +183,8 @@ impl InverterBatteryGroupTelemetryTracker { .send(( self.inverter_battery_group.clone(), InverterBatteryGroupStatus { - healthy_inverters: healthy_inverters.clone(), - healthy_batteries: healthy_batteries.clone(), - unhealthy_inverters: unhealthy_inverters.clone(), - unhealthy_batteries: unhealthy_batteries.clone(), + inverters: inverters.clone(), + batteries: batteries.clone(), }, )) .await diff --git a/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs index 9ff815a..54470ae 100644 --- a/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs @@ -8,7 +8,7 @@ //! sets, whenever any inverter's telemetry or health classification changes. use std::{ - collections::{BTreeSet, HashMap, HashSet}, + collections::{BTreeSet, HashSet}, time::Duration, }; @@ -16,25 +16,18 @@ use tokio::sync::{broadcast, mpsc}; use crate::{ MicrogridClientHandle, - client::proto::common::microgrid::electrical_components::{ - ElectricalComponentStateCode, ElectricalComponentTelemetry, - }, + client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode, }; +use super::component_partition::ComponentHealthPartition; use super::component_telemetry_tracker::{ComponentHealthStatus, ComponentTelemetryTracker}; /// A snapshot of a PV pool's inverters, partitioned by health status and -/// annotated with the latest telemetry sample for each. -/// -/// `healthy_inverters` holds the most recent [`ElectricalComponentTelemetry`] -/// observed for each healthy inverter. `unhealthy_inverters` holds the last -/// telemetry observed before the inverter became unhealthy, or `None` if no -/// sample has been received yet. Consumers can use the telemetry (including -/// per-metric bounds) directly without subscribing to the raw streams again. +/// annotated with the latest telemetry sample for each (see +/// [`ComponentHealthPartition`]). #[derive(Clone, Debug, PartialEq)] pub struct PvPoolSnapshot { - pub healthy_inverters: HashMap, - pub unhealthy_inverters: HashMap>, + pub inverters: ComponentHealthPartition, } /// A tracker that watches every PV inverter in the pool and emits a @@ -72,9 +65,7 @@ impl PvPoolTelemetryTracker { return; } - let mut healthy_inverters: HashMap = HashMap::new(); - let mut unhealthy_inverters: HashMap> = - HashMap::new(); + let mut inverters = ComponentHealthPartition::default(); let (status_tx, mut status_rx) = mpsc::channel(100); for &inverter_id in &self.component_ids { @@ -103,7 +94,7 @@ impl PvPoolTelemetryTracker { tracker.run().await; }); // Initially mark the inverter as unhealthy until we see data. - unhealthy_inverters.insert(inverter_id, None); + inverters.mark_unhealthy(inverter_id, None); } // Drop the original sender so the channel closes once every component @@ -118,12 +109,10 @@ impl PvPoolTelemetryTracker { maybe_status = status_rx.recv() => { match maybe_status { Some(ComponentHealthStatus::Healthy(id, data)) => { - healthy_inverters.insert(id, data); - unhealthy_inverters.remove(&id); + inverters.mark_healthy(id, data); } Some(ComponentHealthStatus::Unhealthy(id, data)) => { - unhealthy_inverters.insert(id, data); - healthy_inverters.remove(&id); + inverters.mark_unhealthy(id, data); } // Every component tracker has exited and dropped its // sender, so no further updates will ever arrive. The @@ -141,17 +130,15 @@ impl PvPoolTelemetryTracker { if self.component_pool_status_tx.receiver_count() == 0 { break; } - // Skip sending if the partitioning hasn't changed. - let unchanged = last_sent.as_ref().is_some_and(|s| { - s.healthy_inverters == healthy_inverters - && s.unhealthy_inverters == unhealthy_inverters - }); + // Skip sending if the partitioning hasn't changed. Comparing + // the whole partition (not field by field) means a future + // field can't silently escape change detection. + let unchanged = last_sent.as_ref().is_some_and(|s| s.inverters == inverters); if unchanged { continue; } let snapshot = PvPoolSnapshot { - healthy_inverters: healthy_inverters.clone(), - unhealthy_inverters: unhealthy_inverters.clone(), + inverters: inverters.clone(), }; if self.component_pool_status_tx.send(snapshot.clone()).is_err() { // All receivers dropped between the check above and here; @@ -227,8 +214,8 @@ mod tests { let mut rx = pool.telemetry_snapshots(); let snap = last_snapshot(&mut rx, 10).await; - assert!(snap.healthy_inverters.contains_key(&3)); - assert!(snap.unhealthy_inverters.is_empty()); + assert!(snap.inverters.healthy.contains_key(&3)); + assert!(snap.inverters.unhealthy.is_empty()); } #[tokio::test(start_paused = true)] @@ -245,9 +232,9 @@ mod tests { let mut rx = pool.telemetry_snapshots(); let snap = last_snapshot(&mut rx, 10).await; - assert!(snap.healthy_inverters.contains_key(&3)); - assert!(snap.healthy_inverters.contains_key(&4)); - assert!(snap.unhealthy_inverters.is_empty()); + assert!(snap.inverters.healthy.contains_key(&3)); + assert!(snap.inverters.healthy.contains_key(&4)); + assert!(snap.inverters.unhealthy.is_empty()); } #[tokio::test(start_paused = true)] @@ -291,7 +278,7 @@ mod tests { // First confirm the inverter reaches a healthy state. let healthy = last_snapshot(&mut rx, 10).await; assert!( - healthy.healthy_inverters.contains_key(&3), + healthy.inverters.healthy.contains_key(&3), "expected inverter to go healthy after initial samples, got {:?}", healthy ); @@ -302,11 +289,11 @@ mod tests { let unhealthy = last_snapshot(&mut rx, 5).await; assert!( - unhealthy.healthy_inverters.is_empty(), + unhealthy.inverters.healthy.is_empty(), "inverter should be unhealthy after data stops, got healthy set {:?}", - unhealthy.healthy_inverters.keys() + unhealthy.inverters.healthy.keys() ); - assert!(unhealthy.unhealthy_inverters.contains_key(&3)); + assert!(unhealthy.inverters.unhealthy.contains_key(&3)); } #[tokio::test(start_paused = true)] @@ -326,11 +313,11 @@ mod tests { let snap = last_snapshot(&mut rx, 10).await; assert!( - !snap.healthy_inverters.contains_key(&3), + !snap.inverters.healthy.contains_key(&3), "inverter with Error state should not be in healthy set" ); assert!( - snap.unhealthy_inverters.contains_key(&3), + snap.inverters.unhealthy.contains_key(&3), "inverter with Error state should be in unhealthy set, got {:?}", snap ); From ba96549799095f1978b489fd591f7767fde52f95 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 09:35:39 +0000 Subject: [PATCH 11/12] Share pool component-ID validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `BatteryPool::try_new` and `PvPool::try_new` carried the same two checks — reject an empty explicit set and reject IDs that aren't all of the right kind — differing only in the component kind and the noun used in error messages. Extract them into `validate_pool_ids`, parameterised by the matching-ID set and a noun, and call it from both pools. Signed-off-by: Sahas Subramanian --- src/microgrid.rs | 1 + src/microgrid/battery_pool.rs | 19 ++++++------------ src/microgrid/pool_validation.rs | 34 ++++++++++++++++++++++++++++++++ src/microgrid/pv_pool.rs | 20 ++++++------------- 4 files changed, 47 insertions(+), 27 deletions(-) create mode 100644 src/microgrid/pool_validation.rs diff --git a/src/microgrid.rs b/src/microgrid.rs index 615403e..41d10e0 100644 --- a/src/microgrid.rs +++ b/src/microgrid.rs @@ -6,6 +6,7 @@ mod bounds_aggregation; mod pool_bounds_tracker; mod pool_broadcast; +mod pool_validation; mod battery_bounds_tracker; mod battery_pool; diff --git a/src/microgrid/battery_pool.rs b/src/microgrid/battery_pool.rs index 8acfcbd..4f2c89a 100644 --- a/src/microgrid/battery_pool.rs +++ b/src/microgrid/battery_pool.rs @@ -20,6 +20,7 @@ use crate::{ battery_bounds_tracker, pool_bounds_tracker::PoolBoundsTracker, pool_broadcast::try_reuse, + pool_validation::validate_pool_ids, telemetry_tracker::battery_pool_telemetry_tracker::{ BatteryPoolSnapshot, BatteryPoolTelemetryTracker, }, @@ -51,19 +52,11 @@ impl BatteryPool { snapshot_tx: None, bounds_tx: None, }; - if let Some(ids) = &this.component_ids { - if ids.is_empty() { - let e = "component_ids cannot be an empty set".to_string(); - tracing::error!("{e}"); - return Err(Error::invalid_component(e)); - } - // Validate that all provided IDs correspond to batteries in the graph. - if !ids.is_subset(&this.get_all_battery_ids()) { - let e = format!("All component_ids {:?} must be batteries.", ids); - tracing::error!("{e}"); - return Err(Error::invalid_component(e)); - } - } + validate_pool_ids( + &this.component_ids, + &this.get_all_battery_ids(), + "batteries", + )?; Ok(this) } diff --git a/src/microgrid/pool_validation.rs b/src/microgrid/pool_validation.rs new file mode 100644 index 0000000..4cc88a1 --- /dev/null +++ b/src/microgrid/pool_validation.rs @@ -0,0 +1,34 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! Shared validation for pool component-ID selections. + +use std::collections::BTreeSet; + +use crate::Error; + +/// Validates the `component_ids` a pool was constructed with against the set of +/// `all_matching` IDs of the right kind in the component graph. +/// +/// `Some(ids)` must be a non-empty subset of `all_matching`; `None` ("all +/// components of this kind") is accepted as-is. `noun` names the component kind +/// in error messages (e.g. `"batteries"`). +pub(super) fn validate_pool_ids( + component_ids: &Option>, + all_matching: &BTreeSet, + noun: &str, +) -> Result<(), Error> { + if let Some(ids) = component_ids { + if ids.is_empty() { + let e = "component_ids cannot be an empty set".to_string(); + tracing::error!("{e}"); + return Err(Error::invalid_component(e)); + } + if !ids.is_subset(all_matching) { + let e = format!("All component_ids {ids:?} must be {noun}."); + tracing::error!("{e}"); + return Err(Error::invalid_component(e)); + } + } + Ok(()) +} diff --git a/src/microgrid/pv_pool.rs b/src/microgrid/pv_pool.rs index b5eec20..626a3e5 100644 --- a/src/microgrid/pv_pool.rs +++ b/src/microgrid/pv_pool.rs @@ -25,6 +25,7 @@ use crate::{ microgrid::{ pool_bounds_tracker::PoolBoundsTracker, pool_broadcast::try_reuse, + pool_validation::validate_pool_ids, pv_bounds_tracker, telemetry_tracker::pv_pool_telemetry_tracker::{PvPoolSnapshot, PvPoolTelemetryTracker}, }, @@ -101,20 +102,11 @@ impl PvPool { snapshot_tx: None, bounds_tx: None, }; - if let Some(ids) = &this.component_ids { - if ids.is_empty() { - let e = "component_ids cannot be an empty set".to_string(); - tracing::error!("{e}"); - return Err(Error::invalid_component(e)); - } - // Validate that all provided IDs correspond to PV inverters in the - // graph. - if !ids.is_subset(&this.get_all_pv_inverter_ids()) { - let e = format!("All component_ids {:?} must be PV inverters.", ids); - tracing::error!("{e}"); - return Err(Error::invalid_component(e)); - } - } + validate_pool_ids( + &this.component_ids, + &this.get_all_pv_inverter_ids(), + "PV inverters", + )?; Ok(this) } From d27715d5a9b77299de6e47648556f831baf1937a Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jun 2026 09:38:45 +0000 Subject: [PATCH 12/12] Deduplicate pool test helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `telem_with_power_bounds`, `handles`, and the `last_snapshot` broadcast drainer were copy-pasted verbatim across the pool and bounds-tracker test modules. Collect them into a shared `#[cfg(test)] test_support` module — `last_snapshot` generic over the snapshot type — and have the test modules (and `new_pool`) use them. Signed-off-by: Sahas Subramanian --- src/microgrid.rs | 3 + src/microgrid/battery_bounds_tracker.rs | 21 +------ src/microgrid/pv_bounds_tracker.rs | 21 +------ src/microgrid/pv_pool.rs | 21 +------ .../battery_pool_telemetry_tracker.rs | 47 +++----------- .../pv_pool_telemetry_tracker.rs | 40 ++---------- src/microgrid/test_support.rs | 62 +++++++++++++++++++ 7 files changed, 81 insertions(+), 134 deletions(-) create mode 100644 src/microgrid/test_support.rs diff --git a/src/microgrid.rs b/src/microgrid.rs index 41d10e0..ad6e16d 100644 --- a/src/microgrid.rs +++ b/src/microgrid.rs @@ -8,6 +8,9 @@ mod pool_bounds_tracker; mod pool_broadcast; mod pool_validation; +#[cfg(test)] +mod test_support; + mod battery_bounds_tracker; mod battery_pool; pub use battery_pool::BatteryPool; diff --git a/src/microgrid/battery_bounds_tracker.rs b/src/microgrid/battery_bounds_tracker.rs index 7d78960..499076d 100644 --- a/src/microgrid/battery_bounds_tracker.rs +++ b/src/microgrid/battery_bounds_tracker.rs @@ -71,26 +71,7 @@ mod tests { use crate::quantity::Power; use super::compute_pool_bounds; - - fn telem_with_power_bounds( - id: u64, - bounds: Vec<(Option, Option)>, - ) -> ElectricalComponentTelemetry { - ElectricalComponentTelemetry { - electrical_component_id: id, - metric_samples: vec![MetricSample { - sample_time: None, - metric: MetricPb::AcPowerActive as i32, - value: None, - bounds: bounds - .into_iter() - .map(|(lower, upper)| PbBounds { lower, upper }) - .collect(), - ..Default::default() - }], - ..Default::default() - } - } + use crate::microgrid::test_support::telem_with_power_bounds; fn group(inverter_ids: &[u64], battery_ids: &[u64]) -> InverterBatteryGroup { InverterBatteryGroup::new( diff --git a/src/microgrid/pv_bounds_tracker.rs b/src/microgrid/pv_bounds_tracker.rs index a32adb2..08dc912 100644 --- a/src/microgrid/pv_bounds_tracker.rs +++ b/src/microgrid/pv_bounds_tracker.rs @@ -43,26 +43,7 @@ mod tests { use crate::quantity::Power; use super::compute_pool_bounds; - - fn telem_with_power_bounds( - id: u64, - bounds: Vec<(Option, Option)>, - ) -> ElectricalComponentTelemetry { - ElectricalComponentTelemetry { - electrical_component_id: id, - metric_samples: vec![MetricSample { - sample_time: None, - metric: MetricPb::AcPowerActive as i32, - value: None, - bounds: bounds - .into_iter() - .map(|(lower, upper)| PbBounds { lower, upper }) - .collect(), - ..Default::default() - }], - ..Default::default() - } - } + use crate::microgrid::test_support::telem_with_power_bounds; /// Builds a snapshot whose healthy set holds the given telemetry, keyed by /// component ID, and an empty unhealthy set. diff --git a/src/microgrid/pv_pool.rs b/src/microgrid/pv_pool.rs index 626a3e5..d29393d 100644 --- a/src/microgrid/pv_pool.rs +++ b/src/microgrid/pv_pool.rs @@ -191,26 +191,9 @@ impl PvPool { mod tests { use std::collections::BTreeSet; - use chrono::TimeDelta; - use super::PvPool; - use crate::{ - LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, - client::test_utils::{MockComponent, MockMicrogridApiClient}, - }; - - /// Builds client and logical-meter handles backed by the given mock graph. - async fn handles(graph: MockComponent) -> (MicrogridClientHandle, LogicalMeterHandle) { - let api = MockMicrogridApiClient::new(graph); - let client = MicrogridClientHandle::new_from_client(api); - let lm = LogicalMeterHandle::try_new( - client.clone(), - LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()), - ) - .await - .unwrap(); - (client, lm) - } + use crate::client::test_utils::MockComponent; + use crate::microgrid::test_support::handles; /// grid → meter → [pv meter → pv_inverter(4), pv_inverter(5)], /// [battery meter → battery_inverter(7) → battery(8)] diff --git a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs index 824c8cb..911283e 100644 --- a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs @@ -249,23 +249,13 @@ impl BatteryPoolTelemetryTracker { mod tests { use std::collections::HashMap; - use chrono::TimeDelta; - use super::BatteryPoolSnapshot; - use crate::{ - LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, - client::{ - proto::common::microgrid::electrical_components::ElectricalComponentStateCode, - test_utils::{MockComponent, MockMicrogridApiClient}, - }, - microgrid::{ - battery_pool::BatteryPool, - telemetry_tracker::{ - battery_pool_telemetry_tracker::InverterBatteryGroup, - inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus, - }, - }, - }; + use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode; + use crate::client::test_utils::MockComponent; + use crate::microgrid::battery_pool::BatteryPool; + use crate::microgrid::telemetry_tracker::battery_pool_telemetry_tracker::InverterBatteryGroup; + use crate::microgrid::telemetry_tracker::inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus; + use crate::microgrid::test_support::{handles, last_snapshot}; impl BatteryPoolSnapshot { pub(crate) fn from_groups( @@ -275,33 +265,10 @@ mod tests { } } async fn new_pool(graph: MockComponent) -> BatteryPool { - let api = MockMicrogridApiClient::new(graph); - let client = MicrogridClientHandle::new_from_client(api); - let lm = LogicalMeterHandle::try_new( - client.clone(), - LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()), - ) - .await - .unwrap(); + let (client, lm) = handles(graph).await; BatteryPool::try_new(None, client, lm).unwrap() } - /// Drains `rx` for up to `steps` * 100ms of simulated time, returning the - /// last snapshot seen. - async fn last_snapshot( - rx: &mut tokio::sync::broadcast::Receiver, - steps: u32, - ) -> BatteryPoolSnapshot { - let mut last = None; - for _ in 0..steps { - tokio::time::advance(std::time::Duration::from_millis(100)).await; - while let Ok(snap) = rx.try_recv() { - last = Some(snap); - } - } - last.expect("no snapshot received") - } - #[tokio::test(start_paused = true)] async fn single_group_reaches_healthy_state() { // grid → meter → battery_inverter(3) → battery(4) diff --git a/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs index 54470ae..68b4576 100644 --- a/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/pv_pool_telemetry_tracker.rs @@ -161,46 +161,16 @@ impl PvPoolTelemetryTracker { #[cfg(test)] mod tests { - use chrono::TimeDelta; - - use super::PvPoolSnapshot; - use crate::{ - LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, - client::{ - proto::common::microgrid::electrical_components::ElectricalComponentStateCode, - test_utils::{MockComponent, MockMicrogridApiClient}, - }, - microgrid::pv_pool::PvPool, - }; + use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode; + use crate::client::test_utils::MockComponent; + use crate::microgrid::pv_pool::PvPool; + use crate::microgrid::test_support::{handles, last_snapshot}; async fn new_pool(graph: MockComponent) -> PvPool { - let api = MockMicrogridApiClient::new(graph); - let client = MicrogridClientHandle::new_from_client(api); - let lm = LogicalMeterHandle::try_new( - client.clone(), - LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()), - ) - .await - .unwrap(); + let (client, lm) = handles(graph).await; PvPool::try_new(None, client, lm).unwrap() } - /// Drains `rx` for up to `steps` * 100ms of simulated time, returning the - /// last snapshot seen. - async fn last_snapshot( - rx: &mut tokio::sync::broadcast::Receiver, - steps: u32, - ) -> PvPoolSnapshot { - let mut last = None; - for _ in 0..steps { - tokio::time::advance(std::time::Duration::from_millis(100)).await; - while let Ok(snap) = rx.try_recv() { - last = Some(snap); - } - } - last.expect("no snapshot received") - } - #[tokio::test(start_paused = true)] async fn single_inverter_reaches_healthy_state() { // grid → meter → pv_inverter(3) diff --git a/src/microgrid/test_support.rs b/src/microgrid/test_support.rs new file mode 100644 index 0000000..e453cd0 --- /dev/null +++ b/src/microgrid/test_support.rs @@ -0,0 +1,62 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! Shared test helpers for the pool modules. + +use chrono::TimeDelta; + +use crate::client::proto::common::metrics::{Bounds as PbBounds, Metric as MetricPb, MetricSample}; +use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentTelemetry; +use crate::client::test_utils::{MockComponent, MockMicrogridApiClient}; +use crate::{LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle}; + +/// Builds an [`ElectricalComponentTelemetry`] for `id` carrying a single +/// active-power sample whose `bounds` are the given `(lower, upper)` pairs. +pub(crate) fn telem_with_power_bounds( + id: u64, + bounds: Vec<(Option, Option)>, +) -> ElectricalComponentTelemetry { + ElectricalComponentTelemetry { + electrical_component_id: id, + metric_samples: vec![MetricSample { + sample_time: None, + metric: MetricPb::AcPowerActive as i32, + value: None, + bounds: bounds + .into_iter() + .map(|(lower, upper)| PbBounds { lower, upper }) + .collect(), + ..Default::default() + }], + ..Default::default() + } +} + +/// Builds client and logical-meter handles backed by the given mock graph. +pub(crate) async fn handles(graph: MockComponent) -> (MicrogridClientHandle, LogicalMeterHandle) { + let api = MockMicrogridApiClient::new(graph); + let client = MicrogridClientHandle::new_from_client(api); + let lm = LogicalMeterHandle::try_new( + client.clone(), + LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()), + ) + .await + .unwrap(); + (client, lm) +} + +/// Drains `rx` for up to `steps` * 100ms of simulated time, returning the last +/// value seen. Panics if no value arrives. +pub(crate) async fn last_snapshot( + rx: &mut tokio::sync::broadcast::Receiver, + steps: u32, +) -> T { + let mut last = None; + for _ in 0..steps { + tokio::time::advance(std::time::Duration::from_millis(100)).await; + while let Ok(snap) = rx.try_recv() { + last = Some(snap); + } + } + last.expect("no snapshot received") +}