From bd435e3d10a0597f6ab4717566f840ff0dd2a820 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 7 Apr 2026 17:45:55 +0200 Subject: [PATCH 01/18] Add numeric operations as trait methods for Quantity Signed-off-by: Sahas Subramanian --- src/quantity.rs | 111 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 101 insertions(+), 10 deletions(-) diff --git a/src/quantity.rs b/src/quantity.rs index 1a46527..1db45cc 100644 --- a/src/quantity.rs +++ b/src/quantity.rs @@ -24,6 +24,17 @@ pub trait Quantity: fn zero() -> Self { Self::default() } + + fn abs(self) -> Self; + fn floor(self) -> Self; + fn ceil(self) -> Self; + fn round(self) -> Self; + fn trunc(self) -> Self; + fn fract(self) -> Self; + fn is_nan(self) -> bool; + fn is_infinite(self) -> bool; + fn min(self, other: Self) -> Self; + fn max(self, other: Self) -> Self; } impl std::ops::Mul for f32 { @@ -34,7 +45,47 @@ impl std::ops::Mul for f32 { } } -impl Quantity for f32 {} +impl Quantity for f32 { + fn abs(self) -> Self { + self.abs() + } + + fn floor(self) -> Self { + self.floor() + } + + fn ceil(self) -> Self { + self.ceil() + } + + fn round(self) -> Self { + self.round() + } + + fn trunc(self) -> Self { + self.trunc() + } + + fn fract(self) -> Self { + self.fract() + } + + fn is_nan(self) -> bool { + self.is_nan() + } + + fn is_infinite(self) -> bool { + self.is_infinite() + } + + fn min(self, other: Self) -> Self { + self.min(other) + } + + fn max(self, other: Self) -> Self { + self.max(other) + } +} /// Formats an f32 with a given precision and removes trailing zeros fn format_float(value: f32, precision: usize) -> String { @@ -190,47 +241,47 @@ macro_rules! qty_ctor { impl $typename { qty_ctor!(@impl $($rest)*); - pub const fn abs(&self) -> Self { + pub const fn abs(self) -> Self { Self { value: self.value.abs(), } } - pub const fn floor(&self) -> Self { + pub const fn floor(self) -> Self { Self { value: self.value.floor(), } } - pub const fn ceil(&self) -> Self { + pub const fn ceil(self) -> Self { Self { value: self.value.ceil(), } } - pub const fn round(&self) -> Self { + pub const fn round(self) -> Self { Self { value: self.value.round(), } } - pub const fn trunc(&self) -> Self { + pub const fn trunc(self) -> Self { Self { value: self.value.trunc(), } } - pub const fn fract(&self) -> Self { + pub const fn fract(self) -> Self { Self { value: self.value.fract(), } } - pub const fn is_nan(&self) -> bool { + pub const fn is_nan(self) -> bool { self.value.is_nan() } - pub const fn is_infinite(&self) -> bool { + pub const fn is_infinite(self) -> bool { self.value.is_infinite() } @@ -250,7 +301,47 @@ macro_rules! qty_ctor { qty_ctor!{@impl_arith_ops $typename} qty_format!{$typename => {$($rest)*}} - impl super::Quantity for $typename {} + impl super::Quantity for $typename { + fn abs(self) -> Self { + self.abs() + } + + fn floor(self) -> Self { + self.floor() + } + + fn ceil(self) -> Self { + self.ceil() + } + + fn round(self) -> Self { + self.round() + } + + fn trunc(self) -> Self { + self.trunc() + } + + fn fract(self) -> Self { + self.fract() + } + + fn is_nan(self) -> bool { + self.is_nan() + } + + fn is_infinite(self) -> bool { + self.is_infinite() + } + + fn min(self, other: Self) -> Self { + self.min(other) + } + + fn max(self, other: Self) -> Self { + self.max(other) + } + } }; } From 08361b25e45c1bf7710d77b83267aeab783c4e99 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 8 Apr 2026 11:40:24 +0200 Subject: [PATCH 02/18] Add a new ErrorKind - ComponentDataError Signed-off-by: Sahas Subramanian --- src/error.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/error.rs b/src/error.rs index 5794900..a95c860 100644 --- a/src/error.rs +++ b/src/error.rs @@ -54,6 +54,7 @@ macro_rules! ErrorKind { ErrorKind!( (ComponentGraphError, component_graph_error), + (ComponentDataError, component_data_error), (ConnectionFailure, connection_failure), (ChronoError, chrono_error), (DroppedUnusedFormulas, dropped_unused_formulas), From c89f6c00775236bca8e5c3970a3af1edfdd21b98 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 8 Apr 2026 11:38:07 +0200 Subject: [PATCH 03/18] Expose the component graph from the logical meter handle Signed-off-by: Sahas Subramanian --- src/logical_meter/logical_meter_handle.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index 636cc28..006b9aa 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -159,6 +159,11 @@ impl LogicalMeterHandle { component_id, )?))) } + + /// Returns a reference to the component graph. + pub fn graph(&self) -> &ComponentGraph { + &self.graph + } } #[cfg(test)] From e1aa378b4190809a374c63b77dd33129ddb11fbc Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 8 Apr 2026 12:11:05 +0200 Subject: [PATCH 04/18] Implement conversion from protobuf bounds to typed bounds Signed-off-by: Sahas Subramanian --- src/bounds.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/bounds.rs b/src/bounds.rs index 80aff8f..7e70193 100644 --- a/src/bounds.rs +++ b/src/bounds.rs @@ -65,3 +65,34 @@ impl From> for PbBounds { } } } + +impl From for Bounds { + fn from(pb_bounds: PbBounds) -> Self { + Self::new( + pb_bounds.lower.map(Power::from_watts), + pb_bounds.upper.map(Power::from_watts), + ) + } +} + +impl From for Bounds { + fn from(pb_bounds: PbBounds) -> Self { + Self::new( + pb_bounds.lower.map(Current::from_amperes), + pb_bounds.upper.map(Current::from_amperes), + ) + } +} + +impl From for Bounds { + fn from(pb_bounds: PbBounds) -> Self { + Self::new( + pb_bounds + .lower + .map(ReactivePower::from_volt_amperes_reactive), + pb_bounds + .upper + .map(ReactivePower::from_volt_amperes_reactive), + ) + } +} From 8238ea84f278d3f2f16810aa058e844783f6c0d9 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 31 Mar 2026 16:00:28 +0200 Subject: [PATCH 05/18] Add a top-level `Microgrid` struct Signed-off-by: Sahas Subramanian --- src/lib.rs | 3 +++ src/microgrid.rs | 54 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 src/microgrid.rs diff --git a/src/lib.rs b/src/lib.rs index f6b36ba..575fbbc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,3 +22,6 @@ mod logical_meter; pub use logical_meter::{Formula, FormulaSubscriber, LogicalMeterConfig, LogicalMeterHandle}; pub mod metric; + +mod microgrid; +pub use microgrid::Microgrid; diff --git a/src/microgrid.rs b/src/microgrid.rs new file mode 100644 index 0000000..61b7671 --- /dev/null +++ b/src/microgrid.rs @@ -0,0 +1,54 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! High-level interface for the Microgrid API. + +use crate::{Error, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle}; + +/// A high-level interface for the Microgrid API. +pub struct Microgrid { + client: MicrogridClientHandle, + logical_meter: LogicalMeterHandle, +} + +impl Microgrid { + /// Creates a new `Microgrid` instance with the given microgrid API URL and + /// logical meter configuration. + /// + /// Returns an error if the URL is unreachable, or if the component graph + /// cannot be created with the given configuration. + pub async fn try_new( + url: impl Into, + config: LogicalMeterConfig, + ) -> Result { + let client = MicrogridClientHandle::try_new(url).await?; + let logical_meter = LogicalMeterHandle::try_new(client.clone(), config).await?; + + Ok(Microgrid { + client, + logical_meter, + }) + } + + /// Creates a new `Microgrid` instance from the given client and logical + /// meter handles. + pub fn new_from_handles( + client: MicrogridClientHandle, + logical_meter: LogicalMeterHandle, + ) -> Self { + Microgrid { + client, + logical_meter, + } + } + + /// Returns a handle to the Microgrid client. + pub fn client(&self) -> MicrogridClientHandle { + self.client.clone() + } + + /// Returns a handle to the logical meter. + pub fn logical_meter(&self) -> LogicalMeterHandle { + self.logical_meter.clone() + } +} From 9bbf7f06079249abdf1a899915d9287380f3f1a2 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 31 Mar 2026 16:04:52 +0200 Subject: [PATCH 06/18] Add a `BatteryPool` accessible through `Microgrid` Signed-off-by: Sahas Subramanian --- src/lib.rs | 2 +- src/microgrid.rs | 11 +++++++++++ src/microgrid/battery_pool.rs | 37 +++++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 src/microgrid/battery_pool.rs diff --git a/src/lib.rs b/src/lib.rs index 575fbbc..dd69599 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,4 +24,4 @@ pub use logical_meter::{Formula, FormulaSubscriber, LogicalMeterConfig, LogicalM pub mod metric; mod microgrid; -pub use microgrid::Microgrid; +pub use microgrid::{BatteryPool, Microgrid}; diff --git a/src/microgrid.rs b/src/microgrid.rs index 61b7671..024cca6 100644 --- a/src/microgrid.rs +++ b/src/microgrid.rs @@ -3,6 +3,9 @@ //! High-level interface for the Microgrid API. +mod battery_pool; +pub use battery_pool::BatteryPool; + use crate::{Error, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle}; /// A high-level interface for the Microgrid API. @@ -51,4 +54,12 @@ impl Microgrid { pub fn logical_meter(&self) -> LogicalMeterHandle { self.logical_meter.clone() } + + pub fn battery_pool(&self, component_ids: Option>) -> BatteryPool { + BatteryPool::new( + component_ids.map(|ids| ids.into_iter().collect()), + self.client.clone(), + self.logical_meter.clone(), + ) + } } diff --git a/src/microgrid/battery_pool.rs b/src/microgrid/battery_pool.rs new file mode 100644 index 0000000..ff8ada8 --- /dev/null +++ b/src/microgrid/battery_pool.rs @@ -0,0 +1,37 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! Representation of a pool of batteries in the microgrid. + +use std::collections::BTreeSet; + +use crate::{Error, Formula, LogicalMeterHandle, MicrogridClientHandle, metric, quantity::Power}; + +/// An interface for abstracting over a pool of batteries in the microgrid. +pub struct BatteryPool { + component_ids: Option>, + client: MicrogridClientHandle, + logical_meter: LogicalMeterHandle, +} + +impl BatteryPool { + /// Creates a new `BatteryPool` instance with the given component IDs, + /// client and logical meter handles. + pub(crate) fn new( + component_ids: Option>, + client: MicrogridClientHandle, + logical_meter: LogicalMeterHandle, + ) -> Self { + Self { + component_ids, + client, + logical_meter, + } + } + + /// Returns a formula for the active power of the battery pool. + pub fn power(&mut self) -> Result, Error> { + self.logical_meter + .battery::(self.component_ids.clone()) + } +} From bbd6ab93ee746bb869fed2cfc318fe997233a44b Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 31 Mar 2026 17:20:45 +0200 Subject: [PATCH 07/18] Remove top-level re-export of the `proto` module Signed-off-by: Sahas Subramanian --- src/bounds.rs | 2 +- src/client.rs | 2 +- src/client/instruction.rs | 2 +- src/client/microgrid_api_client.rs | 17 ++++++++++------- src/client/microgrid_client_actor.rs | 11 +++++++---- src/client/microgrid_client_handle.rs | 8 ++++---- src/client/proto/electrical_component.rs | 4 ++-- src/client/test_utils.rs | 2 +- src/lib.rs | 1 - src/logical_meter/config.rs | 2 +- .../formula/graph_formula_provider.rs | 6 +++--- src/logical_meter/logical_meter_actor.rs | 4 ++-- src/logical_meter/logical_meter_handle.rs | 6 +++--- src/metric.rs | 4 ++-- 14 files changed, 38 insertions(+), 33 deletions(-) diff --git a/src/bounds.rs b/src/bounds.rs index 7e70193..a03ad10 100644 --- a/src/bounds.rs +++ b/src/bounds.rs @@ -3,7 +3,7 @@ //! A representation of Bounds for any metric. -use crate::proto::common::metrics::Bounds as PbBounds; +use crate::client::proto::common::metrics::Bounds as PbBounds; use crate::quantity::{Current, Power, Quantity, ReactivePower}; /// A set of lower and upper bounds for any metric. diff --git a/src/client.rs b/src/client.rs index 9a33a00..59acbd7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -13,7 +13,7 @@ pub(crate) use microgrid_api_client::MicrogridApiClient; mod microgrid_client_handle; pub use microgrid_client_handle::MicrogridClientHandle; -pub(crate) mod proto; +pub mod proto; pub use proto::common::microgrid::electrical_components::{ ElectricalComponent, ElectricalComponentCategory, }; diff --git a/src/client/instruction.rs b/src/client/instruction.rs index 62b8770..fcaef04 100644 --- a/src/client/instruction.rs +++ b/src/client/instruction.rs @@ -8,7 +8,7 @@ use tokio::sync::{broadcast, oneshot}; use crate::{ Error, - proto::common::{ + client::proto::common::{ metrics::{Bounds, Metric}, microgrid::electrical_components::{ ElectricalComponent, ElectricalComponentCategory, ElectricalComponentConnection, diff --git a/src/client/microgrid_api_client.rs b/src/client/microgrid_api_client.rs index 8786c49..102f83c 100644 --- a/src/client/microgrid_api_client.rs +++ b/src/client/microgrid_api_client.rs @@ -6,12 +6,15 @@ use tonic::transport::Channel; -use crate::proto::microgrid::{ - AugmentElectricalComponentBoundsRequest, AugmentElectricalComponentBoundsResponse, - ListElectricalComponentConnectionsRequest, ListElectricalComponentConnectionsResponse, - ListElectricalComponentsRequest, ListElectricalComponentsResponse, - ReceiveElectricalComponentTelemetryStreamRequest, - ReceiveElectricalComponentTelemetryStreamResponse, +use crate::client::proto::{ + self, + microgrid::{ + AugmentElectricalComponentBoundsRequest, AugmentElectricalComponentBoundsResponse, + ListElectricalComponentConnectionsRequest, ListElectricalComponentConnectionsResponse, + ListElectricalComponentsRequest, ListElectricalComponentsResponse, + ReceiveElectricalComponentTelemetryStreamRequest, + ReceiveElectricalComponentTelemetryStreamResponse, + }, }; /// A trait representing the microgrid API client. @@ -56,7 +59,7 @@ pub trait MicrogridApiClient: Send + Sync + 'static { /// /// Forwards calls to the underlying gRPC client methods, without any additional logic. #[async_trait::async_trait] -impl MicrogridApiClient for crate::proto::microgrid::microgrid_client::MicrogridClient { +impl MicrogridApiClient for proto::microgrid::microgrid_client::MicrogridClient { async fn list_electrical_components( &mut self, request: impl tonic::IntoRequest + Send, diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index 9e489f9..b42fea7 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -3,13 +3,16 @@ //! The microgrid client actor that handles communication with the microgrid API. -use crate::{ - client::{MicrogridApiClient, instruction::Instruction, retry_tracker::RetryTracker}, +use crate::client::{ + MicrogridApiClient, + instruction::Instruction, + proto::common::microgrid::electrical_components::ElectricalComponentTelemetry, proto::microgrid::{ ListElectricalComponentConnectionsRequest, ListElectricalComponentsRequest, ReceiveElectricalComponentTelemetryStreamRequest, ReceiveElectricalComponentTelemetryStreamResponse, }, + retry_tracker::RetryTracker, }; use chrono::DateTime; use futures::{Stream, StreamExt}; @@ -20,7 +23,7 @@ use tokio::{ }; use tracing::Instrument as _; -use crate::{Error, proto::common::microgrid::electrical_components::ElectricalComponentTelemetry}; +use crate::Error; enum StreamStatus { Failed(u64), @@ -189,7 +192,7 @@ async fn handle_instruction( }) => { let response = client .augment_electrical_component_bounds( - crate::proto::microgrid::AugmentElectricalComponentBoundsRequest { + crate::client::proto::microgrid::AugmentElectricalComponentBoundsRequest { electrical_component_id, target_metric: target_metric as i32, bounds, diff --git a/src/client/microgrid_client_handle.rs b/src/client/microgrid_client_handle.rs index e7a6566..c8325f5 100644 --- a/src/client/microgrid_client_handle.rs +++ b/src/client/microgrid_client_handle.rs @@ -13,8 +13,7 @@ use tonic::transport::Channel; use crate::{ Bounds, Error, client::MicrogridApiClient, - metric::Metric, - proto::{ + client::proto::{ common::metrics::Bounds as PbBounds, common::microgrid::electrical_components::{ ElectricalComponent, ElectricalComponentCategory, ElectricalComponentConnection, @@ -22,6 +21,7 @@ use crate::{ }, microgrid::microgrid_client::MicrogridClient, }, + metric::Metric, }; use super::{instruction::Instruction, microgrid_client_actor::MicrogridClientActor}; @@ -236,11 +236,11 @@ mod tests { use crate::{ MicrogridClientHandle, - client::test_utils::{MockComponent, MockMicrogridApiClient}, - proto::common::{ + client::proto::common::{ metrics::{SimpleMetricValue, metric_value_variant}, microgrid::electrical_components::ElectricalComponentCategory, }, + client::test_utils::{MockComponent, MockMicrogridApiClient}, }; fn new_client_handle() -> MicrogridClientHandle { diff --git a/src/client/proto/electrical_component.rs b/src/client/proto/electrical_component.rs index 1fc165f..f2a16e6 100644 --- a/src/client/proto/electrical_component.rs +++ b/src/client/proto/electrical_component.rs @@ -3,8 +3,8 @@ //! Extensions to the generated protobuf code for electrical components. -use crate::{ - client::{ElectricalComponent, ElectricalComponentCategory}, +use crate::client::{ + ElectricalComponent, ElectricalComponentCategory, proto::common::microgrid::electrical_components::{ InverterType, electrical_component_category_specific_info::Kind, }, diff --git a/src/client/test_utils.rs b/src/client/test_utils.rs index d300af4..07a0752 100644 --- a/src/client/test_utils.rs +++ b/src/client/test_utils.rs @@ -9,7 +9,7 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::Response; use crate::{ - proto::{ + client::proto::{ common::{ metrics::{ Metric, MetricSample, MetricValueVariant, SimpleMetricValue, metric_value_variant, diff --git a/src/lib.rs b/src/lib.rs index dd69599..be38afc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,6 @@ pub use bounds::Bounds; pub mod client; pub use client::MicrogridClientHandle; -pub(crate) use client::proto; mod error; pub use error::{Error, ErrorKind}; diff --git a/src/logical_meter/config.rs b/src/logical_meter/config.rs index 158e63f..b3ada64 100644 --- a/src/logical_meter/config.rs +++ b/src/logical_meter/config.rs @@ -4,7 +4,7 @@ //! This module defines the configuration for the logical meter. use crate::Sample; -use crate::proto::common::metrics::Metric; +use crate::client::proto::common::metrics::Metric; use chrono::TimeDelta; use frequenz_resampling::ResamplingFunction; use std::collections::HashMap; diff --git a/src/logical_meter/formula/graph_formula_provider.rs b/src/logical_meter/formula/graph_formula_provider.rs index 7329076..b9ca1fc 100644 --- a/src/logical_meter/formula/graph_formula_provider.rs +++ b/src/logical_meter/formula/graph_formula_provider.rs @@ -4,14 +4,14 @@ //! A composable formula type, that can be subscribed to. use crate::Error; +use crate::client::proto::common::microgrid::electrical_components::{ + ElectricalComponent, ElectricalComponentConnection, +}; use crate::logical_meter::formula::FormulaParams; use crate::logical_meter::formula::aggregation_formula::AggregationFormula; use crate::logical_meter::formula::coalesce_formula::CoalesceFormula; use crate::logical_meter::logical_meter_actor; use crate::metric::Metric; -use crate::proto::common::microgrid::electrical_components::{ - ElectricalComponent, ElectricalComponentConnection, -}; use frequenz_microgrid_component_graph::ComponentGraph; use std::collections::BTreeSet; diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index 7b35371..267ffa9 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -13,11 +13,11 @@ use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{MissedTickBehavior, interval}; use crate::ErrorKind; -use crate::proto::common::metrics::{Metric, metric_value_variant::MetricValueVariant}; +use crate::client::proto::common::metrics::{Metric, metric_value_variant::MetricValueVariant}; use crate::quantity::{Current, Power, Quantity, ReactivePower, Voltage}; use crate::{ Error, MicrogridClientHandle, Sample, - proto::common::microgrid::electrical_components::ElectricalComponentTelemetry, + client::proto::common::microgrid::electrical_components::ElectricalComponentTelemetry, }; use super::config::LogicalMeterConfig; diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index 006b9aa..feb7aba 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -5,11 +5,11 @@ use crate::logical_meter::formula::Formula; use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider; use crate::{ client::MicrogridClientHandle, - error::Error, - metric, - proto::common::microgrid::electrical_components::{ + client::proto::common::microgrid::electrical_components::{ ElectricalComponent, ElectricalComponentConnection, }, + error::Error, + metric, }; use frequenz_microgrid_component_graph::{self, ComponentGraph}; use std::collections::BTreeSet; diff --git a/src/metric.rs b/src/metric.rs index 3ae2b5d..f07232b 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -6,8 +6,8 @@ use crate::logical_meter::formula::aggregation_formula::AggregationFormula; use crate::logical_meter::formula::coalesce_formula::CoalesceFormula; use crate::{ - logical_meter::formula, logical_meter::formula::FormulaSubscriber, - proto::common::metrics::Metric as MetricPb, + client::proto::common::metrics::Metric as MetricPb, logical_meter::formula, + logical_meter::formula::FormulaSubscriber, }; pub trait Metric: From d003406c235b932c2eac54f5761cad3f49d38a13 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 8 Apr 2026 15:17:30 +0200 Subject: [PATCH 08/18] Add const {MIN, MAX} on the Quantity trait Signed-off-by: Sahas Subramanian --- src/quantity.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/quantity.rs b/src/quantity.rs index 1db45cc..bcc0604 100644 --- a/src/quantity.rs +++ b/src/quantity.rs @@ -21,6 +21,9 @@ pub trait Quantity: + Send + Sync { + const MIN: Self; + const MAX: Self; + fn zero() -> Self { Self::default() } @@ -46,6 +49,9 @@ impl std::ops::Mul for f32 { } impl Quantity for f32 { + const MIN: Self = f32::MIN; + const MAX: Self = f32::MAX; + fn abs(self) -> Self { self.abs() } @@ -302,6 +308,9 @@ macro_rules! qty_ctor { qty_format!{$typename => {$($rest)*}} impl super::Quantity for $typename { + const MIN: Self = Self { value: f32::MIN }; + const MAX: Self = Self { value: f32::MAX }; + fn abs(self) -> Self { self.abs() } From 1456325f16d5d9ddb06d1a20c3064c3c13bb3f7d Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 8 Apr 2026 15:55:27 +0200 Subject: [PATCH 09/18] Add functions for adding or intersecting a set of bounds Signed-off-by: Sahas Subramanian --- src/bounds.rs | 303 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 303 insertions(+) diff --git a/src/bounds.rs b/src/bounds.rs index a03ad10..f0cf8d5 100644 --- a/src/bounds.rs +++ b/src/bounds.rs @@ -7,6 +7,7 @@ use crate::client::proto::common::metrics::Bounds as PbBounds; use crate::quantity::{Current, Power, Quantity, ReactivePower}; /// A set of lower and upper bounds for any metric. +#[derive(Debug, Clone, PartialEq)] pub struct Bounds { /// The lower bound. /// If None, there is no lower bound. @@ -31,6 +32,72 @@ impl Bounds { pub fn upper(&self) -> Option { self.upper } + + /// Combines two bounds as if their components were connected in parallel. + pub(crate) fn combine_parallel(&self, other: &Self) -> Vec { + if self.intersect(other).is_none() { + return vec![self.clone(), other.clone()]; + } + // Lower side: if both lowers are ≤ 0, the components can both + // discharge, so the combined floor is the sum (more negative). + // Otherwise at least one range sits entirely above zero and the + // combined floor is just the lower of the two individual floors. + let lower = self.lower.and_then(|a| { + other.lower.map(|b| { + if a <= Q::zero() && b <= Q::zero() { + a + b + } else { + a.min(b) + } + }) + }); + // Upper side: mirror of the above — both ≥ 0 means both can charge and + // contributions add; otherwise take the higher of the two. + let upper = self.upper.and_then(|a| { + other.upper.map(|b| { + if a >= Q::zero() && b >= Q::zero() { + a + b + } else { + a.max(b) + } + }) + }); + vec![Bounds { lower, upper }] + } + + /// Returns the intersection of `self` and `other`, or `None` if the + /// intersection is empty. + pub(crate) fn intersect(&self, other: &Self) -> Option { + let lower = Self::map_or_any(Q::max, self.lower, other.lower); + let upper = Self::map_or_any(Q::min, self.upper, other.upper); + if let (Some(lower), Some(upper)) = (lower, upper) + && lower > upper + { + return None; + } + Some(Bounds { lower, upper }) + } + + /// If `self` and `other` overlap, returns the smallest single interval + /// that contains both; otherwise returns `None`. + pub(crate) fn merge_if_overlapping(&self, other: &Self) -> Option { + self.intersect(other)?; + Some(Bounds { + lower: self.lower.and_then(|a| other.lower.map(|b| a.min(b))), + upper: self.upper.and_then(|a| other.upper.map(|b| a.max(b))), + }) + } + + /// Combines two `Option` values with `f`, treating `None` as the + /// identity: if exactly one side is `Some`, that value is returned + /// unchanged. Only `(None, None)` yields `None`. + fn map_or_any(f: impl FnOnce(Q, Q) -> Q, a: Option, b: Option) -> Option { + match (a, b) { + (Some(a), Some(b)) => Some(f(a, b)), + (Some(a), None) | (None, Some(a)) => Some(a), + (None, None) => None, + } + } } impl From<(Option, Option)> for Bounds { @@ -96,3 +163,239 @@ impl From for Bounds { ) } } + +/// Combines two sets of bounds from components connected in parallel. +pub(crate) fn combine_parallel_sets( + a: &[Bounds], + b: &[Bounds], +) -> Vec> { + match (a, b) { + (a, []) | ([], a) => a.to_vec(), + (a, b) => { + let mut result = Vec::new(); + for b1 in a { + for b2 in b { + result.extend(b1.combine_parallel(b2)); + } + } + squash_bounds_sets(result) + } + } +} + +/// Intersects two sets of bounds together, returning the intersection of the +/// given sets. +/// +/// This is used for calculating the combined bounds of two components connected +/// in series. +pub(crate) fn intersect_bounds_sets( + a: &[Bounds], + b: &[Bounds], +) -> Vec> { + let mut result = Vec::new(); + for b1 in a { + for b2 in b { + if let Some(int) = b1.intersect(b2) { + result.push(int); + } + } + } + squash_bounds_sets(result) +} + +/// Merges overlapping bounds into disjoint intervals. +fn squash_bounds_sets(mut input: Vec>) -> Vec> { + if input.is_empty() { + return input; + } + + input.sort_by(|a, b| { + a.lower + .unwrap_or(Q::MIN) + .partial_cmp(&b.lower.unwrap_or(Q::MIN)) + .unwrap_or(std::cmp::Ordering::Equal) + }); + + let mut squashed = Vec::new(); + let mut current = input[0].clone(); + + for next in &input[1..] { + if let Some(merged_bounds) = current.merge_if_overlapping(next) { + current = merged_bounds; + } else { + squashed.push(current); + current = next.clone(); + } + } + squashed.push(current); + + squashed +} + +#[cfg(test)] +mod tests { + use super::{Bounds, combine_parallel_sets, intersect_bounds_sets}; + + #[test] + fn test_bounds_addition() { + let b1 = Bounds::new(Some(-5.0), Some(5.0)); + let b2 = Bounds::new(Some(-3.0), Some(3.0)); + assert_eq!( + b1.combine_parallel(&b2), + vec![Bounds::new(Some(-8.0), Some(8.0))] + ); + + let b1 = Bounds::new(Some(-15.0), Some(-5.0)); + let b2 = Bounds::new(Some(-10.0), Some(-2.0)); + assert_eq!( + b1.combine_parallel(&b2), + vec![Bounds::new(Some(-25.0), Some(-2.0))] + ); + + let b1 = Bounds::new(Some(5.0), Some(15.0)); + let b2 = Bounds::new(Some(2.0), Some(10.0)); + assert_eq!( + b1.combine_parallel(&b2), + vec![Bounds::new(Some(2.0), Some(25.0))] + ); + + let b1 = Bounds::new(Some(5.0), Some(15.0)); + let b2 = Bounds::new(None, Some(10.0)); + assert_eq!( + b1.combine_parallel(&b2), + vec![Bounds::new(None, Some(25.0))] + ); + + let b1 = Bounds::new(Some(5.0), Some(15.0)); + let b2 = Bounds::new(Some(-5.0), None); + assert_eq!( + b1.combine_parallel(&b2), + vec![Bounds::new(Some(-5.0), None)] + ); + + let b1 = Bounds::new(Some(5.0), Some(15.0)); + let b2 = Bounds::new(None, None); + assert_eq!(b1.combine_parallel(&b2), vec![Bounds::new(None, None)]); + + let b1 = Bounds::new(Some(-10.0), Some(-5.0)); + let b2 = Bounds::new(Some(5.0), Some(15.0)); + assert_eq!(b1.combine_parallel(&b2), vec![b1, b2]); + } + + #[test] + fn test_combine_parallel_sets() { + let b1 = vec![Bounds::new(Some(-5.0), Some(5.0))]; + let b2 = vec![ + Bounds::new(Some(-5.0), Some(-2.0)), + Bounds::new(Some(2.0), Some(5.0)), + ]; + let result = combine_parallel_sets(&b1, &b2); + assert_eq!(result, vec![Bounds::new(Some(-10.0), Some(10.0))]); + + let b1 = vec![Bounds::new(Some(-5.0), Some(-1.0))]; + let b2 = vec![ + Bounds::new(Some(-5.0), Some(-2.0)), + Bounds::new(Some(2.0), Some(5.0)), + ]; + let result = combine_parallel_sets(&b1, &b2); + assert_eq!( + result, + vec![ + Bounds::new(Some(-10.0), Some(-1.0)), + Bounds::new(Some(2.0), Some(5.0)) + ] + ); + } + + #[test] + fn test_intersect_bounds_sets() { + let vb1 = vec![ + Bounds::new(Some(-30.0), Some(-10.0)), + Bounds::new(Some(10.0), Some(30.0)), + ]; + let vb2 = vec![ + Bounds::new(Some(-20.0), Some(0.0)), + Bounds::new(Some(20.0), Some(40.0)), + ]; + let intersection = intersect_bounds_sets(&vb1, &vb2); + assert_eq!( + intersection, + vec![ + Bounds::new(Some(-20.0), Some(-10.0)), + Bounds::new(Some(20.0), Some(30.0)), + ] + ); + + let vb2 = vec![ + Bounds::new(Some(-20.0), None), + Bounds::new(None, Some(40.0)), + ]; + let intersection = intersect_bounds_sets(&vb1, &vb2); + assert_eq!( + intersection, + vec![ + Bounds::new(Some(-30.0), Some(-10.0)), + Bounds::new(Some(10.0), Some(30.0)), + ] + ); + + let vb2 = vec![ + Bounds::new(None, Some(-20.0)), + Bounds::new(Some(20.0), None), + ]; + let intersection = intersect_bounds_sets(&vb1, &vb2); + assert_eq!( + intersection, + vec![ + Bounds::new(Some(-30.0), Some(-20.0)), + Bounds::new(Some(20.0), Some(30.0)), + ] + ); + + let vb2 = vec![Bounds::new(Some(-25.0), Some(25.0))]; + let intersection = intersect_bounds_sets(&vb1, &vb2); + assert_eq!( + intersection, + vec![ + Bounds::new(Some(-25.0), Some(-10.0)), + Bounds::new(Some(10.0), Some(25.0)), + ] + ); + + let vb2 = vec![Bounds::new(Some(-5.0), Some(5.0))]; + let intersection = intersect_bounds_sets(&vb1, &vb2); + assert_eq!(intersection, vec![]); + } + + /// Bounds are closed intervals: intersecting at a shared endpoint yields a + /// degenerate single-point interval rather than an empty result. + #[test] + fn intersect_single_point_is_non_empty() { + let a = Bounds::new(Some(5.0), Some(10.0)); + let b = Bounds::new(Some(10.0), Some(15.0)); + assert_eq!(a.intersect(&b), Some(Bounds::new(Some(10.0), Some(10.0)))); + } + + /// Closed-interval semantics in `squash`: two intervals that touch at a + /// single endpoint merge into one. + #[test] + fn squash_merges_touching_endpoints() { + let a = vec![Bounds::new(Some(1.0), Some(5.0))]; + let b = vec![Bounds::new(Some(5.0), Some(10.0))]; + // `intersect_bounds_sets` runs the pairwise intersect through squash. + let result = intersect_bounds_sets( + &[Bounds::new(Some(0.0), Some(20.0))], + &a.iter().chain(b.iter()).cloned().collect::>(), + ); + assert_eq!(result, vec![Bounds::new(Some(1.0), Some(10.0))]); + } + + /// Fully-unbounded inputs are preserved through `combine_parallel`: + /// `(−∞, ∞) ⊕ (−∞, ∞)` is still `(−∞, ∞)`, not empty. + #[test] + fn combine_parallel_preserves_fully_unbounded() { + let a = Bounds::::new(None, None); + let b = Bounds::::new(None, None); + assert_eq!(a.combine_parallel(&b), vec![Bounds::new(None, None)]); + } +} From 6fac30f44c30561c4c4231553ae2281128b536b5 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 20 Apr 2026 14:51:22 +0200 Subject: [PATCH 10/18] Restrict GraphFormulaProvider visibility to crate Signed-off-by: Sahas Subramanian --- src/logical_meter/formula/graph_formula_provider.rs | 2 +- src/metric.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/logical_meter/formula/graph_formula_provider.rs b/src/logical_meter/formula/graph_formula_provider.rs index b9ca1fc..5fdcbfd 100644 --- a/src/logical_meter/formula/graph_formula_provider.rs +++ b/src/logical_meter/formula/graph_formula_provider.rs @@ -44,7 +44,7 @@ macro_rules! graph_formula_provider { /// The component graph exposes methods to retrieve `AggregationFormula`s and /// `CoalesceFormula`s for each of these metrics. This trait provides a /// way to generalize them. -pub trait GraphFormulaProvider: Sized { +pub(crate) trait GraphFormulaProvider: Sized { type MetricType: Metric; graph_formula_provider!( diff --git a/src/metric.rs b/src/metric.rs index f07232b..7d9b497 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -13,6 +13,7 @@ use crate::{ pub trait Metric: std::fmt::Display + std::fmt::Debug + Clone + Copy + PartialEq + Eq + Sync + 'static { + #[expect(private_bounds)] type FormulaType: FormulaSubscriber + formula::graph_formula_provider::GraphFormulaProvider + 'static; From 0980fe74475ff41a425f4b60b347aa616936fb83 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 20 Apr 2026 14:53:35 +0200 Subject: [PATCH 11/18] Add DcPower metric definition Signed-off-by: Sahas Subramanian --- src/metric.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/metric.rs b/src/metric.rs index 7d9b497..2d32b6b 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -58,6 +58,7 @@ macro_rules! define_metric { } define_metric! { + { name: DcPower, formula: AggregationFormula, quantity: Power }, { name: AcPowerActive, formula: AggregationFormula, quantity: Power }, { name: AcPowerReactive, formula: AggregationFormula, quantity: ReactivePower }, { name: AcCurrent, formula: AggregationFormula, quantity: Current }, From 7cf1dcb9ad8ceb05bbefc99dfaa7d3d10dee27ed Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 20 Apr 2026 17:22:46 +0200 Subject: [PATCH 12/18] Add state-override, silence-after-metrics options to MockComponent Signed-off-by: Sahas Subramanian --- src/client/test_utils.rs | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/client/test_utils.rs b/src/client/test_utils.rs index 07a0752..e184310 100644 --- a/src/client/test_utils.rs +++ b/src/client/test_utils.rs @@ -55,6 +55,14 @@ pub struct MockComponent { Option, Option, )>, + /// Overrides the state code reported in each telemetry sample. `None` + /// defaults to `Ready`. + state_code: Option, + /// When `true`, the mock stream task holds the sender open (silent) + /// after the `metrics` vec is exhausted instead of dropping it, which + /// prevents the client actor from reconnecting and replaying the same + /// data. Useful for testing missing-data timeouts. + silence_after_metrics: bool, start_ts: Option, start_instant: Option, } @@ -219,6 +227,20 @@ impl MockComponent { self } + /// Overrides the state code reported in each telemetry sample. + pub fn with_state(mut self, code: ElectricalComponentStateCode) -> Self { + self.state_code = Some(code); + self + } + + /// Keeps the telemetry stream open and silent after the configured + /// metrics are exhausted, so the client actor doesn't reconnect and + /// replay the data. Useful for testing missing-data timeouts. + pub fn with_silence_after_metrics(mut self) -> Self { + self.silence_after_metrics = true; + self + } + pub fn with_start_times(mut self, ts: SystemTime, instant: tokio::time::Instant) -> Self { self.start_ts = Some(ts); self.start_instant = Some(instant); @@ -333,6 +355,9 @@ impl MicrogridApiClient for MockMicrogridApiClient { if let Some(component) = component { if !component.metrics.is_empty() { let metrics = component.metrics.clone(); + let state_code = + component.state_code.unwrap_or(ElectricalComponentStateCode::Ready); + let silence_after_metrics = component.silence_after_metrics; tokio::spawn(async move { let dur = std::time::Duration::from_millis(200); let mut interval = tokio::time::interval(dur); @@ -425,7 +450,7 @@ impl MicrogridApiClient for MockMicrogridApiClient { // TODO: support sending errors state_snapshots: vec![ElectricalComponentStateSnapshot { origin_time: ts, - states: vec![ElectricalComponentStateCode::Ready as i32], + states: vec![state_code as i32], warnings: vec![], errors: vec![], }], @@ -435,6 +460,12 @@ impl MicrogridApiClient for MockMicrogridApiClient { break; } } + if silence_after_metrics { + // Hold the sender open indefinitely so the client + // actor doesn't see the stream end and reconnect. + let _keep_open = tx; + std::future::pending::<()>().await; + } }); } } From af46d0b1d342881c49b073968a21a24e756e63f5 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 20 Apr 2026 14:59:18 +0200 Subject: [PATCH 13/18] Add telemetry tracking for battery pool snapshots This returns `BatteryPoolSnapshot` instances with the latest telemetry from all batteries and attached inverters, grouped by whether they're healthy or not. Signed-off-by: Sahas Subramanian --- src/error.rs | 6 + src/microgrid.rs | 2 + src/microgrid/battery_pool.rs | 69 ++- src/microgrid/telemetry_tracker.rs | 12 + .../battery_pool_telemetry_tracker.rs | 465 ++++++++++++++++++ .../component_telemetry_tracker.rs | 105 ++++ ...nverter_battery_group_telemetry_tracker.rs | 195 ++++++++ 7 files changed, 852 insertions(+), 2 deletions(-) create mode 100644 src/microgrid/telemetry_tracker.rs create mode 100644 src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs create mode 100644 src/microgrid/telemetry_tracker/component_telemetry_tracker.rs create mode 100644 src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs diff --git a/src/error.rs b/src/error.rs index a95c860..8dd2e00 100644 --- a/src/error.rs +++ b/src/error.rs @@ -78,3 +78,9 @@ impl std::fmt::Display for Error { } impl std::error::Error for Error {} + +impl From for Error { + fn from(error: frequenz_microgrid_component_graph::Error) -> Self { + Self::component_graph_error(error.to_string()) + } +} diff --git a/src/microgrid.rs b/src/microgrid.rs index 024cca6..9afe849 100644 --- a/src/microgrid.rs +++ b/src/microgrid.rs @@ -6,6 +6,8 @@ mod battery_pool; pub use battery_pool::BatteryPool; +pub(crate) mod telemetry_tracker; + use crate::{Error, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle}; /// A high-level interface for the Microgrid API. diff --git a/src/microgrid/battery_pool.rs b/src/microgrid/battery_pool.rs index ff8ada8..4a378ea 100644 --- a/src/microgrid/battery_pool.rs +++ b/src/microgrid/battery_pool.rs @@ -3,15 +3,30 @@ //! Representation of a pool of batteries in the microgrid. -use std::collections::BTreeSet; +use tokio::sync::broadcast; -use crate::{Error, Formula, LogicalMeterHandle, MicrogridClientHandle, metric, quantity::Power}; +use std::collections::{BTreeSet, HashSet}; +use std::time::Duration; + +use crate::{ + Bounds, Error, Formula, LogicalMeterHandle, MicrogridClientHandle, + client::{ + ElectricalComponentCategory, + proto::common::microgrid::electrical_components::ElectricalComponentStateCode, + }, + metric, + microgrid::telemetry_tracker::battery_pool_telemetry_tracker::{ + BatteryPoolSnapshot, BatteryPoolTelemetryTracker, + }, + quantity::Power, +}; /// An interface for abstracting over a pool of batteries in the microgrid. pub struct BatteryPool { component_ids: Option>, client: MicrogridClientHandle, logical_meter: LogicalMeterHandle, + snapshot_tx: Option>, } impl BatteryPool { @@ -26,6 +41,21 @@ impl BatteryPool { component_ids, client, logical_meter, + snapshot_tx: None, + bounds_tx: None, + } + } + + pub(crate) fn get_battery_ids(&self) -> BTreeSet { + if let Some(ids) = &self.component_ids { + ids.clone() + } else { + self.logical_meter + .graph() + .components() + .filter(|c| c.category() == ElectricalComponentCategory::Battery) + .map(|c| c.id) + .collect() } } @@ -34,4 +64,39 @@ impl BatteryPool { self.logical_meter .battery::(self.component_ids.clone()) } + + /// Returns a receiver for a stream of [`BatteryPoolSnapshot`] values, + /// each reflecting the latest component telemetry partitioned into + /// healthy and unhealthy sets. + /// + /// Reuses the running tracker if one exists and still has active receivers + /// (including any held by a bounds tracker); otherwise starts a new one. + pub(crate) fn telemetry_snapshots(&mut self) -> broadcast::Receiver { + if let Some(tx) = self + .snapshot_tx + .as_ref() + .and_then(broadcast::WeakSender::upgrade) + && tx.receiver_count() > 0 + { + return tx.subscribe(); + } + let (tx, rx) = broadcast::channel(100); + self.snapshot_tx = Some(tx.downgrade()); + let tracker = BatteryPoolTelemetryTracker::new( + self.get_battery_ids(), + Duration::from_secs(10), + HashSet::from([ + ElectricalComponentStateCode::Ready, + ElectricalComponentStateCode::Standby, + ElectricalComponentStateCode::Charging, + ElectricalComponentStateCode::Discharging, + ElectricalComponentStateCode::RelayClosed, + ]), + self.client.clone(), + self.logical_meter.clone(), + tx, + ); + tokio::spawn(tracker.run()); + rx + } } diff --git a/src/microgrid/telemetry_tracker.rs b/src/microgrid/telemetry_tracker.rs new file mode 100644 index 0000000..850f37d --- /dev/null +++ b/src/microgrid/telemetry_tracker.rs @@ -0,0 +1,12 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! Telemetry trackers for microgrid component pools. +//! +//! Each tracker watches a set of components and emits a stream of snapshots +//! that partition the components into healthy and unhealthy sets, carrying +//! the latest telemetry sample for each. + +pub(crate) mod battery_pool_telemetry_tracker; +pub(crate) mod component_telemetry_tracker; +pub(crate) mod inverter_battery_group_telemetry_tracker; diff --git a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs new file mode 100644 index 0000000..3a47272 --- /dev/null +++ b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs @@ -0,0 +1,465 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! A telemetry tracker for a pool of batteries and their associated inverters. + +use std::{ + collections::{BTreeSet, HashMap, HashSet}, + time::Duration, +}; + +use crate::{ + Error, LogicalMeterHandle, MicrogridClientHandle, + client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode, + microgrid::telemetry_tracker::inverter_battery_group_telemetry_tracker::{ + InverterBatteryGroupStatus, InverterBatteryGroupTelemetryTracker, + }, +}; + +/// A set of inverters and batteries wired together in an `MxN` configuration: +/// M inverters in parallel on the AC side, N batteries in parallel on the DC +/// side, with the inverter side in series with the battery side. +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub(crate) struct InverterBatteryGroup { + pub inverter_ids: BTreeSet, + pub battery_ids: BTreeSet, +} + +impl InverterBatteryGroup { + pub(crate) fn new(inverter_ids: BTreeSet, battery_ids: BTreeSet) -> Self { + Self { + inverter_ids, + battery_ids, + } + } +} + +#[derive(Clone, Debug, PartialEq)] +pub(crate) struct BatteryPoolSnapshot(HashMap); + +impl BatteryPoolSnapshot { + pub(crate) fn groups(&self) -> &HashMap { + &self.0 + } +} + +/// A tracker that watches every inverter-battery group in the pool and emits +/// a [`BatteryPoolSnapshot`] whenever any component's telemetry or health +/// classification changes. +#[derive(Clone)] +pub(crate) struct BatteryPoolTelemetryTracker { + component_ids: BTreeSet, + component_pool_status_tx: tokio::sync::broadcast::Sender, + missing_data_tolerance: Duration, + healthy_state_codes: HashSet, + client: MicrogridClientHandle, + logical_meter: LogicalMeterHandle, +} + +impl BatteryPoolTelemetryTracker { + pub(crate) fn new( + component_ids: BTreeSet, + missing_data_tolerance: Duration, + healthy_state_codes: HashSet, + client: MicrogridClientHandle, + logical_meter: LogicalMeterHandle, + component_pool_status_tx: tokio::sync::broadcast::Sender, + ) -> Self { + Self { + component_ids, + component_pool_status_tx, + missing_data_tolerance, + healthy_state_codes, + client, + logical_meter, + } + } + + pub(crate) fn get_inverter_battery_groups(&self) -> Result, Error> { + if self.component_ids.is_empty() { + let e = "No component IDs provided for BatteryPoolTelemetryTracker".to_string(); + tracing::error!("{}", e); + return Err(Error::component_data_error(e)); + } + let mut unvisited_batteries = self.component_ids.clone(); + let mut groups = Vec::new(); + + let graph = self.logical_meter.graph(); + + while let Some(battery_id) = unvisited_batteries.iter().next().cloned() { + let group_inverters = graph + .predecessors(battery_id)? + .filter(|c| c.category() == crate::client::ElectricalComponentCategory::Inverter) + .map(|c| c.id) + .collect::>(); + + if group_inverters.is_empty() { + let e = format!("Battery {} is not connected to any inverters.", battery_id); + tracing::error!("{}", e); + return Err(Error::component_data_error(e)); + } + + let mut group_batteries = BTreeSet::new(); + for inverter_id in &group_inverters { + let connected_batteries = graph + .successors(*inverter_id)? + .map(|c| c.id) + .collect::>(); + + group_batteries.extend(connected_batteries); + } + + // Ensure that all group batteries are part of the request. + if !group_batteries.is_subset(&self.component_ids) { + let e = format!( + concat!( + "Inverter {} is connected to batteries {:?} which are not all in the ", + "requested component IDs {:?}" + ), + group_inverters.iter().next().unwrap(), + group_batteries, + self.component_ids + ); + + tracing::error!("{}", e); + return Err(Error::component_data_error(e)); + } + + // Remove the group batteries from the unvisited set + unvisited_batteries.retain(|b| !group_batteries.contains(b)); + + // Ensure that group batteries are only connect to group inverters + for battery_id in &group_batteries { + let connected_inverters = graph + .predecessors(*battery_id)? + .filter(|c| { + c.category() == crate::client::ElectricalComponentCategory::Inverter + }) + .map(|c| c.id) + .collect::>(); + + if !connected_inverters.is_subset(&group_inverters) { + let e = format!( + "Battery {} is connected to inverters {:?} which are not all in the same group {:?}", + battery_id, connected_inverters, group_inverters + ); + tracing::error!("{}", e); + return Err(Error::component_data_error(e)); + } + } + + groups.push(InverterBatteryGroup::new(group_inverters, group_batteries)); + } + + Ok(groups) + } + + pub async fn run(self) -> Result<(), Error> { + let mut inverter_battery_group_data = HashMap::new(); + + let inverter_battery_group_ids = self.get_inverter_battery_groups()?; + + let (component_status_tx, mut component_status_rx) = tokio::sync::mpsc::channel(100); + for inverter_battery_group in inverter_battery_group_ids { + let tracker = InverterBatteryGroupTelemetryTracker::new( + inverter_battery_group, + self.missing_data_tolerance, + self.healthy_state_codes.clone(), + self.client.clone(), + component_status_tx.clone(), + ); + // Spawn a task for each group telemetry tracker + tokio::spawn(tracker.run()); + } + + // Drop the original sender so that the channel will close when all + // trackers finish. + drop(component_status_tx); + + let mut interval = tokio::time::interval(Duration::from_millis(200)); + let mut last_sent_status = None; + + loop { + tokio::select! { + Some((group_ids, status)) = component_status_rx.recv() => { + inverter_battery_group_data.insert(group_ids, status); + }, + _ = interval.tick() => { + if last_sent_status.as_ref() == Some(&inverter_battery_group_data) { + continue; // Skip sending if the status hasn't changed + } + if let Err(e) = self.component_pool_status_tx.send( + BatteryPoolSnapshot(inverter_battery_group_data.clone()) + ) + { + tracing::error!("Failed to send pool snapshot: {}", e); + break; + } + last_sent_status = Some(inverter_battery_group_data.clone()); + }, + else => break, + } + } + + let err = format!( + "BatteryPoolTelemetryTracker (component IDs {:?}) stopped receiving group telemetry updates.", + self.component_ids + ); + + tracing::error!("{}", err); + + Err(Error::component_data_error(err)) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use chrono::TimeDelta; + + use super::BatteryPoolSnapshot; + use crate::{ + LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, + client::{ + proto::common::microgrid::electrical_components::ElectricalComponentStateCode, + test_utils::{MockComponent, MockMicrogridApiClient}, + }, + microgrid::{ + battery_pool::BatteryPool, + telemetry_tracker::{ + battery_pool_telemetry_tracker::InverterBatteryGroup, + inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus, + }, + }, + }; + + impl BatteryPoolSnapshot { + pub(crate) fn from_groups( + groups: HashMap, + ) -> Self { + Self(groups) + } + } + async fn new_pool(graph: MockComponent) -> BatteryPool { + let api = MockMicrogridApiClient::new(graph); + let client = MicrogridClientHandle::new_from_client(api); + let lm = LogicalMeterHandle::try_new( + client.clone(), + LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()), + ) + .await + .unwrap(); + BatteryPool::new(None, client, lm) + } + + /// Drains `rx` for up to `steps` * 100ms of simulated time, returning the + /// last snapshot seen. + async fn last_snapshot( + rx: &mut tokio::sync::broadcast::Receiver, + steps: u32, + ) -> BatteryPoolSnapshot { + let mut last = None; + for _ in 0..steps { + tokio::time::advance(std::time::Duration::from_millis(100)).await; + while let Ok(snap) = rx.try_recv() { + last = Some(snap); + } + } + last.expect("no snapshot received") + } + + #[tokio::test(start_paused = true)] + async fn single_group_reaches_healthy_state() { + // grid → meter → battery_inverter(3) → battery(4) + let mut pool = new_pool(MockComponent::grid(1).with_children(vec![ + MockComponent::meter(2).with_children(vec![ + MockComponent::battery_inverter(3) + .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) + .with_children(vec![ + MockComponent::battery(4) + .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), + ]), + ]), + ])) + .await; + + let mut rx = pool.telemetry_snapshots(); + let snap = last_snapshot(&mut rx, 10).await; + + let groups = snap.groups(); + assert_eq!( + groups.len(), + 1, + "expected exactly one inverter-battery group" + ); + + let (group, status) = groups.iter().next().unwrap(); + assert_eq!(group.inverter_ids, [3].into()); + assert_eq!(group.battery_ids, [4].into()); + assert!(status.healthy_inverters.contains_key(&3)); + assert!(status.healthy_batteries.contains_key(&4)); + assert!(status.unhealthy_inverters.is_empty()); + assert!(status.unhealthy_batteries.is_empty()); + } + + #[tokio::test(start_paused = true)] + async fn two_disjoint_groups_both_appear_in_snapshot() { + // grid → meter → [battery_inverter(3)→battery(4), battery_inverter(5)→battery(6)] + let mut pool = new_pool(MockComponent::grid(1).with_children(vec![ + MockComponent::meter(2).with_children(vec![ + MockComponent::battery_inverter(3) + .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) + .with_children(vec![ + MockComponent::battery(4) + .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), + ]), + MockComponent::battery_inverter(5) + .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) + .with_children(vec![ + MockComponent::battery(6) + .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), + ]), + ]), + ])) + .await; + + let mut rx = pool.telemetry_snapshots(); + let snap = last_snapshot(&mut rx, 10).await; + + let groups = snap.groups(); + assert_eq!(groups.len(), 2); + + let all_inverters: std::collections::BTreeSet = groups + .keys() + .flat_map(|g| g.inverter_ids.iter().copied()) + .collect(); + let all_batteries: std::collections::BTreeSet = groups + .keys() + .flat_map(|g| g.battery_ids.iter().copied()) + .collect(); + assert_eq!(all_inverters, [3, 5].into()); + assert_eq!(all_batteries, [4, 6].into()); + + for status in groups.values() { + assert!(status.unhealthy_inverters.is_empty()); + assert!(status.unhealthy_batteries.is_empty()); + } + } + + #[tokio::test(start_paused = true)] + async fn calling_telemetry_snapshots_twice_reuses_sender() { + let mut pool = new_pool(MockComponent::grid(1).with_children(vec![ + MockComponent::meter(2).with_children(vec![ + MockComponent::battery_inverter(3) + .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) + .with_children(vec![ + MockComponent::battery(4) + .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), + ]), + ]), + ])) + .await; + + let mut rx1 = pool.telemetry_snapshots(); + let mut rx2 = pool.telemetry_snapshots(); + + // Advance so both receivers see at least one snapshot. + tokio::time::advance(std::time::Duration::from_millis(300)).await; + + let snap1 = rx1.recv().await.unwrap(); + let snap2 = rx2.recv().await.unwrap(); + assert_eq!( + snap1, snap2, + "both subscriptions should observe the same snapshot" + ); + } + + #[tokio::test(start_paused = true)] + async fn components_become_unhealthy_when_data_stops() { + // Both components emit only a handful of samples and then go silent; + // the stream stays open so the client actor doesn't reconnect and + // resupply data. + let mut pool = new_pool(MockComponent::grid(1).with_children(vec![ + MockComponent::meter(2).with_children(vec![ + MockComponent::battery_inverter(3) + .with_power(vec![0.0, 0.0, 0.0]) + .with_silence_after_metrics() + .with_children(vec![ + MockComponent::battery(4) + .with_power(vec![0.0, 0.0, 0.0]) + .with_silence_after_metrics(), + ]), + ]), + ])) + .await; + + let mut rx = pool.telemetry_snapshots(); + + // First: drain past the healthy phase and confirm components reach a + // healthy state (3 samples over ~600ms). + let healthy = last_snapshot(&mut rx, 10).await; + let (_, status) = healthy.groups().iter().next().unwrap(); + assert!( + status.healthy_inverters.contains_key(&3) && status.healthy_batteries.contains_key(&4), + "expected components to go healthy after initial samples, got {:?}", + status + ); + + // Now advance well past the 10s missing-data tolerance — the + // component telemetry trackers should fire their interval and + // reclassify both components as unhealthy. + tokio::time::advance(std::time::Duration::from_secs(15)).await; + let unhealthy = last_snapshot(&mut rx, 5).await; + + let (_, status) = unhealthy.groups().iter().next().unwrap(); + assert!( + status.healthy_inverters.is_empty(), + "inverter should be unhealthy after data stops, got healthy set {:?}", + status.healthy_inverters.keys() + ); + assert!( + status.healthy_batteries.is_empty(), + "battery should be unhealthy after data stops, got healthy set {:?}", + status.healthy_batteries.keys() + ); + assert!(status.unhealthy_inverters.contains_key(&3)); + assert!(status.unhealthy_batteries.contains_key(&4)); + } + + #[tokio::test(start_paused = true)] + async fn component_with_bad_state_is_unhealthy() { + // Battery reports an Error state — it must land in the unhealthy + // set even though samples keep arriving. + let mut pool = new_pool(MockComponent::grid(1).with_children(vec![ + MockComponent::meter(2).with_children(vec![ + MockComponent::battery_inverter(3) + .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) + .with_children(vec![ + MockComponent::battery(4) + .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]) + .with_state(ElectricalComponentStateCode::Error), + ]), + ]), + ])) + .await; + + let mut rx = pool.telemetry_snapshots(); + let snap = last_snapshot(&mut rx, 10).await; + + let (_, status) = snap.groups().iter().next().unwrap(); + assert!( + status.healthy_inverters.contains_key(&3), + "inverter with Ready state should be healthy" + ); + assert!( + !status.healthy_batteries.contains_key(&4), + "battery with Error state should not be in healthy set" + ); + assert!( + status.unhealthy_batteries.contains_key(&4), + "battery with Error state should be in unhealthy set, got {:?}", + status + ); + } +} diff --git a/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs new file mode 100644 index 0000000..3caa84f --- /dev/null +++ b/src/microgrid/telemetry_tracker/component_telemetry_tracker.rs @@ -0,0 +1,105 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! A tracker that watches an electrical component's telemetry stream and +//! classifies it as healthy or unhealthy based on its state codes and the +//! freshness of the samples. + +use std::{collections::HashSet, time::Duration}; + +use tokio::{ + select, + sync::{broadcast, mpsc}, +}; + +use crate::client::proto::common::microgrid::electrical_components::{ + ElectricalComponentStateCode, ElectricalComponentTelemetry, +}; + +pub(crate) struct ComponentTelemetryTracker { + component_id: u64, + missing_data_tolerance: Duration, + component_data_rx: broadcast::Receiver, + component_status_tx: mpsc::Sender, + healthy_state_codes: HashSet, +} + +#[derive(PartialEq, Clone, Debug)] +pub(crate) enum ComponentHealthStatus { + Healthy(u64, ElectricalComponentTelemetry), + Unhealthy(u64, Option), +} + +impl ComponentTelemetryTracker { + pub(super) fn new( + component_id: u64, + missing_data_tolerance: Duration, + healthy_state_codes: HashSet, + component_data_rx: broadcast::Receiver, + component_status_tx: mpsc::Sender, + ) -> Self { + Self { + component_id, + missing_data_tolerance, + component_data_rx, + component_status_tx, + healthy_state_codes, + } + } + + fn state_from_data(&self, data: ElectricalComponentTelemetry) -> ComponentHealthStatus { + for state in data.state_snapshots.iter() { + if !state.errors.is_empty() { + return ComponentHealthStatus::Unhealthy(self.component_id, Some(data)); + } + for state in state.states.iter() { + let Ok(state) = ElectricalComponentStateCode::try_from(*state) else { + tracing::warn!( + "Component {} has an invalid state code: {}", + self.component_id, + state + ); + return ComponentHealthStatus::Unhealthy(self.component_id, Some(data)); + }; + if !self.healthy_state_codes.contains(&state) { + return ComponentHealthStatus::Unhealthy(self.component_id, Some(data)); + } + } + } + ComponentHealthStatus::Healthy(data.electrical_component_id, data) + } + + pub async fn run(mut self) { + let mut interval = tokio::time::interval(self.missing_data_tolerance); + loop { + select! { + component_data = self.component_data_rx.recv() => { + match component_data { + Ok(data) => { + // Reset the interval timer on receiving valid data + interval.reset(); + let status = self.state_from_data(data); + if let Err(e) = self.component_status_tx.send(status).await { + tracing::error!("Failed to send component status: {}", e); + } + } + Err(broadcast::error::RecvError::Lagged(_)) => { + continue; + } + Err(broadcast::error::RecvError::Closed) => { + drop(self.component_status_tx); + break; + } + } + } + _ = interval.tick() => { + // If we reach here, it means no data was received within the tolerance period + let status = ComponentHealthStatus::Unhealthy(self.component_id, None); + if let Err(e) = self.component_status_tx.send(status).await { + tracing::error!("Failed to send component status: {}", e); + } + } + } + } + } +} diff --git a/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs new file mode 100644 index 0000000..0d3085c --- /dev/null +++ b/src/microgrid/telemetry_tracker/inverter_battery_group_telemetry_tracker.rs @@ -0,0 +1,195 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! A telemetry tracker for an inverter-battery group in the microgrid, which +//! consists of a set of inverters and their associated batteries, connected +//! in MxN configuration. Emits snapshots that partition the group's +//! components into healthy and unhealthy sets, each annotated with the +//! latest telemetry sample. + +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; + +use tokio::select; + +use crate::{ + Error, MicrogridClientHandle, + client::proto::common::microgrid::electrical_components::{ + ElectricalComponentStateCode, ElectricalComponentTelemetry, + }, + microgrid::telemetry_tracker::battery_pool_telemetry_tracker::InverterBatteryGroup, +}; + +use super::component_telemetry_tracker::{ComponentHealthStatus, ComponentTelemetryTracker}; + +/// A telemetry tracker for an inverter-battery group, which consists of a set +/// of inverters and their associated batteries, connected in MxN +/// configuration. +/// +/// On every change, the tracker emits an [`InverterBatteryGroupStatus`] which +/// partitions the group's components into healthy and unhealthy sets and +/// carries the latest [`ElectricalComponentTelemetry`] sample seen for each +/// component. Downstream consumers (e.g. the bounds tracker) can therefore +/// read both the health state and the most recent metric samples from a +/// single subscription without re-subscribing to the telemetry streams. +#[derive(Clone)] +pub(crate) struct InverterBatteryGroupTelemetryTracker { + inverter_battery_group: InverterBatteryGroup, + status_tx: tokio::sync::mpsc::Sender<(InverterBatteryGroup, InverterBatteryGroupStatus)>, + missing_data_tolerance: Duration, + healthy_state_codes: HashSet, + client: MicrogridClientHandle, +} + +/// A snapshot of an inverter-battery group's components, partitioned by health +/// status and annotated with the latest telemetry sample for each component. +/// +/// The `healthy_*` maps hold the most recent [`ElectricalComponentTelemetry`] +/// observed for each healthy component. The `unhealthy_*` maps hold the last +/// telemetry observed before the component became unhealthy, or `None` if no +/// sample has been received yet. Consumers can use the telemetry (including +/// per-metric bounds) directly without subscribing to the raw streams again. +#[derive(Clone, Debug, PartialEq)] +pub(crate) struct InverterBatteryGroupStatus { + pub healthy_inverters: HashMap, + pub healthy_batteries: HashMap, + pub unhealthy_inverters: HashMap>, + pub unhealthy_batteries: HashMap>, +} + +impl InverterBatteryGroupTelemetryTracker { + pub(crate) fn new( + inverter_battery_group: InverterBatteryGroup, + missing_data_tolerance: Duration, + healthy_state_codes: HashSet, + client: MicrogridClientHandle, + status_tx: tokio::sync::mpsc::Sender<(InverterBatteryGroup, InverterBatteryGroupStatus)>, + ) -> Self { + Self { + inverter_battery_group, + status_tx, + missing_data_tolerance, + healthy_state_codes, + client, + } + } + + pub async fn run(self) -> Result<(), Error> { + let mut healthy_inverters = HashMap::new(); + let mut unhealthy_inverters = HashMap::new(); + let mut healthy_batteries = HashMap::new(); + let mut unhealthy_batteries = HashMap::new(); + + let (inverter_status_tx, mut inverter_status_rx) = tokio::sync::mpsc::channel(100); + + for &inverter_id in &self.inverter_battery_group.inverter_ids { + let component_data_stream = self + .client + .receive_electrical_component_telemetry_stream(inverter_id) + .await?; + let tracker = ComponentTelemetryTracker::new( + inverter_id, + self.missing_data_tolerance, + self.healthy_state_codes.clone(), + component_data_stream, + inverter_status_tx.clone(), + ); + // Spawn a task for each component telemetry tracker + tokio::spawn(async move { + tracker.run().await; + }); + // Initially mark the component as unhealthy until we see data. + unhealthy_inverters.insert(inverter_id, None); + } + + let (battery_status_tx, mut battery_status_rx) = tokio::sync::mpsc::channel(100); + + for &battery_id in &self.inverter_battery_group.battery_ids { + let component_data_stream = self + .client + .receive_electrical_component_telemetry_stream(battery_id) + .await?; + let tracker = ComponentTelemetryTracker::new( + battery_id, + self.missing_data_tolerance, + self.healthy_state_codes.clone(), + component_data_stream, + battery_status_tx.clone(), + ); + // Spawn a task for each component telemetry tracker + tokio::spawn(async move { + tracker.run().await; + }); + // Initially mark the component as unhealthy until we see data. + unhealthy_batteries.insert(battery_id, None); + } + + // Drop the original senders in the main task to allow the component + // trackers to close the channels when they finish, which will signal + // the main loop to stop. + drop(inverter_status_tx); + drop(battery_status_tx); + + loop { + select! { + inverter_status = inverter_status_rx.recv() => { + let Some(inverter_status) = inverter_status else { + let e = String::from("Inverter telemetry tracker stopped receiving status updates."); + tracing::error!("{}", e); + return Err(Error::component_data_error(e)); + }; + match inverter_status { + ComponentHealthStatus::Healthy(component_id, data) => { + healthy_inverters.insert(component_id, data); + unhealthy_inverters.remove(&component_id); + } + ComponentHealthStatus::Unhealthy(component_id, data) => { + unhealthy_inverters.insert(component_id, data); + healthy_inverters.remove(&component_id); + } + } + }, + battery_status = battery_status_rx.recv() => { + let Some(battery_status) = battery_status else { + let e = String::from( + "Battery telemetry tracker stopped receiving status updates." + ); + tracing::error!("{}", e); + return Err(Error::component_data_error(e)); + }; + match battery_status { + ComponentHealthStatus::Healthy(component_id, data) => { + healthy_batteries.insert(component_id, data); + unhealthy_batteries.remove(&component_id); + } + ComponentHealthStatus::Unhealthy(component_id, data) => { + unhealthy_batteries.insert(component_id, data); + healthy_batteries.remove(&component_id); + } + } + } + } + if let Err(e) = self + .status_tx + .send(( + self.inverter_battery_group.clone(), + InverterBatteryGroupStatus { + healthy_inverters: healthy_inverters.clone(), + healthy_batteries: healthy_batteries.clone(), + unhealthy_inverters: unhealthy_inverters.clone(), + unhealthy_batteries: unhealthy_batteries.clone(), + }, + )) + .await + { + tracing::error!("Failed to send inverter-battery group status: {}", e); + return Err(Error::component_data_error(format!( + "Failed to send inverter-battery group status: {}", + e + ))); + } + } + } +} From 00dd2bb53ac4cd6abe2d37c180b9bff9f5809514 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 20 Apr 2026 15:01:00 +0200 Subject: [PATCH 14/18] Add battery pool bounds tracker Signed-off-by: Sahas Subramanian --- examples/bounds.rs | 34 ++ src/microgrid.rs | 1 + src/microgrid/battery_bounds_tracker.rs | 520 ++++++++++++++++++++++++ src/microgrid/battery_pool.rs | 34 +- 4 files changed, 587 insertions(+), 2 deletions(-) create mode 100644 examples/bounds.rs create mode 100644 src/microgrid/battery_bounds_tracker.rs diff --git a/examples/bounds.rs b/examples/bounds.rs new file mode 100644 index 0000000..df943d0 --- /dev/null +++ b/examples/bounds.rs @@ -0,0 +1,34 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +use chrono::TimeDelta; +use frequenz_microgrid::Microgrid; +use frequenz_microgrid::{Error, LogicalMeterConfig}; +use tracing_subscriber::{ + EnvFilter, + fmt::{self}, + prelude::*, +}; + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::registry() + .with(EnvFilter::new("info,frequenz_microgrid=warn")) + .with(fmt::layer().with_file(true).with_line_number(true)) + .init(); + + let microgrid = Microgrid::try_new( + "http://[::1]:8800", + LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()), + ) + .await?; + + let mut battery_pool = microgrid.battery_pool(None); + let mut bounds_rx = battery_pool.power_bounds(); + + while let Ok(bounds) = bounds_rx.recv().await { + tracing::info!("Battery pool active-power bounds: {:?}", bounds); + } + + Ok(()) +} diff --git a/src/microgrid.rs b/src/microgrid.rs index 9afe849..7432e66 100644 --- a/src/microgrid.rs +++ b/src/microgrid.rs @@ -3,6 +3,7 @@ //! High-level interface for the Microgrid API. +mod battery_bounds_tracker; mod battery_pool; pub use battery_pool::BatteryPool; diff --git a/src/microgrid/battery_bounds_tracker.rs b/src/microgrid/battery_bounds_tracker.rs new file mode 100644 index 0000000..e6d01d6 --- /dev/null +++ b/src/microgrid/battery_bounds_tracker.rs @@ -0,0 +1,520 @@ +// License: MIT +// Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +//! Bounds tracker for pools of microgrid components. +//! +//! Subscribes to a [`BatteryPoolSnapshot`] stream and, for each update, extracts +//! the bounds of a target metric from every healthy component and aggregates +//! them into a single pool-level set of bounds. +//! +//! Aggregation follows the physical topology of an inverter-battery group in +//! an `MxN` configuration (M inverters wired in parallel to N batteries wired +//! in parallel, with the inverter side in series with the battery side): +//! +//! * Healthy inverters within a group are in parallel — their bounds are +//! added together. +//! * Healthy batteries within a group are in parallel — their bounds are +//! added together. +//! * The inverter side and battery side of a group are in series — their +//! aggregated bounds are intersected. +//! * Groups within a pool are in parallel — their bounds are added together. + +use std::collections::HashMap; +use std::marker::PhantomData; + +use tokio::sync::broadcast; + +use crate::bounds::{combine_parallel_sets, intersect_bounds_sets}; +use crate::client::proto::common::{ + metrics::Bounds as PbBounds, microgrid::electrical_components::ElectricalComponentTelemetry, +}; +use crate::microgrid::telemetry_tracker::battery_pool_telemetry_tracker::BatteryPoolSnapshot; +use crate::{Bounds, metric::Metric}; + +/// Tracks and aggregates power bounds for a battery pool. +/// +/// `InverterM` is the metric used to read bounds from inverters (e.g. +/// `AcPowerActive`); `BatteryM` is the metric used to read bounds from +/// batteries (e.g. `DcPower`). Both must share the same `QuantityType` so +/// their bounds can be intersected and summed. +pub(crate) struct BatteryPoolBoundsTracker { + pool_status_rx: broadcast::Receiver, + pool_bounds_tx: broadcast::Sender>>, + _marker: PhantomData<(InverterM, BatteryM)>, +} + +impl BatteryPoolBoundsTracker +where + InverterM: Metric, + BatteryM: Metric, + Bounds: From, +{ + pub(crate) fn new( + pool_status_rx: broadcast::Receiver, + pool_bounds_tx: broadcast::Sender>>, + ) -> Self { + Self { + pool_status_rx, + pool_bounds_tx, + _marker: PhantomData, + } + } + + pub(crate) async fn run(mut self) { + loop { + match self.pool_status_rx.recv().await { + Ok(pool_status) => { + let bounds = Self::compute_pool_bounds(&pool_status); + if self.pool_bounds_tx.send(bounds).is_err() { + tracing::debug!( + "No receivers for {}/{} bounds tracker; shutting down.", + InverterM::str_name(), + BatteryM::str_name(), + ); + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!( + "{}/{} bounds tracker lagged by {n} pool status updates.", + InverterM::str_name(), + BatteryM::str_name(), + ); + } + Err(broadcast::error::RecvError::Closed) => { + tracing::error!( + "Pool status channel closed; {}/{} bounds tracker shutting down.", + InverterM::str_name(), + BatteryM::str_name(), + ); + break; + } + } + } + } + + fn compute_pool_bounds(status: &BatteryPoolSnapshot) -> Vec> { + status + .groups() + .values() + .map(|group| { + let inverter_bounds = aggregate_parallel::(&group.healthy_inverters); + let battery_bounds = aggregate_parallel::(&group.healthy_batteries); + intersect_bounds_sets(&inverter_bounds, &battery_bounds) + }) + .fold(Vec::new(), |acc, group_bounds| { + combine_parallel_sets(&acc, &group_bounds) + }) + } +} + +/// Combines the bounds of every component in the map as if they were wired +/// in parallel. Components that don't report the metric `M` are skipped. +fn aggregate_parallel( + components: &HashMap, +) -> Vec> +where + Bounds: From, +{ + components + .values() + .filter_map(extract_metric_bounds::) + .fold(Vec::new(), |acc, bounds| combine_parallel_sets(&acc, &bounds)) +} + +fn extract_metric_bounds( + telemetry: &ElectricalComponentTelemetry, +) -> Option>> +where + Bounds: From, +{ + telemetry.metric_samples.iter().find_map(|sample| { + (sample.metric == M::METRIC as i32).then(|| { + sample + .bounds + .iter() + .map(|b| Bounds::from(*b)) + .collect::>() + }) + }) +} + +#[cfg(test)] +mod tests { + use std::collections::{BTreeSet, HashMap}; + + use crate::Bounds; + use crate::client::proto::common::metrics::{ + Bounds as PbBounds, Metric as MetricPb, MetricSample, + }; + use crate::client::proto::common::microgrid::electrical_components::ElectricalComponentTelemetry; + use crate::metric::AcPowerActive; + use crate::microgrid::telemetry_tracker::battery_pool_telemetry_tracker::{ + BatteryPoolSnapshot, InverterBatteryGroup, + }; + use crate::microgrid::telemetry_tracker::inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus; + use crate::quantity::Power; + + use super::BatteryPoolBoundsTracker; + + fn telem_with_power_bounds( + id: u64, + bounds: Vec<(Option, Option)>, + ) -> ElectricalComponentTelemetry { + ElectricalComponentTelemetry { + electrical_component_id: id, + metric_samples: vec![MetricSample { + sample_time: None, + metric: MetricPb::AcPowerActive as i32, + value: None, + bounds: bounds + .into_iter() + .map(|(lower, upper)| PbBounds { lower, upper }) + .collect(), + ..Default::default() + }], + ..Default::default() + } + } + + fn group(inverter_ids: &[u64], battery_ids: &[u64]) -> InverterBatteryGroup { + InverterBatteryGroup::new( + inverter_ids.iter().copied().collect::>(), + battery_ids.iter().copied().collect::>(), + ) + } + + fn status( + groups: Vec<(InverterBatteryGroup, InverterBatteryGroupStatus)>, + ) -> BatteryPoolSnapshot { + BatteryPoolSnapshot::from_groups(groups.into_iter().collect()) + } + + #[test] + fn single_group_intersects_inverter_and_battery_bounds() { + let g = group(&[10], &[20]); + let mut healthy_inverters = HashMap::new(); + healthy_inverters.insert( + 10, + telem_with_power_bounds( + 10, + vec![(Some(-1000.0), Some(-200.0)), (Some(200.0), Some(1000.0))], + ), + ); + let mut healthy_batteries = HashMap::new(); + healthy_batteries.insert( + 20, + telem_with_power_bounds(20, vec![(Some(-500.0), Some(800.0))]), + ); + + let snapshot = status(vec![( + g, + InverterBatteryGroupStatus { + healthy_inverters, + healthy_batteries, + unhealthy_inverters: HashMap::new(), + unhealthy_batteries: HashMap::new(), + }, + )]); + + let bounds = + BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + assert_eq!( + bounds, + vec![ + Bounds::new( + Some(Power::from_watts(-500.0)), + Some(Power::from_watts(-200.0)) + ), + Bounds::new( + Some(Power::from_watts(200.0)), + Some(Power::from_watts(800.0)) + ) + ] + ); + } + + #[test] + fn parallel_inverters_add_within_group() { + let g = group(&[10, 11], &[20]); + let mut healthy_inverters = HashMap::new(); + healthy_inverters.insert( + 10, + telem_with_power_bounds(10, vec![(Some(-1000.0), Some(1000.0))]), + ); + healthy_inverters.insert( + 11, + telem_with_power_bounds(11, vec![(Some(-2000.0), Some(2000.0))]), + ); + let mut healthy_batteries = HashMap::new(); + // Wide battery bounds so the intersect doesn't clip + healthy_batteries.insert( + 20, + telem_with_power_bounds(20, vec![(Some(-10_000.0), Some(10_000.0))]), + ); + + let snapshot = status(vec![( + g, + InverterBatteryGroupStatus { + healthy_inverters, + healthy_batteries, + unhealthy_inverters: HashMap::new(), + unhealthy_batteries: HashMap::new(), + }, + )]); + + let bounds = + BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + assert_eq!( + bounds, + vec![Bounds::new( + Some(Power::from_watts(-3000.0)), + Some(Power::from_watts(3000.0)) + )] + ); + } + + #[test] + fn multiple_groups_add_across_pool() { + let g1 = group(&[10], &[20]); + let mut h_inv_1 = HashMap::new(); + h_inv_1.insert( + 10, + telem_with_power_bounds(10, vec![(Some(-1000.0), Some(1000.0))]), + ); + let mut h_bat_1 = HashMap::new(); + h_bat_1.insert( + 20, + telem_with_power_bounds(20, vec![(Some(-1000.0), Some(1000.0))]), + ); + + let g2 = group(&[11], &[21]); + let mut h_inv_2 = HashMap::new(); + h_inv_2.insert( + 11, + telem_with_power_bounds(11, vec![(Some(-500.0), Some(500.0))]), + ); + let mut h_bat_2 = HashMap::new(); + h_bat_2.insert( + 21, + telem_with_power_bounds(21, vec![(Some(-500.0), Some(500.0))]), + ); + + let snapshot = status(vec![ + ( + g1, + InverterBatteryGroupStatus { + healthy_inverters: h_inv_1, + healthy_batteries: h_bat_1, + unhealthy_inverters: HashMap::new(), + unhealthy_batteries: HashMap::new(), + }, + ), + ( + g2, + InverterBatteryGroupStatus { + healthy_inverters: h_inv_2, + healthy_batteries: h_bat_2, + unhealthy_inverters: HashMap::new(), + unhealthy_batteries: HashMap::new(), + }, + ), + ]); + + let bounds = + BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + assert_eq!( + bounds, + vec![Bounds::new( + Some(Power::from_watts(-1500.0)), + Some(Power::from_watts(1500.0)) + )] + ); + } + + #[test] + fn empty_pool_yields_empty_bounds() { + let snapshot = status(vec![]); + let bounds = + BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + assert!(bounds.is_empty()); + } + + /// When inverters have no power bounds (metric absent or empty `bounds` + /// list), the group has no well-defined feasible region and must + /// contribute no bounds to the pool aggregate. + #[test] + fn missing_inverter_bounds_yields_no_group_bounds() { + let g = group(&[10], &[20]); + + // Inverter telemetry carries a matching metric but no bounds at all. + let mut healthy_inverters = HashMap::new(); + healthy_inverters.insert(10, telem_with_power_bounds(10, vec![])); + + let mut healthy_batteries = HashMap::new(); + healthy_batteries.insert( + 20, + telem_with_power_bounds(20, vec![(Some(-500.0), Some(500.0))]), + ); + + let snapshot = status(vec![( + g, + InverterBatteryGroupStatus { + healthy_inverters, + healthy_batteries, + unhealthy_inverters: HashMap::new(), + unhealthy_batteries: HashMap::new(), + }, + )]); + + let bounds = + BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + assert!( + bounds.is_empty(), + "group with no inverter bounds must not contribute any bounds" + ); + } + + /// Mirror of the above for the battery side: with batteries reporting no + /// power bounds, the group must contribute no bounds to the pool. + #[test] + fn missing_battery_bounds_yields_no_group_bounds() { + let g = group(&[10], &[20]); + + let mut healthy_inverters = HashMap::new(); + healthy_inverters.insert( + 10, + telem_with_power_bounds(10, vec![(Some(-1000.0), Some(1000.0))]), + ); + + let mut healthy_batteries = HashMap::new(); + healthy_batteries.insert(20, telem_with_power_bounds(20, vec![])); + + let snapshot = status(vec![( + g, + InverterBatteryGroupStatus { + healthy_inverters, + healthy_batteries, + unhealthy_inverters: HashMap::new(), + unhealthy_batteries: HashMap::new(), + }, + )]); + + let bounds = + BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + assert!( + bounds.is_empty(), + "group with no battery bounds must not contribute any bounds" + ); + } + + /// If every inverter in the group is unhealthy, the group cannot dispatch + /// power — the pool must report no bounds from this group regardless of + /// what the healthy batteries could handle. + #[test] + fn no_healthy_inverters_yields_no_group_bounds() { + let g = group(&[10], &[20]); + + let mut unhealthy_inverters = HashMap::new(); + unhealthy_inverters.insert(10, None); + + let mut healthy_batteries = HashMap::new(); + healthy_batteries.insert( + 20, + telem_with_power_bounds(20, vec![(Some(-500.0), Some(500.0))]), + ); + + let snapshot = status(vec![( + g, + InverterBatteryGroupStatus { + healthy_inverters: HashMap::new(), + healthy_batteries, + unhealthy_inverters, + unhealthy_batteries: HashMap::new(), + }, + )]); + + let bounds = + BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + assert!( + bounds.is_empty(), + "group with no healthy inverters must not contribute any bounds" + ); + } + + /// Mirror of the above: no healthy batteries in the group means nothing + /// to source/sink, so the group contributes no bounds to the pool. + #[test] + fn no_healthy_batteries_yields_no_group_bounds() { + let g = group(&[10], &[20]); + + let mut healthy_inverters = HashMap::new(); + healthy_inverters.insert( + 10, + telem_with_power_bounds(10, vec![(Some(-1000.0), Some(1000.0))]), + ); + + let mut unhealthy_batteries = HashMap::new(); + unhealthy_batteries.insert(20, None); + + let snapshot = status(vec![( + g, + InverterBatteryGroupStatus { + healthy_inverters, + healthy_batteries: HashMap::new(), + unhealthy_inverters: HashMap::new(), + unhealthy_batteries, + }, + )]); + + let bounds = + BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + assert!( + bounds.is_empty(), + "group with no healthy batteries must not contribute any bounds" + ); + } + + #[test] + fn group_without_matching_metric_contributes_nothing() { + let g = group(&[10], &[20]); + // Telemetry exists but carries a different metric. + let other = ElectricalComponentTelemetry { + electrical_component_id: 10, + metric_samples: vec![MetricSample { + sample_time: None, + metric: MetricPb::AcVoltage as i32, + value: None, + bounds: vec![PbBounds { + lower: Some(0.0), + upper: Some(1.0), + }], + ..Default::default() + }], + ..Default::default() + }; + let mut h_inv = HashMap::new(); + h_inv.insert(10, other); + let mut h_bat = HashMap::new(); + h_bat.insert( + 20, + telem_with_power_bounds(20, vec![(Some(-100.0), Some(100.0))]), + ); + + let snapshot = status(vec![( + g, + InverterBatteryGroupStatus { + healthy_inverters: h_inv, + healthy_batteries: h_bat, + unhealthy_inverters: HashMap::new(), + unhealthy_batteries: HashMap::new(), + }, + )]); + + // Inverter side has no active-power bounds → group produces no + // bounds, so the pool bounds are empty. + let bounds = + BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + assert!(bounds.is_empty()); + } +} diff --git a/src/microgrid/battery_pool.rs b/src/microgrid/battery_pool.rs index 4a378ea..64daccb 100644 --- a/src/microgrid/battery_pool.rs +++ b/src/microgrid/battery_pool.rs @@ -15,8 +15,11 @@ use crate::{ proto::common::microgrid::electrical_components::ElectricalComponentStateCode, }, metric, - microgrid::telemetry_tracker::battery_pool_telemetry_tracker::{ - BatteryPoolSnapshot, BatteryPoolTelemetryTracker, + microgrid::{ + battery_bounds_tracker::BatteryPoolBoundsTracker, + telemetry_tracker::battery_pool_telemetry_tracker::{ + BatteryPoolSnapshot, BatteryPoolTelemetryTracker, + }, }, quantity::Power, }; @@ -27,6 +30,7 @@ pub struct BatteryPool { client: MicrogridClientHandle, logical_meter: LogicalMeterHandle, snapshot_tx: Option>, + bounds_tx: Option>>>, } impl BatteryPool { @@ -65,6 +69,32 @@ impl BatteryPool { .battery::(self.component_ids.clone()) } + /// Returns a receiver for the aggregated active-power bounds of the pool, + /// updated on each snapshot. + /// + /// Reuses the running bounds tracker if one exists and still has active + /// receivers; otherwise starts a new one (which also starts or reuses the + /// underlying telemetry tracker). + pub fn power_bounds(&mut self) -> broadcast::Receiver>> { + if let Some(tx) = self + .bounds_tx + .as_ref() + .and_then(broadcast::WeakSender::upgrade) + && tx.receiver_count() > 0 + { + return tx.subscribe(); + } + let snapshot_rx = self.telemetry_snapshots(); + let (tx, rx) = broadcast::channel(100); + self.bounds_tx = Some(tx.downgrade()); + let tracker = BatteryPoolBoundsTracker::::new( + snapshot_rx, + tx, + ); + tokio::spawn(tracker.run()); + rx + } + /// Returns a receiver for a stream of [`BatteryPoolSnapshot`] values, /// each reflecting the latest component telemetry partitioned into /// healthy and unhealthy sets. From 29614ecd8825eb3096c0bad4789eb0375f6b2c3b Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 21 Apr 2026 14:45:00 +0200 Subject: [PATCH 15/18] Confirm components are batteries when creating a battery pool Signed-off-by: Sahas Subramanian --- examples/bounds.rs | 2 +- src/error.rs | 1 + src/microgrid.rs | 4 +-- src/microgrid/battery_pool.rs | 36 ++++++++++++++----- .../battery_pool_telemetry_tracker.rs | 2 +- 5 files changed, 32 insertions(+), 13 deletions(-) diff --git a/examples/bounds.rs b/examples/bounds.rs index df943d0..32697a2 100644 --- a/examples/bounds.rs +++ b/examples/bounds.rs @@ -23,7 +23,7 @@ async fn main() -> Result<(), Error> { ) .await?; - let mut battery_pool = microgrid.battery_pool(None); + let mut battery_pool = microgrid.battery_pool(None)?; let mut bounds_rx = battery_pool.power_bounds(); while let Ok(bounds) = bounds_rx.recv().await { diff --git a/src/error.rs b/src/error.rs index 8dd2e00..6325ea0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -59,6 +59,7 @@ ErrorKind!( (ChronoError, chrono_error), (DroppedUnusedFormulas, dropped_unused_formulas), (FormulaEngineError, formula_engine_error), + (InvalidComponent, invalid_component), (Internal, internal), (APIServerError, api_server_error), ); diff --git a/src/microgrid.rs b/src/microgrid.rs index 7432e66..3f102dc 100644 --- a/src/microgrid.rs +++ b/src/microgrid.rs @@ -58,8 +58,8 @@ impl Microgrid { self.logical_meter.clone() } - pub fn battery_pool(&self, component_ids: Option>) -> BatteryPool { - BatteryPool::new( + pub fn battery_pool(&self, component_ids: Option>) -> Result { + BatteryPool::try_new( component_ids.map(|ids| ids.into_iter().collect()), self.client.clone(), self.logical_meter.clone(), diff --git a/src/microgrid/battery_pool.rs b/src/microgrid/battery_pool.rs index 64daccb..cb48298 100644 --- a/src/microgrid/battery_pool.rs +++ b/src/microgrid/battery_pool.rs @@ -36,30 +36,48 @@ pub struct BatteryPool { impl BatteryPool { /// Creates a new `BatteryPool` instance with the given component IDs, /// client and logical meter handles. - pub(crate) fn new( + pub(crate) fn try_new( component_ids: Option>, client: MicrogridClientHandle, logical_meter: LogicalMeterHandle, - ) -> Self { - Self { + ) -> Result { + let this = Self { component_ids, client, logical_meter, snapshot_tx: None, bounds_tx: None, + }; + if let Some(ids) = &this.component_ids { + if ids.is_empty() { + let e = "component_ids cannot be an empty set".to_string(); + tracing::error!("{e}"); + return Err(Error::invalid_component(e)); + } + // Validate that all provided IDs correspond to batteries in the graph. + if !ids.is_subset(&this.get_all_battery_ids()) { + let e = format!("All component_ids {:?} must be batteries.", ids); + tracing::error!("{e}"); + return Err(Error::invalid_component(e)); + } } + Ok(this) + } + + fn get_all_battery_ids(&self) -> BTreeSet { + self.logical_meter + .graph() + .components() + .filter(|c| c.category() == ElectricalComponentCategory::Battery) + .map(|c| c.id) + .collect() } pub(crate) fn get_battery_ids(&self) -> BTreeSet { if let Some(ids) = &self.component_ids { ids.clone() } else { - self.logical_meter - .graph() - .components() - .filter(|c| c.category() == ElectricalComponentCategory::Battery) - .map(|c| c.id) - .collect() + self.get_all_battery_ids() } } diff --git a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs index 3a47272..8ccad01 100644 --- a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs @@ -250,7 +250,7 @@ mod tests { ) .await .unwrap(); - BatteryPool::new(None, client, lm) + BatteryPool::try_new(None, client, lm).unwrap() } /// Drains `rx` for up to `steps` * 100ms of simulated time, returning the From e20f7b12498d892f52310895f8bef295153904fe Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 21 Apr 2026 15:00:37 +0200 Subject: [PATCH 16/18] Improve formatting Signed-off-by: Sahas Subramanian --- src/client/test_utils.rs | 5 ++- src/microgrid/battery_bounds_tracker.rs | 49 +++++++++++++++---------- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/client/test_utils.rs b/src/client/test_utils.rs index e184310..08f9ac4 100644 --- a/src/client/test_utils.rs +++ b/src/client/test_utils.rs @@ -355,8 +355,9 @@ impl MicrogridApiClient for MockMicrogridApiClient { if let Some(component) = component { if !component.metrics.is_empty() { let metrics = component.metrics.clone(); - let state_code = - component.state_code.unwrap_or(ElectricalComponentStateCode::Ready); + let state_code = component + .state_code + .unwrap_or(ElectricalComponentStateCode::Ready); let silence_after_metrics = component.silence_after_metrics; tokio::spawn(async move { let dur = std::time::Duration::from_millis(200); diff --git a/src/microgrid/battery_bounds_tracker.rs b/src/microgrid/battery_bounds_tracker.rs index e6d01d6..c58f789 100644 --- a/src/microgrid/battery_bounds_tracker.rs +++ b/src/microgrid/battery_bounds_tracker.rs @@ -119,7 +119,9 @@ where components .values() .filter_map(extract_metric_bounds::) - .fold(Vec::new(), |acc, bounds| combine_parallel_sets(&acc, &bounds)) + .fold(Vec::new(), |acc, bounds| { + combine_parallel_sets(&acc, &bounds) + }) } fn extract_metric_bounds( @@ -217,8 +219,9 @@ mod tests { }, )]); - let bounds = - BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + &snapshot, + ); assert_eq!( bounds, vec![ @@ -263,8 +266,9 @@ mod tests { }, )]); - let bounds = - BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + &snapshot, + ); assert_eq!( bounds, vec![Bounds::new( @@ -321,8 +325,9 @@ mod tests { ), ]); - let bounds = - BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + &snapshot, + ); assert_eq!( bounds, vec![Bounds::new( @@ -335,8 +340,9 @@ mod tests { #[test] fn empty_pool_yields_empty_bounds() { let snapshot = status(vec![]); - let bounds = - BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + &snapshot, + ); assert!(bounds.is_empty()); } @@ -367,8 +373,9 @@ mod tests { }, )]); - let bounds = - BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + &snapshot, + ); assert!( bounds.is_empty(), "group with no inverter bounds must not contribute any bounds" @@ -400,8 +407,9 @@ mod tests { }, )]); - let bounds = - BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + &snapshot, + ); assert!( bounds.is_empty(), "group with no battery bounds must not contribute any bounds" @@ -434,8 +442,9 @@ mod tests { }, )]); - let bounds = - BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + &snapshot, + ); assert!( bounds.is_empty(), "group with no healthy inverters must not contribute any bounds" @@ -467,8 +476,9 @@ mod tests { }, )]); - let bounds = - BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + &snapshot, + ); assert!( bounds.is_empty(), "group with no healthy batteries must not contribute any bounds" @@ -513,8 +523,9 @@ mod tests { // Inverter side has no active-power bounds → group produces no // bounds, so the pool bounds are empty. - let bounds = - BatteryPoolBoundsTracker::::compute_pool_bounds(&snapshot); + let bounds = BatteryPoolBoundsTracker::::compute_pool_bounds( + &snapshot, + ); assert!(bounds.is_empty()); } } From 9b04bfd7c2b2c04501634df5a545d734f9e788c2 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 21 Apr 2026 15:25:22 +0200 Subject: [PATCH 17/18] Define clippy restrictions for the crate This way, they don't have to be specified from the CLI each time. Signed-off-by: Sahas Subramanian --- src/lib.rs | 12 ++++++++++++ src/logical_meter/formula/async_formula.rs | 6 +++++- .../battery_pool_telemetry_tracker.rs | 8 +++----- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index be38afc..1c756ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,18 @@ //! High-level interface for the Microgrid API. +#![cfg_attr( + not(test), + deny( + clippy::unwrap_used, + clippy::expect_used, + clippy::panic, + clippy::unimplemented, + clippy::todo, + clippy::unreachable, + ) +)] + mod bounds; pub use bounds::Bounds; diff --git a/src/logical_meter/formula/async_formula.rs b/src/logical_meter/formula/async_formula.rs index 695c884..57f12c6 100644 --- a/src/logical_meter/formula/async_formula.rs +++ b/src/logical_meter/formula/async_formula.rs @@ -157,7 +157,11 @@ async fn synchronize_receivers( for item in formula_items.iter_mut() { match item.recv().await { Ok(vv) => latest.push(vv), - Err(_) => todo!(), + Err(e) => { + return Err(crate::Error::internal(format!( + "Failed to receive from formula operand: {e}" + ))); + } }; } diff --git a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs index 8ccad01..11ce4fe 100644 --- a/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs +++ b/src/microgrid/telemetry_tracker/battery_pool_telemetry_tracker.rs @@ -113,12 +113,10 @@ impl BatteryPoolTelemetryTracker { if !group_batteries.is_subset(&self.component_ids) { let e = format!( concat!( - "Inverter {} is connected to batteries {:?} which are not all in the ", - "requested component IDs {:?}" + "Inverters {:?} are connected to batteries {:?} which are not all in ", + "the requested component IDs {:?}" ), - group_inverters.iter().next().unwrap(), - group_batteries, - self.component_ids + group_inverters, group_batteries, self.component_ids ); tracing::error!("{}", e); From 8a56776850e96bd0ebf3e8edb2b86df102e67949 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 21 Apr 2026 16:43:15 +0200 Subject: [PATCH 18/18] Update release notes Signed-off-by: Sahas Subramanian --- RELEASE_NOTES.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index eb51cbf..2ad1102 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -28,6 +28,14 @@ - The resampler's `max_age_in_intervals` has also become configurable, through `LogicalMeterConfig`. +- The `Quantity` trait now exposes numeric operations (`abs`, `floor`, `ceil`, `round`, `trunc`, `fract`, `is_nan`, `is_infinite`, `min`, `max`) as trait methods, so they can be used in generic code over any `Quantity` (including `f32` and all quantity types). + +- The `Quantity` trait now provides associated `MIN` and `MAX` constants. + +- New `BatteryPool` type (accessible via `Microgrid::battery_pool`) exposing: + - `power()` — a `Formula` for the pool's aggregated power. + - `power_bounds()` — a `broadcast::Receiver>>` tracking the pool's power bounds. + ## Bug Fixes