From da5db09f5ed030e6177ee4c2f764f1a21f2bdab1 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Tue, 15 Jul 2025 17:41:47 +0200 Subject: [PATCH 01/14] Upgrade to latest component-graph Signed-off-by: Sahas Subramanian --- Cargo.toml | 2 +- src/logical_meter/formula.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8e7149b..33b469d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ path = "src/lib.rs" [dependencies] chrono = "0.4" -frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "ab3b998" } +frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "94dc826" } frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "e0567c0" } frequenz-resampling = { git = "https://github.com/frequenz-floss/frequenz-resampling-rs.git", rev = "ce84d66" } prost = "0.13" diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index ff4e285..7908309 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -11,7 +11,7 @@ use super::logical_meter_actor; #[derive(Clone)] pub struct Formula { - formula: frequenz_microgrid_component_graph::Formula, + formula: frequenz_microgrid_component_graph::AggregationFormula, metric: Metric, instructions_tx: mpsc::Sender, } @@ -24,7 +24,7 @@ impl std::fmt::Display for Formula { impl Formula { pub(super) fn new( - formula: frequenz_microgrid_component_graph::Formula, + formula: frequenz_microgrid_component_graph::AggregationFormula, metric: Metric, instructions_tx: mpsc::Sender, ) -> Self { From 473f4a427188ce45b46f74c8151707bd62729630 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 16 Jul 2025 11:31:42 +0200 Subject: [PATCH 02/14] =?UTF-8?q?Rename=20`Formula`=20=E2=86=92=20`Aggrega?= =?UTF-8?q?tionFormula`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Because there will be another type of formula that doesn't support aggregation. Signed-off-by: Sahas Subramanian --- src/lib.rs | 2 +- src/logical_meter.rs | 2 +- src/logical_meter/formula.rs | 34 ++++++------ src/logical_meter/logical_meter_handle.rs | 68 +++++++++++++++++------ 4 files changed, 70 insertions(+), 36 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c8f1c1e..4a83f17 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,4 +15,4 @@ mod sample; pub use sample::Sample; mod logical_meter; -pub use logical_meter::{Formula, LogicalMeterConfig, LogicalMeterHandle, Metric}; +pub use logical_meter::{AggregationFormula, LogicalMeterConfig, LogicalMeterHandle, Metric}; diff --git a/src/logical_meter.rs b/src/logical_meter.rs index 3fe3130..392c66a 100644 --- a/src/logical_meter.rs +++ b/src/logical_meter.rs @@ -12,4 +12,4 @@ mod metric; pub use metric::Metric; pub use config::LogicalMeterConfig; -pub use formula::Formula; +pub use formula::AggregationFormula; diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index 7908309..7535e0d 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -10,19 +10,19 @@ use crate::{Error, Metric, Sample}; use super::logical_meter_actor; #[derive(Clone)] -pub struct Formula { +pub struct AggregationFormula { formula: frequenz_microgrid_component_graph::AggregationFormula, metric: Metric, instructions_tx: mpsc::Sender, } -impl std::fmt::Display for Formula { +impl std::fmt::Display for AggregationFormula { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.formula.fmt(f) } } -impl Formula { +impl AggregationFormula { pub(super) fn new( formula: frequenz_microgrid_component_graph::AggregationFormula, metric: Metric, @@ -54,7 +54,7 @@ impl Formula { } } -impl std::ops::Add for Formula { +impl std::ops::Add for AggregationFormula { type Output = Result; fn add(self, other: Self) -> Self::Output { @@ -69,7 +69,7 @@ impl std::ops::Add for Formula { } } -impl std::ops::Sub for Formula { +impl std::ops::Sub for AggregationFormula { type Output = Result; fn sub(self, other: Self) -> Self::Output { @@ -84,10 +84,10 @@ impl std::ops::Sub for Formula { } } -impl std::ops::Add for Result { - type Output = Result; +impl std::ops::Add for Result { + type Output = Result; - fn add(self, other: Formula) -> Self::Output { + fn add(self, other: AggregationFormula) -> Self::Output { match self { Ok(left) => left + other, Err(e) => Err(e), @@ -95,10 +95,10 @@ impl std::ops::Add for Result { } } -impl std::ops::Sub for Result { - type Output = Result; +impl std::ops::Sub for Result { + type Output = Result; - fn sub(self, other: Formula) -> Self::Output { + fn sub(self, other: AggregationFormula) -> Self::Output { match self { Ok(left) => left - other, Err(e) => Err(e), @@ -106,10 +106,10 @@ impl std::ops::Sub for Result { } } -impl std::ops::Add> for Formula { - type Output = Result; +impl std::ops::Add> for AggregationFormula { + type Output = Result; - fn add(self, other: Result) -> Self::Output { + fn add(self, other: Result) -> Self::Output { match other { Ok(right) => self + right, Err(e) => Err(e), @@ -117,10 +117,10 @@ impl std::ops::Add> for Formula { } } -impl std::ops::Sub> for Formula { - type Output = Result; +impl std::ops::Sub> for AggregationFormula { + type Output = Result; - fn sub(self, other: Result) -> Self::Output { + fn sub(self, other: Result) -> Self::Output { match other { Ok(right) => self - right, Err(e) => Err(e), diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index 5e76235..0aec03e 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -10,7 +10,9 @@ use frequenz_microgrid_component_graph::{self, ComponentGraph}; use std::collections::BTreeSet; use tokio::sync::mpsc; -use super::{Formula, LogicalMeterConfig, Metric, logical_meter_actor::LogicalMeterActor}; +use super::{ + AggregationFormula, LogicalMeterConfig, Metric, logical_meter_actor::LogicalMeterActor, +}; /// This provides an interface stream high-level metrics from a microgrid. #[derive(Clone)] @@ -54,7 +56,7 @@ impl LogicalMeterHandle { /// Returns a receiver that streams samples for the given `metric` at the grid /// connection point. - pub fn grid(&mut self, metric: Metric) -> Result { + pub fn grid(&mut self, metric: Metric) -> Result { if !metric.power() && !metric.current() { return Err(Error::invalid_metric(format!( "The grid formula only supports power or current metrics, but got: {metric:?}" @@ -64,7 +66,11 @@ impl LogicalMeterHandle { Error::component_graph_error(format!("Could not derive grid formula: {e}")) })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + Ok(AggregationFormula::new( + formula, + metric, + self.instructions_tx.clone(), + )) } /// Returns a receiver that streams samples for the given `metric` for the @@ -75,7 +81,7 @@ impl LogicalMeterHandle { &mut self, component_ids: Option>, metric: Metric, - ) -> Result { + ) -> Result { if !metric.power() && !metric.current() { return Err(Error::invalid_metric(format!( "The battery formula only supports power or current metrics, but got: {metric:?}" @@ -84,7 +90,11 @@ impl LogicalMeterHandle { let formula = self.graph.battery_formula(component_ids).map_err(|e| { Error::component_graph_error(format!("Could not derive battery formula: {e}")) })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + Ok(AggregationFormula::new( + formula, + metric, + self.instructions_tx.clone(), + )) } /// Returns a receiver that streams samples for the given `metric` for the @@ -95,7 +105,7 @@ impl LogicalMeterHandle { &mut self, component_ids: Option>, metric: Metric, - ) -> Result { + ) -> Result { if !metric.power() && !metric.current() { return Err(Error::invalid_metric(format!( "The CHP formula only supports power or current metrics, but got: {metric:?}" @@ -104,7 +114,11 @@ impl LogicalMeterHandle { let formula = self.graph.chp_formula(component_ids).map_err(|e| { Error::component_graph_error(format!("Could not derive CHP formula: {e}")) })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + Ok(AggregationFormula::new( + formula, + metric, + self.instructions_tx.clone(), + )) } /// Returns a receiver that streams samples for the given `metric` for the @@ -115,7 +129,7 @@ impl LogicalMeterHandle { &mut self, component_ids: Option>, metric: Metric, - ) -> Result { + ) -> Result { if !metric.power() && !metric.current() { return Err(Error::invalid_metric(format!( "The PV formula only supports power or current metrics, but got: {metric:?}" @@ -124,7 +138,11 @@ impl LogicalMeterHandle { let formula = self.graph.pv_formula(component_ids).map_err(|e| { Error::component_graph_error(format!("Could not derive PV formula: {e}")) })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + Ok(AggregationFormula::new( + formula, + metric, + self.instructions_tx.clone(), + )) } /// Returns a receiver that streams samples for the given `metric` for the @@ -136,7 +154,7 @@ impl LogicalMeterHandle { &mut self, component_ids: Option>, metric: Metric, - ) -> Result { + ) -> Result { if !metric.power() && !metric.current() { return Err(Error::invalid_metric(format!( "The EV charger formula only supports power or current metrics, but got: {metric:?}" @@ -145,12 +163,16 @@ impl LogicalMeterHandle { let formula = self.graph.ev_charger_formula(component_ids).map_err(|e| { Error::component_graph_error(format!("Could not derive EV charger formula: {e}")) })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + Ok(AggregationFormula::new( + formula, + metric, + self.instructions_tx.clone(), + )) } /// Returns a receiver that streams samples for the given `metric` for all /// the consumers in the microgrid. - pub fn consumer(&mut self, metric: Metric) -> Result { + pub fn consumer(&mut self, metric: Metric) -> Result { if !metric.power() && !metric.current() { return Err(Error::invalid_metric(format!( "The consumer formula only supports power or current metrics, but got: {metric:?}" @@ -159,12 +181,16 @@ impl LogicalMeterHandle { let formula = self.graph.consumer_formula().map_err(|e| { Error::component_graph_error(format!("Could not derive consumer formula: {e}")) })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + Ok(AggregationFormula::new( + formula, + metric, + self.instructions_tx.clone(), + )) } /// Returns a receiver that streams samples for the given `metric` for all /// producers in the microgrid. - pub fn producer(&mut self, metric: Metric) -> Result { + pub fn producer(&mut self, metric: Metric) -> Result { if !metric.power() && !metric.current() { return Err(Error::invalid_metric(format!( "The producer formula only supports power or current metrics, but got: {metric:?}" @@ -173,17 +199,25 @@ impl LogicalMeterHandle { let formula = self.graph.producer_formula().map_err(|e| { Error::component_graph_error(format!("Could not derive producer formula: {e}")) })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + Ok(AggregationFormula::new( + formula, + metric, + self.instructions_tx.clone(), + )) } pub fn coalesce( &mut self, component_ids: BTreeSet, metric: Metric, - ) -> Result { + ) -> Result { let formula = self.graph.coalesce(component_ids).map_err(|e| { Error::component_graph_error(format!("Could not derive coalesce formula: {e}")) })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + Ok(AggregationFormula::new( + formula, + metric, + self.instructions_tx.clone(), + )) } } From ebb439c5ebf7452568a8176e20e530344f0ec214 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 17 Jul 2025 09:53:27 +0200 Subject: [PATCH 03/14] Move `AggregationFormula` into its own module Signed-off-by: Sahas Subramanian --- src/logical_meter/formula.rs | 128 +----------------- .../formula/aggregation_formula.rs | 126 +++++++++++++++++ src/logical_meter/logical_meter_actor.rs | 2 +- 3 files changed, 130 insertions(+), 126 deletions(-) create mode 100644 src/logical_meter/formula/aggregation_formula.rs diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index 7535e0d..8802bdb 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -1,129 +1,7 @@ // License: MIT // Copyright © 2025 Frequenz Energy-as-a-Service GmbH -//! A composable formula type, that can be subscribed to. +//! Formula module for the logical meter. -use tokio::sync::{broadcast, mpsc, oneshot}; - -use crate::{Error, Metric, Sample}; - -use super::logical_meter_actor; - -#[derive(Clone)] -pub struct AggregationFormula { - formula: frequenz_microgrid_component_graph::AggregationFormula, - metric: Metric, - instructions_tx: mpsc::Sender, -} - -impl std::fmt::Display for AggregationFormula { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.formula.fmt(f) - } -} - -impl AggregationFormula { - pub(super) fn new( - formula: frequenz_microgrid_component_graph::AggregationFormula, - metric: Metric, - instructions_tx: mpsc::Sender, - ) -> Self { - Self { - formula, - metric, - instructions_tx, - } - } - - pub async fn subscribe(&self) -> Result, Error> { - let (tx, rx) = oneshot::channel(); - - self.instructions_tx - .send(logical_meter_actor::Instruction::SubscribeFormula { - formula: self.formula.to_string(), - metric: self.metric, - response_tx: tx, - }) - .await - .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; - let receiver = rx.await.map_err(|e| { - Error::connection_failure(format!("Could not receive instruction: {e}")) - })?; - - Ok(receiver) - } -} - -impl std::ops::Add for AggregationFormula { - type Output = Result; - - fn add(self, other: Self) -> Self::Output { - if self.metric != other.metric { - return Err(Error::invalid_metric(format!( - "Cannot add formulas with different metrics: {} and {}", - self.metric as isize, other.metric as isize - ))); - } - let new_formula = self.formula + other.formula; - Ok(Self::new(new_formula, self.metric, self.instructions_tx)) - } -} - -impl std::ops::Sub for AggregationFormula { - type Output = Result; - - fn sub(self, other: Self) -> Self::Output { - if self.metric != other.metric { - return Err(Error::invalid_metric(format!( - "Cannot subtract formulas with different metrics: {} and {}", - self.metric as isize, other.metric as isize - ))); - } - let new_formula = self.formula - other.formula; - Ok(Self::new(new_formula, self.metric, self.instructions_tx)) - } -} - -impl std::ops::Add for Result { - type Output = Result; - - fn add(self, other: AggregationFormula) -> Self::Output { - match self { - Ok(left) => left + other, - Err(e) => Err(e), - } - } -} - -impl std::ops::Sub for Result { - type Output = Result; - - fn sub(self, other: AggregationFormula) -> Self::Output { - match self { - Ok(left) => left - other, - Err(e) => Err(e), - } - } -} - -impl std::ops::Add> for AggregationFormula { - type Output = Result; - - fn add(self, other: Result) -> Self::Output { - match other { - Ok(right) => self + right, - Err(e) => Err(e), - } - } -} - -impl std::ops::Sub> for AggregationFormula { - type Output = Result; - - fn sub(self, other: Result) -> Self::Output { - match other { - Ok(right) => self - right, - Err(e) => Err(e), - } - } -} +mod aggregation_formula; +pub use aggregation_formula::AggregationFormula; diff --git a/src/logical_meter/formula/aggregation_formula.rs b/src/logical_meter/formula/aggregation_formula.rs new file mode 100644 index 0000000..821e248 --- /dev/null +++ b/src/logical_meter/formula/aggregation_formula.rs @@ -0,0 +1,126 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! An formula that supports aggregation operations. + +use crate::{Error, Metric, Sample, logical_meter::logical_meter_actor}; +use tokio::sync::{broadcast, mpsc, oneshot}; + +#[derive(Clone)] +pub struct AggregationFormula { + formula: frequenz_microgrid_component_graph::AggregationFormula, + metric: Metric, + instructions_tx: mpsc::Sender, +} + +impl std::fmt::Display for AggregationFormula { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.formula.fmt(f) + } +} + +impl AggregationFormula { + pub(crate) fn new( + formula: frequenz_microgrid_component_graph::AggregationFormula, + metric: Metric, + instructions_tx: mpsc::Sender, + ) -> Self { + Self { + formula, + metric, + instructions_tx, + } + } + + pub async fn subscribe(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + + self.instructions_tx + .send(logical_meter_actor::Instruction::SubscribeFormula { + formula: self.formula.to_string(), + metric: self.metric, + response_tx: tx, + }) + .await + .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; + let receiver = rx.await.map_err(|e| { + Error::connection_failure(format!("Could not receive instruction: {e}")) + })?; + + Ok(receiver) + } +} + +impl std::ops::Add for AggregationFormula { + type Output = Result; + + fn add(self, other: Self) -> Self::Output { + if self.metric != other.metric { + return Err(Error::invalid_metric(format!( + "Cannot add formulas with different metrics: {} and {}", + self.metric as isize, other.metric as isize + ))); + } + let new_formula = self.formula + other.formula; + Ok(Self::new(new_formula, self.metric, self.instructions_tx)) + } +} + +impl std::ops::Sub for AggregationFormula { + type Output = Result; + + fn sub(self, other: Self) -> Self::Output { + if self.metric != other.metric { + return Err(Error::invalid_metric(format!( + "Cannot subtract formulas with different metrics: {} and {}", + self.metric as isize, other.metric as isize + ))); + } + let new_formula = self.formula - other.formula; + Ok(Self::new(new_formula, self.metric, self.instructions_tx)) + } +} + +impl std::ops::Add for Result { + type Output = Result; + + fn add(self, other: AggregationFormula) -> Self::Output { + match self { + Ok(left) => left + other, + Err(e) => Err(e), + } + } +} + +impl std::ops::Sub for Result { + type Output = Result; + + fn sub(self, other: AggregationFormula) -> Self::Output { + match self { + Ok(left) => left - other, + Err(e) => Err(e), + } + } +} + +impl std::ops::Add> for AggregationFormula { + type Output = Result; + + fn add(self, other: Result) -> Self::Output { + match other { + Ok(right) => self + right, + Err(e) => Err(e), + } + } +} + +impl std::ops::Sub> for AggregationFormula { + type Output = Result; + + fn sub(self, other: Result) -> Self::Output { + match other { + Ok(right) => self - right, + Err(e) => Err(e), + } + } +} diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index c6c0023..c00a4a0 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -32,7 +32,7 @@ struct ComponentDataResampler { receiver: broadcast::Receiver, } -pub(super) enum Instruction { +pub(crate) enum Instruction { SubscribeFormula { formula: String, metric: Metric, From 76ed7ddc28a3da7b73ca7e62ce2ec64eb5836220 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 17 Jul 2025 00:36:22 +0200 Subject: [PATCH 04/14] Add new Metric representation bound to a formula type Signed-off-by: Sahas Subramanian --- src/lib.rs | 4 ++- src/logical_meter.rs | 2 +- src/logical_meter/metric.rs | 38 ++++++++++++++++++++++++ src/logical_meter/metric/metric_trait.rs | 13 ++++++++ 4 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 src/logical_meter/metric/metric_trait.rs diff --git a/src/lib.rs b/src/lib.rs index 4a83f17..ccd358f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,4 +15,6 @@ mod sample; pub use sample::Sample; mod logical_meter; -pub use logical_meter::{AggregationFormula, LogicalMeterConfig, LogicalMeterHandle, Metric}; +pub use logical_meter::{ + AggregationFormula, LogicalMeterConfig, LogicalMeterHandle, Metric, metric, +}; diff --git a/src/logical_meter.rs b/src/logical_meter.rs index 392c66a..6f5726f 100644 --- a/src/logical_meter.rs +++ b/src/logical_meter.rs @@ -8,7 +8,7 @@ mod formula; mod logical_meter_actor; mod logical_meter_handle; pub use logical_meter_handle::LogicalMeterHandle; -mod metric; +pub mod metric; pub use metric::Metric; pub use config::LogicalMeterConfig; diff --git a/src/logical_meter/metric.rs b/src/logical_meter/metric.rs index 8978091..b4a0a4e 100644 --- a/src/logical_meter/metric.rs +++ b/src/logical_meter/metric.rs @@ -3,8 +3,46 @@ //! Metrics supported by the logical meter. +pub(crate) mod metric_trait; + use crate::proto::common::v1::metrics::Metric as MetricPb; +use super::formula; +use metric_trait::AcMetric; + +macro_rules! define_metric { + ($({name: $metric_name:ident, formula: $formula:ident}),+ $(,)?) => { + $( + // Define a metric + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub struct $metric_name; + + // Implement the AcMetric trait for the metric + impl AcMetric for $metric_name { + type FormulaType = formula::$formula; + + const METRIC: MetricPb = MetricPb::$metric_name; + } + + impl std::fmt::Display for $metric_name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", stringify!($metric_name)) + } + } + + )+ + }; +} + +define_metric! { + {name: AcActivePower, formula: AggregationFormula}, + {name: AcReactivePower, formula: AggregationFormula}, + {name: AcCurrent, formula: AggregationFormula}, + {name: AcCurrentPhase1, formula: AggregationFormula}, + {name: AcCurrentPhase2, formula: AggregationFormula}, + {name: AcCurrentPhase3, formula: AggregationFormula}, +} + /// Metrics supported by the logical meter. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Metric { diff --git a/src/logical_meter/metric/metric_trait.rs b/src/logical_meter/metric/metric_trait.rs new file mode 100644 index 0000000..30c592b --- /dev/null +++ b/src/logical_meter/metric/metric_trait.rs @@ -0,0 +1,13 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! A trait specifying the output formula type and the corresponding PB metric, +//! for all metrics supported by the logical meter. + +use crate::proto::common::v1::metrics::Metric as MetricPb; + +pub trait AcMetric: std::fmt::Display { + type FormulaType; + + const METRIC: MetricPb; +} From 472eb2e0945659279d650f87d5871bcd2ced078a Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 17 Jul 2025 09:58:51 +0200 Subject: [PATCH 05/14] Defines a `Formula` trait and implement it for `AggregationFormula` This will be implemented by the upcoming `CoalesceFormula` as well. Signed-off-by: Sahas Subramanian --- src/lib.rs | 2 +- src/logical_meter.rs | 2 +- src/logical_meter/formula.rs | 8 ++++++++ src/logical_meter/formula/aggregation_formula.rs | 5 ++++- src/logical_meter/metric/metric_trait.rs | 3 ++- 5 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ccd358f..a165336 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,5 +16,5 @@ pub use sample::Sample; mod logical_meter; pub use logical_meter::{ - AggregationFormula, LogicalMeterConfig, LogicalMeterHandle, Metric, metric, + AggregationFormula, Formula, LogicalMeterConfig, LogicalMeterHandle, Metric, metric, }; diff --git a/src/logical_meter.rs b/src/logical_meter.rs index 6f5726f..1d3826b 100644 --- a/src/logical_meter.rs +++ b/src/logical_meter.rs @@ -12,4 +12,4 @@ pub mod metric; pub use metric::Metric; pub use config::LogicalMeterConfig; -pub use formula::AggregationFormula; +pub use formula::{AggregationFormula, Formula}; diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index 8802bdb..14c930e 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -5,3 +5,11 @@ mod aggregation_formula; pub use aggregation_formula::AggregationFormula; + +use crate::{Error, Sample}; +use tokio::sync::broadcast; + +/// Defines a formula that can be subscribed to for receiving samples. +pub trait Formula: std::fmt::Display { + fn subscribe(&self) -> impl Future, Error>> + Send; +} diff --git a/src/logical_meter/formula/aggregation_formula.rs b/src/logical_meter/formula/aggregation_formula.rs index 821e248..358253b 100644 --- a/src/logical_meter/formula/aggregation_formula.rs +++ b/src/logical_meter/formula/aggregation_formula.rs @@ -3,6 +3,7 @@ //! An formula that supports aggregation operations. +use super::Formula; use crate::{Error, Metric, Sample, logical_meter::logical_meter_actor}; use tokio::sync::{broadcast, mpsc, oneshot}; @@ -31,8 +32,10 @@ impl AggregationFormula { instructions_tx, } } +} - pub async fn subscribe(&self) -> Result, Error> { +impl Formula for AggregationFormula { + async fn subscribe(&self) -> Result, Error> { let (tx, rx) = oneshot::channel(); self.instructions_tx diff --git a/src/logical_meter/metric/metric_trait.rs b/src/logical_meter/metric/metric_trait.rs index 30c592b..0d39ae2 100644 --- a/src/logical_meter/metric/metric_trait.rs +++ b/src/logical_meter/metric/metric_trait.rs @@ -4,10 +4,11 @@ //! A trait specifying the output formula type and the corresponding PB metric, //! for all metrics supported by the logical meter. +use super::formula; use crate::proto::common::v1::metrics::Metric as MetricPb; pub trait AcMetric: std::fmt::Display { - type FormulaType; + type FormulaType: formula::Formula; const METRIC: MetricPb; } From d077cd91c0adc873850ac86669e1ba5048c13842 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 17 Jul 2025 10:15:07 +0200 Subject: [PATCH 06/14] Implement a generic way to create formulas from the component graph Signed-off-by: Sahas Subramanian --- src/logical_meter/formula.rs | 1 + .../formula/graph_formula_provider.rs | 84 +++++++++++++++++++ src/logical_meter/metric/metric_trait.rs | 2 +- 3 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 src/logical_meter/formula/graph_formula_provider.rs diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index 14c930e..e92a9b6 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -4,6 +4,7 @@ //! Formula module for the logical meter. mod aggregation_formula; +pub(crate) mod graph_formula_provider; pub use aggregation_formula::AggregationFormula; use crate::{Error, Sample}; diff --git a/src/logical_meter/formula/graph_formula_provider.rs b/src/logical_meter/formula/graph_formula_provider.rs new file mode 100644 index 0000000..305bd6f --- /dev/null +++ b/src/logical_meter/formula/graph_formula_provider.rs @@ -0,0 +1,84 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! A composable formula type, that can be subscribed to. + +use crate::Error; +use crate::logical_meter::logical_meter_actor; +use crate::proto::common::v1::microgrid::components::Component; +use crate::proto::common::v1::microgrid::components::ComponentConnection; +use frequenz_microgrid_component_graph::ComponentGraph; +use std::collections::BTreeSet; +use tokio::sync::mpsc; + +use super::AggregationFormula; + +macro_rules! graph_formula_provider { + ($(($fnname:ident $(, $idsparam:ident)?)),+ $(,)?) => {$( + + fn $fnname( + _graph: &ComponentGraph, + _metric: M, + _instructions_tx: mpsc::Sender, + $($idsparam: Option>,)? + ) -> Result { + return Err(Error::component_graph_error( + format!( + "The component graph does not support {} formula generation for {}.", + stringify!($fnname), + _metric.to_string() + ) + )); + } + + )+}; +} + +/// Provides methods for generating corresponding formulas from the component +/// graph. +/// +/// The component graph exposes methods to retrieve `AggregationFormula`s and +/// `CoalesceFormula`s for each of these metrics. This trait provides a +/// way to generalize them. +pub trait GraphFormulaProvider: Sized { + graph_formula_provider!( + (grid), + (consumer), + (producer), + (battery, _battery_ids), + (chp, _chp_ids), + (pv, _pv_inverter_ids), + (ev_charger, _ev_charger_ids), + ); +} + +macro_rules! impl_graph_formula_provider { + ($(($fnname:ident, $graphfnname:ident$(, $idsparam:ident)?)),+ $(,)?) => {$( + + fn $fnname( + graph: &ComponentGraph, + _metric: M, + instructions_tx: mpsc::Sender, + $($idsparam: Option>,)? + ) -> Result { + let formula = graph.$graphfnname($($idsparam)?).map_err(|e| { + Error::component_graph_error( + format!("Could not get {} formula: {e}", stringify!($fnname)) + ) + })?; + Ok(Self::new(formula, M::METRIC, instructions_tx)) + } + )+}; +} + +impl GraphFormulaProvider for AggregationFormula { + impl_graph_formula_provider!( + (grid, grid_formula), + (consumer, consumer_formula), + (producer, producer_formula), + (battery, battery_formula, battery_ids), + (chp, chp_formula, chp_ids), + (pv, pv_formula, pv_inverter_ids), + (ev_charger, ev_charger_formula, ev_charger_ids), + ); +} diff --git a/src/logical_meter/metric/metric_trait.rs b/src/logical_meter/metric/metric_trait.rs index 0d39ae2..cdc5371 100644 --- a/src/logical_meter/metric/metric_trait.rs +++ b/src/logical_meter/metric/metric_trait.rs @@ -8,7 +8,7 @@ use super::formula; use crate::proto::common::v1::metrics::Metric as MetricPb; pub trait AcMetric: std::fmt::Display { - type FormulaType: formula::Formula; + type FormulaType: formula::Formula + formula::graph_formula_provider::GraphFormulaProvider; const METRIC: MetricPb; } From 9704904bbb64a76f4313e9a6b4e7ceeaac0406c1 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 17 Jul 2025 10:24:39 +0200 Subject: [PATCH 07/14] Upgrade the logical meter to use the new metric types This also enables us to delegate the component graph formula creation to the logical meter formulas, through the `GraphFormulaProvider` trait. Signed-off-by: Sahas Subramanian --- examples/logical_meter.rs | 14 +- .../formula/aggregation_formula.rs | 4 +- src/logical_meter/logical_meter_actor.rs | 4 +- src/logical_meter/logical_meter_handle.rs | 148 ++++++------------ 4 files changed, 58 insertions(+), 112 deletions(-) diff --git a/examples/logical_meter.rs b/examples/logical_meter.rs index 85445c8..0bedcef 100644 --- a/examples/logical_meter.rs +++ b/examples/logical_meter.rs @@ -3,7 +3,7 @@ use chrono::TimeDelta; use frequenz_microgrid::{ - Error, LogicalMeterConfig, LogicalMeterHandle, Metric, MicrogridClientHandle, + Error, Formula, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, metric, }; #[tokio::main] @@ -23,13 +23,13 @@ async fn main() -> Result<(), Error> { .await?; // Create a formula that calculates `grid_power - battery_power`. - let formula_grid = logical_meter.grid(Metric::AcActivePower)?; - let formula_battery = logical_meter.battery(None, Metric::AcActivePower)?; - let formula_consumer = logical_meter.consumer(Metric::AcActivePower)?; + let formula_grid = logical_meter.grid(metric::AcActivePower)?; + let formula_battery = logical_meter.battery(None, metric::AcActivePower)?; + let formula_consumer = logical_meter.consumer(metric::AcActivePower)?; - let formula = (logical_meter.grid(Metric::AcActivePower)? - - logical_meter.battery(None, Metric::AcActivePower)? - + logical_meter.consumer(Metric::AcActivePower)?)?; + let formula = (logical_meter.grid(metric::AcActivePower)? + - logical_meter.battery(None, metric::AcActivePower)? + + logical_meter.consumer(metric::AcActivePower)?)?; let mut rx = formula.subscribe().await?; let mut grid_rx = formula_grid.subscribe().await?; diff --git a/src/logical_meter/formula/aggregation_formula.rs b/src/logical_meter/formula/aggregation_formula.rs index 358253b..f343d5e 100644 --- a/src/logical_meter/formula/aggregation_formula.rs +++ b/src/logical_meter/formula/aggregation_formula.rs @@ -4,7 +4,9 @@ //! An formula that supports aggregation operations. use super::Formula; -use crate::{Error, Metric, Sample, logical_meter::logical_meter_actor}; +use crate::{ + Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric, +}; use tokio::sync::{broadcast, mpsc, oneshot}; #[derive(Clone)] diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index c00a4a0..87411e9 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -12,10 +12,10 @@ use std::collections::{HashMap, HashSet}; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{MissedTickBehavior, interval}; +use crate::proto::common::v1::metrics::Metric; use crate::proto::common::v1::metrics::metric_value_variant::MetricValueVariant; use crate::{ - Error, Metric, MicrogridClientHandle, Sample, - proto::common::v1::microgrid::components::ComponentData, + Error, MicrogridClientHandle, Sample, proto::common::v1::microgrid::components::ComponentData, }; use super::config::LogicalMeterConfig; diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index 0aec03e..16208a8 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -1,6 +1,8 @@ // License: MIT // Copyright © 2025 Frequenz Energy-as-a-Service GmbH +use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider; +use crate::proto::common::v1::metrics::Metric; use crate::{ client::MicrogridClientHandle, error::Error, @@ -10,9 +12,7 @@ use frequenz_microgrid_component_graph::{self, ComponentGraph}; use std::collections::BTreeSet; use tokio::sync::mpsc; -use super::{ - AggregationFormula, LogicalMeterConfig, Metric, logical_meter_actor::LogicalMeterActor, -}; +use super::{AggregationFormula, LogicalMeterConfig, logical_meter_actor::LogicalMeterActor}; /// This provides an interface stream high-level metrics from a microgrid. #[derive(Clone)] @@ -56,93 +56,62 @@ impl LogicalMeterHandle { /// Returns a receiver that streams samples for the given `metric` at the grid /// connection point. - pub fn grid(&mut self, metric: Metric) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The grid formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.grid_formula().map_err(|e| { - Error::component_graph_error(format!("Could not derive grid formula: {e}")) - })?; - - Ok(AggregationFormula::new( - formula, - metric, - self.instructions_tx.clone(), - )) + pub fn grid( + &mut self, + metric: M, + ) -> Result { + M::FormulaType::grid(&self.graph, metric, self.instructions_tx.clone()) } /// Returns a receiver that streams samples for the given `metric` for the /// given battery IDs. /// /// When `component_ids` is `None`, all batteries in the microgrid are used. - pub fn battery( + pub fn battery( &mut self, component_ids: Option>, - metric: Metric, - ) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The battery formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.battery_formula(component_ids).map_err(|e| { - Error::component_graph_error(format!("Could not derive battery formula: {e}")) - })?; - Ok(AggregationFormula::new( - formula, + metric: M, + ) -> Result { + M::FormulaType::battery( + &self.graph, metric, self.instructions_tx.clone(), - )) + component_ids, + ) } /// Returns a receiver that streams samples for the given `metric` for the /// given CHP IDs. /// /// When `component_ids` is `None`, all CHPs in the microgrid are used. - pub fn chp( + pub fn chp( &mut self, component_ids: Option>, - metric: Metric, - ) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The CHP formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.chp_formula(component_ids).map_err(|e| { - Error::component_graph_error(format!("Could not derive CHP formula: {e}")) - })?; - Ok(AggregationFormula::new( - formula, + metric: M, + ) -> Result { + M::FormulaType::chp( + &self.graph, metric, self.instructions_tx.clone(), - )) + component_ids, + ) } /// Returns a receiver that streams samples for the given `metric` for the /// given PV IDs. /// /// When `component_ids` is `None`, all PVs in the microgrid are used. - pub fn pv( + pub fn pv( &mut self, component_ids: Option>, - metric: Metric, - ) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The PV formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.pv_formula(component_ids).map_err(|e| { - Error::component_graph_error(format!("Could not derive PV formula: {e}")) - })?; - Ok(AggregationFormula::new( - formula, + metric: M, + ) -> Result { + M::FormulaType::pv( + &self.graph, metric, self.instructions_tx.clone(), - )) + component_ids, + ) } /// Returns a receiver that streams samples for the given `metric` for the @@ -150,60 +119,35 @@ impl LogicalMeterHandle { /// /// When `component_ids` is `None`, all EV chargers in the microgrid are /// used. - pub fn ev_charger( + pub fn ev_charger( &mut self, component_ids: Option>, - metric: Metric, - ) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The EV charger formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.ev_charger_formula(component_ids).map_err(|e| { - Error::component_graph_error(format!("Could not derive EV charger formula: {e}")) - })?; - Ok(AggregationFormula::new( - formula, + metric: M, + ) -> Result { + M::FormulaType::ev_charger( + &self.graph, metric, self.instructions_tx.clone(), - )) + component_ids, + ) } /// Returns a receiver that streams samples for the given `metric` for all /// the consumers in the microgrid. - pub fn consumer(&mut self, metric: Metric) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The consumer formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.consumer_formula().map_err(|e| { - Error::component_graph_error(format!("Could not derive consumer formula: {e}")) - })?; - Ok(AggregationFormula::new( - formula, - metric, - self.instructions_tx.clone(), - )) + pub fn consumer( + &mut self, + metric: M, + ) -> Result { + M::FormulaType::consumer(&self.graph, metric, self.instructions_tx.clone()) } /// Returns a receiver that streams samples for the given `metric` for all /// producers in the microgrid. - pub fn producer(&mut self, metric: Metric) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The producer formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.producer_formula().map_err(|e| { - Error::component_graph_error(format!("Could not derive producer formula: {e}")) - })?; - Ok(AggregationFormula::new( - formula, - metric, - self.instructions_tx.clone(), - )) + pub fn producer( + &mut self, + metric: M, + ) -> Result { + M::FormulaType::producer(&self.graph, metric, self.instructions_tx.clone()) } pub fn coalesce( From cb32b8e752491acdf8c96e57cd81383875e664c7 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 17 Jul 2025 10:28:41 +0200 Subject: [PATCH 08/14] Remove old Metric definitions Signed-off-by: Sahas Subramanian --- src/lib.rs | 2 +- src/logical_meter.rs | 1 - src/logical_meter/metric.rs | 38 ------------------------------------- 3 files changed, 1 insertion(+), 40 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a165336..07da519 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,5 +16,5 @@ pub use sample::Sample; mod logical_meter; pub use logical_meter::{ - AggregationFormula, Formula, LogicalMeterConfig, LogicalMeterHandle, Metric, metric, + AggregationFormula, Formula, LogicalMeterConfig, LogicalMeterHandle, metric, }; diff --git a/src/logical_meter.rs b/src/logical_meter.rs index 1d3826b..7bc7251 100644 --- a/src/logical_meter.rs +++ b/src/logical_meter.rs @@ -9,7 +9,6 @@ mod logical_meter_actor; mod logical_meter_handle; pub use logical_meter_handle::LogicalMeterHandle; pub mod metric; -pub use metric::Metric; pub use config::LogicalMeterConfig; pub use formula::{AggregationFormula, Formula}; diff --git a/src/logical_meter/metric.rs b/src/logical_meter/metric.rs index b4a0a4e..bc126dc 100644 --- a/src/logical_meter/metric.rs +++ b/src/logical_meter/metric.rs @@ -42,41 +42,3 @@ define_metric! { {name: AcCurrentPhase2, formula: AggregationFormula}, {name: AcCurrentPhase3, formula: AggregationFormula}, } - -/// Metrics supported by the logical meter. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Metric { - // AC Power - AcActivePower = MetricPb::AcActivePower as isize, - AcReactivePower = MetricPb::AcReactivePower as isize, - // AC Current - AcCurrent = MetricPb::AcCurrent as isize, - AcCurrentPhase1 = MetricPb::AcCurrentPhase1 as isize, - AcCurrentPhase2 = MetricPb::AcCurrentPhase2 as isize, - AcCurrentPhase3 = MetricPb::AcCurrentPhase3 as isize, - // AC Voltage - AcVoltage = MetricPb::AcVoltage as isize, - AcVoltagePhase1N = MetricPb::AcVoltagePhase1N as isize, - AcVoltagePhase2N = MetricPb::AcVoltagePhase2N as isize, - AcVoltagePhase3N = MetricPb::AcVoltagePhase3N as isize, - AcVoltagePhase1Phase2 = MetricPb::AcVoltagePhase1Phase2 as isize, - AcVoltagePhase2Phase3 = MetricPb::AcVoltagePhase2Phase3 as isize, - AcVoltagePhase3Phase1 = MetricPb::AcVoltagePhase3Phase1 as isize, - // AC Frequency - AcFrequency = MetricPb::AcFrequency as isize, -} - -impl Metric { - pub(super) fn power(self) -> bool { - matches!(self, Metric::AcActivePower | Metric::AcReactivePower) - } - pub(super) fn current(self) -> bool { - matches!( - self, - Metric::AcCurrent - | Metric::AcCurrentPhase1 - | Metric::AcCurrentPhase2 - | Metric::AcCurrentPhase3 - ) - } -} From f50a1c30425dcc80c5001795b8f5b04ec9db57f8 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 17 Jul 2025 11:02:24 +0200 Subject: [PATCH 09/14] Implement `CoalesceFormula` And introduce support for voltage and frequency metrics using `CoalesceFormula`s. Signed-off-by: Sahas Subramanian --- src/logical_meter/formula.rs | 2 + src/logical_meter/formula/coalesce_formula.rs | 57 +++++++++++++++++++ .../formula/graph_formula_provider.rs | 10 +++- src/logical_meter/metric.rs | 10 ++++ 4 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 src/logical_meter/formula/coalesce_formula.rs diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index e92a9b6..5429df6 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -4,8 +4,10 @@ //! Formula module for the logical meter. mod aggregation_formula; +mod coalesce_formula; pub(crate) mod graph_formula_provider; pub use aggregation_formula::AggregationFormula; +pub use coalesce_formula::CoalesceFormula; use crate::{Error, Sample}; use tokio::sync::broadcast; diff --git a/src/logical_meter/formula/coalesce_formula.rs b/src/logical_meter/formula/coalesce_formula.rs new file mode 100644 index 0000000..877c741 --- /dev/null +++ b/src/logical_meter/formula/coalesce_formula.rs @@ -0,0 +1,57 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! An coalesce formula. + +use super::Formula; +use crate::{ + Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric, +}; +use tokio::sync::{broadcast, mpsc, oneshot}; + +#[derive(Clone)] +pub struct CoalesceFormula { + formula: frequenz_microgrid_component_graph::CoalesceFormula, + metric: Metric, + instructions_tx: mpsc::Sender, +} + +impl std::fmt::Display for CoalesceFormula { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.formula.fmt(f) + } +} + +impl CoalesceFormula { + pub(crate) fn new( + formula: frequenz_microgrid_component_graph::CoalesceFormula, + metric: Metric, + instructions_tx: mpsc::Sender, + ) -> Self { + Self { + formula, + metric, + instructions_tx, + } + } +} + +impl Formula for CoalesceFormula { + async fn subscribe(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + + self.instructions_tx + .send(logical_meter_actor::Instruction::SubscribeFormula { + formula: self.formula.to_string(), + metric: self.metric, + response_tx: tx, + }) + .await + .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; + let receiver = rx.await.map_err(|e| { + Error::connection_failure(format!("Could not receive instruction: {e}")) + })?; + + Ok(receiver) + } +} diff --git a/src/logical_meter/formula/graph_formula_provider.rs b/src/logical_meter/formula/graph_formula_provider.rs index 305bd6f..ba3d9bf 100644 --- a/src/logical_meter/formula/graph_formula_provider.rs +++ b/src/logical_meter/formula/graph_formula_provider.rs @@ -11,7 +11,7 @@ use frequenz_microgrid_component_graph::ComponentGraph; use std::collections::BTreeSet; use tokio::sync::mpsc; -use super::AggregationFormula; +use super::{AggregationFormula, CoalesceFormula}; macro_rules! graph_formula_provider { ($(($fnname:ident $(, $idsparam:ident)?)),+ $(,)?) => {$( @@ -82,3 +82,11 @@ impl GraphFormulaProvider for AggregationFormula { (ev_charger, ev_charger_formula, ev_charger_ids), ); } + +impl GraphFormulaProvider for CoalesceFormula { + impl_graph_formula_provider!( + (grid, grid_coalesce_formula), + (battery, battery_ac_coalesce_formula, battery_ids), + (pv, pv_ac_coalesce_formula, pv_inverter_ids), + ); +} diff --git a/src/logical_meter/metric.rs b/src/logical_meter/metric.rs index bc126dc..cc65f63 100644 --- a/src/logical_meter/metric.rs +++ b/src/logical_meter/metric.rs @@ -41,4 +41,14 @@ define_metric! { {name: AcCurrentPhase1, formula: AggregationFormula}, {name: AcCurrentPhase2, formula: AggregationFormula}, {name: AcCurrentPhase3, formula: AggregationFormula}, + + {name: AcVoltage, formula: CoalesceFormula}, + {name: AcVoltagePhase1N, formula: CoalesceFormula}, + {name: AcVoltagePhase2N, formula: CoalesceFormula}, + {name: AcVoltagePhase3N, formula: CoalesceFormula}, + {name: AcVoltagePhase1Phase2, formula: CoalesceFormula}, + {name: AcVoltagePhase2Phase3, formula: CoalesceFormula}, + {name: AcVoltagePhase3Phase1, formula: CoalesceFormula}, + + {name: AcFrequency, formula: CoalesceFormula}, } From 2a8ddfb03ea32b7203f2e14fdf9201a86481a945 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 17 Jul 2025 11:36:59 +0200 Subject: [PATCH 10/14] Start a resampler for each requested metric for a component This is a bugfix, because earlier there was only one resampler for each component and data from all metrics were coming from that one resampler, which had the data only for the first metric that was subscribed to. Signed-off-by: Sahas Subramanian --- src/logical_meter/logical_meter_actor.rs | 67 +++++++++++++++--------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index 87411e9..1288e4d 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -95,8 +95,8 @@ impl LogicalMeterActor { } pub async fn run(mut self) { - let mut resamplers: HashMap = HashMap::new(); - let mut formulas: HashMap = HashMap::new(); + let mut resamplers: HashMap<(u64, Metric), ComponentDataResampler> = HashMap::new(); + let mut formulas: HashMap<(String, Metric), LogicalMeterFormula> = HashMap::new(); loop { tokio::select! { @@ -109,7 +109,7 @@ impl LogicalMeterActor { match instruction { Some(Instruction::SubscribeFormula{formula, metric, response_tx}) => { if let Err(err) = self.handle_subscribe_formula( - &formula, + formula, metric, response_tx, &mut formulas, @@ -145,25 +145,27 @@ impl LogicalMeterActor { /// in the formula, if it does not already exist. async fn handle_subscribe_formula( &mut self, - formula: &str, + formula: String, metric: Metric, receiver_tx: oneshot::Sender>, - formulas: &mut HashMap, - resamplers: &mut HashMap, + formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>, + resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>, ) -> Result<(), Error> { - if formulas.contains_key(formula) { + let formula_key = (formula, metric); + if formulas.contains_key(&formula_key) { receiver_tx - .send(formulas[formula].sender.subscribe()) + .send(formulas[&formula_key].sender.subscribe()) .map_err(|_| Error::internal("Failed to send receiver for formula".to_string()))?; return Ok(()); } - let formula_engine = FormulaEngine::try_new(formula) + let formula_engine = FormulaEngine::try_new(&formula_key.0) .map_err(|e| Error::formula_engine_error(format!("Failed to parse formula: {e}")))?; let (sender, receiver) = broadcast::channel(8); for component_id in formula_engine.components() { - if resamplers.contains_key(component_id) { + let resampler_key = (*component_id, metric); + if resamplers.contains_key(&resampler_key) { continue; } let resampler = ComponentDataResampler { @@ -180,11 +182,11 @@ impl LogicalMeterActor { ), receiver: self.client.get_component_data_stream(*component_id).await?, }; - resamplers.insert(*component_id, resampler); + resamplers.insert(resampler_key, resampler); } formulas.insert( - formula.to_string(), + formula_key, LogicalMeterFormula { formula: formula_engine, sender, @@ -200,8 +202,8 @@ impl LogicalMeterActor { /// Resamples component data and evaluates formulas for the next timestamp. fn do_next( &mut self, - resamplers: &mut HashMap, - formulas: &mut HashMap, + resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>, + formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>, ) -> Result<(), Error> { let mut comp_data = HashMap::new(); for (_, resampler) in resamplers.iter_mut() { @@ -219,33 +221,45 @@ impl LogicalMeterActor { } let mut formulas_to_drop = vec![]; - for (formula_str, formula) in formulas.iter_mut() { + for (formula_key, formula) in formulas.iter_mut() { let result = formula.formula.calculate(&comp_data).map_err(|e| { Error::formula_engine_error(format!("Failed to evaluate formula: {e}")) })?; if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) { - tracing::debug!("No remaining subscribers for formula: {formula_str}. Err: {e}"); - formulas_to_drop.push(formula_str.to_string()); + tracing::debug!( + "No remaining subscribers for formula: {}:({}). Err: {e}", + formula_key.1.as_str_name(), + formula_key.0 + ); + formulas_to_drop.push(formula_key.clone()); } } - for formula_str in &formulas_to_drop { - if let Some(formula) = formulas.remove(formula_str) { - tracing::debug!("Dropping formula: {}", formula_str); + for formula_key in &formulas_to_drop { + if let Some(formula) = formulas.remove(formula_key) { + tracing::debug!( + "Dropping formula: {}:({})", + formula_key.1.as_str_name(), + formula_key.0 + ); drop(formula); } } if !formulas_to_drop.is_empty() { - let mut components = HashSet::::new(); - for (_, formula) in formulas.iter() { - components.extend(formula.formula.components()); + let mut components = HashSet::<(u64, Metric)>::new(); + for ((_, metric), formula) in formulas.iter() { + components.extend(formula.formula.components().iter().map(|&id| (id, *metric))); } resamplers.retain(|component_id, _| { if components.contains(component_id) { true } else { - tracing::debug!("Dropping resampler for component {}", component_id); + tracing::debug!( + "Dropping resampler for component {}:{}", + component_id.0, + component_id.1.as_str_name() + ); false } }); @@ -268,6 +282,11 @@ impl LogicalMeterActor { .iter() .find(|s| s.metric == metric as i32) else { + tracing::warn!( + "No data for metric {:?} in component {}", + metric, + resampler.component_id + ); return; }; let timestamp = if let Some(timestamp) = dd.sampled_at { From 5de5aa24d67be19342e2b519d422326905f5399b Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 17 Jul 2025 11:40:44 +0200 Subject: [PATCH 11/14] Update example to also stream grid voltage Signed-off-by: Sahas Subramanian --- examples/logical_meter.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/examples/logical_meter.rs b/examples/logical_meter.rs index 0bedcef..6e97f4a 100644 --- a/examples/logical_meter.rs +++ b/examples/logical_meter.rs @@ -36,7 +36,7 @@ async fn main() -> Result<(), Error> { let mut battery_rx = formula_battery.subscribe().await?; let mut consumer_rx = formula_consumer.subscribe().await?; - loop { + for _ in 0..10 { let sample = rx.recv().await.unwrap(); let grid_sample = grid_rx.recv().await.unwrap(); let battery_sample = battery_rx.recv().await.unwrap(); @@ -49,4 +49,11 @@ async fn main() -> Result<(), Error> { sample.value().unwrap() ); } + + let formula_grid_voltage = logical_meter.grid(metric::AcVoltagePhase1N)?; + let mut grid_voltage_rx = formula_grid_voltage.subscribe().await?; + loop { + let sample = grid_voltage_rx.recv().await.unwrap(); + tracing::info!("grid voltage: {}", sample.value().unwrap()); + } } From adac1a1c053906a4c86ded21f0fa290734c37127 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 21 Jul 2025 11:38:15 +0200 Subject: [PATCH 12/14] Fix doc-comment typo Signed-off-by: Sahas Subramanian --- src/client/microgrid_client_actor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index c0085a8..2ab3f0b 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -213,7 +213,7 @@ async fn handle_retry_timer( Ok(()) } -/// Creates anew data stream for the given component ID and starts a task to +/// Creates a new data stream for the given component ID and starts a task to /// fetch data from it in a loop. async fn start_component_data_stream( client: &mut MicrogridClient, From bcf19c273f1ebe8695f0ebd28389d6bc5a62b602 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 21 Jul 2025 11:38:32 +0200 Subject: [PATCH 13/14] Rename stream status channel for clarity The channel used to communicate the status of a component data stream was called `stream_stopped_tx/rx`, which is no longer accurate. Signed-off-by: Sahas Subramanian --- src/client/microgrid_client_actor.rs | 30 ++++++++++++++-------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index 2ab3f0b..04d7a34 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -59,7 +59,7 @@ impl MicrogridClientActor { let mut component_streams: HashMap> = HashMap::new(); - let (stream_stopped_tx, mut stream_stopped_rx) = mpsc::channel(50); + let (stream_status_tx, mut stream_status_rx) = mpsc::channel(50); let mut retry_timer = tokio::time::interval(std::time::Duration::from_secs(1)); retry_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut components_to_retry = HashMap::new(); @@ -71,12 +71,12 @@ impl MicrogridClientActor { &mut client, &mut component_streams, instruction, - stream_stopped_tx.clone(), + stream_status_tx.clone(), ).await { tracing::error!("MicrogridClientActor: Error handling instruction: {e}"); } } - stream_status = stream_stopped_rx.recv() => { + stream_status = stream_status_rx.recv() => { match stream_status { Some(StreamStatus::Failed(component_id)) => { components_to_retry.entry(component_id).or_insert_with( @@ -100,7 +100,7 @@ impl MicrogridClientActor { &mut client, &mut component_streams, &mut components_to_retry, - stream_stopped_tx.clone(), + stream_status_tx.clone(), now, ).await { tracing::error!("MicrogridClientActor: Error handling retry timer: {e}"); @@ -116,7 +116,7 @@ async fn handle_instruction( client: &mut MicrogridClient, component_streams: &mut HashMap>, instruction: Option, - stream_stopped_tx: mpsc::Sender, + stream_status_tx: mpsc::Sender, ) -> Result<(), Error> { match instruction { Some(Instruction::GetComponentDataStream { @@ -137,7 +137,7 @@ async fn handle_instruction( // API service into the channel. let (tx, rx) = broadcast::channel::(100); component_streams.insert(component_id, tx.clone()); - start_component_data_stream(client, component_id, tx, stream_stopped_tx).await?; + start_component_data_stream(client, component_id, tx, stream_status_tx).await?; response_tx.send(rx).map_err(|_| { tracing::error!("failed to send response"); @@ -189,7 +189,7 @@ async fn handle_retry_timer( client: &mut MicrogridClient, component_streams: &mut HashMap>, components_to_retry: &mut HashMap, - stream_stopped_tx: mpsc::Sender, + stream_status_tx: mpsc::Sender, now: tokio::time::Instant, ) -> Result<(), Error> { for item in components_to_retry.iter_mut() { @@ -200,7 +200,7 @@ async fn handle_retry_timer( item.1.mark_new_retry(); let (component_id, _) = item; if let Some(tx) = component_streams.get(component_id).cloned() { - start_component_data_stream(client, *component_id, tx, stream_stopped_tx.clone()) + start_component_data_stream(client, *component_id, tx, stream_status_tx.clone()) .await?; } else { tracing::error!("Component stream not found for retry: {component_id}"); @@ -219,7 +219,7 @@ async fn start_component_data_stream( client: &mut MicrogridClient, component_id: u64, tx: broadcast::Sender, - stream_stopped_tx: mpsc::Sender, + stream_status_tx: mpsc::Sender, ) -> Result<(), Error> { let stream = match client .receive_component_data_stream(ReceiveComponentDataStreamRequest { @@ -230,7 +230,7 @@ async fn start_component_data_stream( { Ok(s) => s.into_inner(), Err(e) => { - stream_stopped_tx + stream_status_tx .send(StreamStatus::Failed(component_id)) .await .map_err(|e| { @@ -244,7 +244,7 @@ async fn start_component_data_stream( } }; - stream_stopped_tx + stream_status_tx .send(StreamStatus::Connected(component_id)) .await .map_err(|e| { @@ -255,7 +255,7 @@ async fn start_component_data_stream( // create a task to fetch data from the stream in a loop and put into a channel. tokio::spawn( - run_component_data_stream(stream, component_id, tx, stream_stopped_tx).in_current_span(), + run_component_data_stream(stream, component_id, tx, stream_status_tx).in_current_span(), ); Ok(()) } @@ -264,7 +264,7 @@ async fn run_component_data_stream( mut stream: tonic::Streaming, component_id: u64, tx: broadcast::Sender, - stream_stopped_tx: mpsc::Sender, + stream_status_tx: mpsc::Sender, ) { loop { if tx.receiver_count() == 0 { @@ -272,7 +272,7 @@ async fn run_component_data_stream( "Dropping ComponentData stream for component_id:{:?}", component_id ); - stream_stopped_tx + stream_status_tx .send(StreamStatus::Ended(component_id)) .await .unwrap_or_else(|e| { @@ -315,7 +315,7 @@ async fn run_component_data_stream( }; } - if let Err(e) = stream_stopped_tx + if let Err(e) = stream_status_tx .send(StreamStatus::Failed(component_id)) .await { From 9aa1c784b607312b4713ecfa629a0a7553142d22 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Fri, 25 Jul 2025 17:52:58 +0200 Subject: [PATCH 14/14] Group resampled data by metrics and then by component ID Earlier we were picking a random metric and sending its data to all formulas, which was incorrect. Signed-off-by: Sahas Subramanian --- src/logical_meter/logical_meter_actor.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index 1288e4d..2a1424e 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -205,7 +205,8 @@ impl LogicalMeterActor { resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>, formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>, ) -> Result<(), Error> { - let mut comp_data = HashMap::new(); + let mut resampled_metrics: HashMap>> = HashMap::new(); + for (_, resampler) in resamplers.iter_mut() { while let Ok(data) = resampler.receiver.try_recv() { self.push_to_resampler(resampler, data, resampler.metric); @@ -217,14 +218,20 @@ impl LogicalMeterActor { resampled.len() ))); } - comp_data.insert(resampler.component_id, resampled[0].clone().value()); + resampled_metrics + .entry(resampler.metric) + .or_default() + .insert(resampler.component_id, resampled[0].clone().value()); } let mut formulas_to_drop = vec![]; for (formula_key, formula) in formulas.iter_mut() { - let result = formula.formula.calculate(&comp_data).map_err(|e| { - Error::formula_engine_error(format!("Failed to evaluate formula: {e}")) - })?; + let result = formula + .formula + .calculate(resampled_metrics.entry(formula_key.1).or_default()) + .map_err(|e| { + Error::formula_engine_error(format!("Failed to evaluate formula: {e}")) + })?; if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) { tracing::debug!(