diff --git a/Cargo.toml b/Cargo.toml index 33b469d..55e4dda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,8 +9,8 @@ path = "src/lib.rs" [dependencies] chrono = "0.4" -frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "94dc826" } -frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "e0567c0" } +frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "b2fd616" } +frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "463414a" } frequenz-resampling = { git = "https://github.com/frequenz-floss/frequenz-resampling-rs.git", rev = "ce84d66" } prost = "0.13" prost-types = "0.13" @@ -19,6 +19,8 @@ tonic = "0.13" tracing = { version = "0.1" } tracing-subscriber = { version = "0.3" } +[dev-dependencies] +tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] } [build-dependencies] tonic-build = "0.13" diff --git a/examples/logical_meter.rs b/examples/logical_meter.rs index 6e97f4a..07ebdb5 100644 --- a/examples/logical_meter.rs +++ b/examples/logical_meter.rs @@ -5,12 +5,17 @@ use chrono::TimeDelta; use frequenz_microgrid::{ Error, Formula, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, metric, }; +use tracing_subscriber::{ + EnvFilter, + fmt::{self}, + prelude::*, +}; #[tokio::main] async fn main() -> Result<(), Error> { - tracing_subscriber::fmt::fmt() - .with_file(true) - .with_line_number(true) + tracing_subscriber::registry() + .with(EnvFilter::new("info,frequenz_microgrid=debug")) + .with(fmt::layer().with_file(true).with_line_number(true)) .init(); let client = MicrogridClientHandle::new("http://[::1]:8800"); @@ -36,7 +41,7 @@ async fn main() -> Result<(), Error> { let mut battery_rx = formula_battery.subscribe().await?; let mut consumer_rx = formula_consumer.subscribe().await?; - for _ in 0..10 { + for _ in 0..3 { let sample = rx.recv().await.unwrap(); let grid_sample = grid_rx.recv().await.unwrap(); let battery_sample = battery_rx.recv().await.unwrap(); @@ -49,9 +54,22 @@ async fn main() -> Result<(), Error> { sample.value().unwrap() ); } + let formula_grid_voltage = logical_meter + .battery(None, metric::AcVoltagePhase1N)? + .coalesce(logical_meter.pv(None, metric::AcVoltagePhase1N)?)?; - let formula_grid_voltage = logical_meter.grid(metric::AcVoltagePhase1N)?; + tracing::info!("formula_grid_voltage: {}", formula_grid_voltage); let mut grid_voltage_rx = formula_grid_voltage.subscribe().await?; + for _ in 0..3 { + let sample = grid_voltage_rx.recv().await.unwrap(); + tracing::info!("grid voltage: {}", sample.value().unwrap()); + } + + drop(rx); + drop(grid_rx); + drop(battery_rx); + drop(consumer_rx); + loop { let sample = grid_voltage_rx.recv().await.unwrap(); tracing::info!("grid voltage: {}", sample.value().unwrap()); diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index 5429df6..efd3cbc 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -3,16 +3,111 @@ //! Formula module for the logical meter. +use frequenz_microgrid_component_graph::Formula as _; mod aggregation_formula; mod coalesce_formula; pub(crate) mod graph_formula_provider; pub use aggregation_formula::AggregationFormula; pub use coalesce_formula::CoalesceFormula; -use crate::{Error, Sample}; -use tokio::sync::broadcast; +use crate::{Error, Sample, proto::common::v1::metrics::Metric}; +use tokio::sync::{broadcast, mpsc}; + +use super::logical_meter_actor; + +/// Connects logical meter formulas to the component graph formulas. +pub(crate) trait GraphFormulaProvider: std::fmt::Display { + type GraphFormulaType: frequenz_microgrid_component_graph::Formula; +} /// Defines a formula that can be subscribed to for receiving samples. -pub trait Formula: std::fmt::Display { +pub(crate) trait FormulaSubscriber: std::fmt::Display { + fn subscribe(&self) -> impl Future, Error>> + Send; +} + +/// Parameters for creating a logical meter formula. +pub(super) struct FormulaParams { + pub(super) formula: F::GraphFormulaType, + pub(super) metric: Metric, + pub(super) instructions_tx: mpsc::Sender, +} + +impl FormulaParams { + pub(super) fn new( + formula: F::GraphFormulaType, + metric: Metric, + instructions_tx: mpsc::Sender, + ) -> Self { + Self { + formula, + metric, + instructions_tx, + } + } +} + +/// A trait that defines generic formula operations. +pub trait Formula: std::fmt::Display + Sized { + fn coalesce(self, other: Self) -> Result; + fn min(self, other: Self) -> Result; + fn max(self, other: Self) -> Result; fn subscribe(&self) -> impl Future, Error>> + Send; } + +impl Formula for T +where + T: FormulaSubscriber + + GraphFormulaProvider + + From> + + Into> + + std::fmt::Display, +{ + fn coalesce(self, other: Self) -> Result { + let mut params_self: FormulaParams = self.into(); + let params_other: FormulaParams = other.into(); + + if params_self.metric != params_other.metric { + return Err(Error::invalid_metric(format!( + "Cannot coalesce formulas with different metrics: {} and {}", + params_self.metric.as_str_name(), + params_other.metric.as_str_name() + ))); + } + params_self.formula = params_self.formula.coalesce(params_other.formula); + Ok(params_self.into()) + } + + fn min(self, other: Self) -> Result { + let mut params_self: FormulaParams = self.into(); + let params_other: FormulaParams = other.into(); + + if params_self.metric != params_other.metric { + return Err(Error::invalid_metric(format!( + "Cannot take min of formulas with different metrics: {} and {}", + params_self.metric.as_str_name(), + params_other.metric.as_str_name() + ))); + } + params_self.formula = params_self.formula.min(params_other.formula); + Ok(params_self.into()) + } + + fn max(self, other: Self) -> Result { + let mut params_self: FormulaParams = self.into(); + let params_other: FormulaParams = other.into(); + + if params_self.metric != params_other.metric { + return Err(Error::invalid_metric(format!( + "Cannot take max of formulas with different metrics: {} and {}", + params_self.metric.as_str_name(), + params_other.metric.as_str_name() + ))); + } + params_self.formula = params_self.formula.max(params_other.formula); + Ok(params_self.into()) + } + + fn subscribe(&self) -> impl Future, Error>> + Send { + ::subscribe(self) + } +} diff --git a/src/logical_meter/formula/aggregation_formula.rs b/src/logical_meter/formula/aggregation_formula.rs index f343d5e..1010878 100644 --- a/src/logical_meter/formula/aggregation_formula.rs +++ b/src/logical_meter/formula/aggregation_formula.rs @@ -3,7 +3,7 @@ //! An formula that supports aggregation operations. -use super::Formula; +use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider}; use crate::{ Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric, }; @@ -22,21 +22,11 @@ impl std::fmt::Display for AggregationFormula { } } -impl AggregationFormula { - pub(crate) fn new( - formula: frequenz_microgrid_component_graph::AggregationFormula, - metric: Metric, - instructions_tx: mpsc::Sender, - ) -> Self { - Self { - formula, - metric, - instructions_tx, - } - } +impl GraphFormulaProvider for AggregationFormula { + type GraphFormulaType = frequenz_microgrid_component_graph::AggregationFormula; } -impl Formula for AggregationFormula { +impl FormulaSubscriber for AggregationFormula { async fn subscribe(&self) -> Result, Error> { let (tx, rx) = oneshot::channel(); @@ -56,6 +46,26 @@ impl Formula for AggregationFormula { } } +impl From> for AggregationFormula { + fn from(params: FormulaParams) -> Self { + Self { + formula: params.formula, + metric: params.metric, + instructions_tx: params.instructions_tx, + } + } +} + +impl From for FormulaParams { + fn from(formula: AggregationFormula) -> Self { + FormulaParams { + formula: formula.formula, + metric: formula.metric, + instructions_tx: formula.instructions_tx, + } + } +} + impl std::ops::Add for AggregationFormula { type Output = Result; @@ -67,7 +77,7 @@ impl std::ops::Add for AggregationFormula { ))); } let new_formula = self.formula + other.formula; - Ok(Self::new(new_formula, self.metric, self.instructions_tx)) + Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into()) } } @@ -82,7 +92,7 @@ impl std::ops::Sub for AggregationFormula { ))); } let new_formula = self.formula - other.formula; - Ok(Self::new(new_formula, self.metric, self.instructions_tx)) + Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into()) } } diff --git a/src/logical_meter/formula/coalesce_formula.rs b/src/logical_meter/formula/coalesce_formula.rs index 877c741..551a2dc 100644 --- a/src/logical_meter/formula/coalesce_formula.rs +++ b/src/logical_meter/formula/coalesce_formula.rs @@ -3,7 +3,7 @@ //! An coalesce formula. -use super::Formula; +use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider}; use crate::{ Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric, }; @@ -22,21 +22,11 @@ impl std::fmt::Display for CoalesceFormula { } } -impl CoalesceFormula { - pub(crate) fn new( - formula: frequenz_microgrid_component_graph::CoalesceFormula, - metric: Metric, - instructions_tx: mpsc::Sender, - ) -> Self { - Self { - formula, - metric, - instructions_tx, - } - } +impl GraphFormulaProvider for CoalesceFormula { + type GraphFormulaType = frequenz_microgrid_component_graph::CoalesceFormula; } -impl Formula for CoalesceFormula { +impl FormulaSubscriber for CoalesceFormula { async fn subscribe(&self) -> Result, Error> { let (tx, rx) = oneshot::channel(); @@ -55,3 +45,23 @@ impl Formula for CoalesceFormula { Ok(receiver) } } + +impl From> for CoalesceFormula { + fn from(params: FormulaParams) -> Self { + Self { + formula: params.formula, + metric: params.metric, + instructions_tx: params.instructions_tx, + } + } +} + +impl From for FormulaParams { + fn from(formula: CoalesceFormula) -> Self { + FormulaParams { + formula: formula.formula, + metric: formula.metric, + instructions_tx: formula.instructions_tx, + } + } +} diff --git a/src/logical_meter/formula/graph_formula_provider.rs b/src/logical_meter/formula/graph_formula_provider.rs index ba3d9bf..5b0fd94 100644 --- a/src/logical_meter/formula/graph_formula_provider.rs +++ b/src/logical_meter/formula/graph_formula_provider.rs @@ -4,6 +4,7 @@ //! A composable formula type, that can be subscribed to. use crate::Error; +use crate::logical_meter::formula::FormulaParams; use crate::logical_meter::logical_meter_actor; use crate::proto::common::v1::microgrid::components::Component; use crate::proto::common::v1::microgrid::components::ComponentConnection; @@ -14,13 +15,14 @@ use tokio::sync::mpsc; use super::{AggregationFormula, CoalesceFormula}; macro_rules! graph_formula_provider { - ($(($fnname:ident $(, $idsparam:ident)?)),+ $(,)?) => {$( + ($(($fnname:ident $(, ids:$idsparam:ident)? $(, id:$idparam:ident)?)),+ $(,)?) => {$( - fn $fnname( + fn $fnname( _graph: &ComponentGraph, _metric: M, _instructions_tx: mpsc::Sender, $($idsparam: Option>,)? + $($idparam: u64,)? ) -> Result { return Err(Error::component_graph_error( format!( @@ -45,29 +47,37 @@ pub trait GraphFormulaProvider: Sized { (grid), (consumer), (producer), - (battery, _battery_ids), - (chp, _chp_ids), - (pv, _pv_inverter_ids), - (ev_charger, _ev_charger_ids), + (battery, ids: _battery_ids), + (chp, ids: _chp_ids), + (pv, ids: _pv_inverter_ids), + (ev_charger, ids: _ev_charger_ids), + (component, id: _component_id), ); } macro_rules! impl_graph_formula_provider { - ($(($fnname:ident, $graphfnname:ident$(, $idsparam:ident)?)),+ $(,)?) => {$( + ($(( + $fnname:ident, + $graphfnname:ident + $(, ids:$idsparam:ident)? + $(, id:$idparam:ident)? + )),+ $(,)?) => {$( - fn $fnname( + fn $fnname( graph: &ComponentGraph, _metric: M, instructions_tx: mpsc::Sender, $($idsparam: Option>,)? + $($idparam: u64,)? ) -> Result { - let formula = graph.$graphfnname($($idsparam)?).map_err(|e| { + let formula = graph.$graphfnname($($idsparam)?$($idparam)?).map_err(|e| { Error::component_graph_error( format!("Could not get {} formula: {e}", stringify!($fnname)) ) })?; - Ok(Self::new(formula, M::METRIC, instructions_tx)) + Ok(FormulaParams::new(formula, M::METRIC, instructions_tx).into()) } + )+}; } @@ -76,17 +86,19 @@ impl GraphFormulaProvider for AggregationFormula { (grid, grid_formula), (consumer, consumer_formula), (producer, producer_formula), - (battery, battery_formula, battery_ids), - (chp, chp_formula, chp_ids), - (pv, pv_formula, pv_inverter_ids), - (ev_charger, ev_charger_formula, ev_charger_ids), + (battery, battery_formula, ids: battery_ids), + (chp, chp_formula, ids: chp_ids), + (pv, pv_formula, ids: pv_inverter_ids), + (ev_charger, ev_charger_formula, ids: ev_charger_ids), + (component, component_formula, id: component_id), ); } impl GraphFormulaProvider for CoalesceFormula { impl_graph_formula_provider!( (grid, grid_coalesce_formula), - (battery, battery_ac_coalesce_formula, battery_ids), - (pv, pv_ac_coalesce_formula, pv_inverter_ids), + (battery, battery_ac_coalesce_formula, ids: battery_ids), + (pv, pv_ac_coalesce_formula, ids: pv_inverter_ids), + (component, component_ac_coalesce_formula, id: component_id), ); } diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index 16208a8..55b9634 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -2,7 +2,6 @@ // Copyright © 2025 Frequenz Energy-as-a-Service GmbH use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider; -use crate::proto::common::v1::metrics::Metric; use crate::{ client::MicrogridClientHandle, error::Error, @@ -12,7 +11,7 @@ use frequenz_microgrid_component_graph::{self, ComponentGraph}; use std::collections::BTreeSet; use tokio::sync::mpsc; -use super::{AggregationFormula, LogicalMeterConfig, logical_meter_actor::LogicalMeterActor}; +use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor}; /// This provides an interface stream high-level metrics from a microgrid. #[derive(Clone)] @@ -56,10 +55,7 @@ impl LogicalMeterHandle { /// Returns a receiver that streams samples for the given `metric` at the grid /// connection point. - pub fn grid( - &mut self, - metric: M, - ) -> Result { + pub fn grid(&mut self, metric: M) -> Result { M::FormulaType::grid(&self.graph, metric, self.instructions_tx.clone()) } @@ -67,7 +63,7 @@ impl LogicalMeterHandle { /// given battery IDs. /// /// When `component_ids` is `None`, all batteries in the microgrid are used. - pub fn battery( + pub fn battery( &mut self, component_ids: Option>, metric: M, @@ -84,7 +80,7 @@ impl LogicalMeterHandle { /// given CHP IDs. /// /// When `component_ids` is `None`, all CHPs in the microgrid are used. - pub fn chp( + pub fn chp( &mut self, component_ids: Option>, metric: M, @@ -101,7 +97,7 @@ impl LogicalMeterHandle { /// given PV IDs. /// /// When `component_ids` is `None`, all PVs in the microgrid are used. - pub fn pv( + pub fn pv( &mut self, component_ids: Option>, metric: M, @@ -119,7 +115,7 @@ impl LogicalMeterHandle { /// /// When `component_ids` is `None`, all EV chargers in the microgrid are /// used. - pub fn ev_charger( + pub fn ev_charger( &mut self, component_ids: Option>, metric: M, @@ -132,36 +128,36 @@ impl LogicalMeterHandle { ) } - /// Returns a receiver that streams samples for the given `metric` for all - /// the consumers in the microgrid. - pub fn consumer( + /// Returns a receiver that streams samples for the given `metric` for the + /// logical `consumer` in the microgrid. + pub fn consumer( &mut self, metric: M, ) -> Result { M::FormulaType::consumer(&self.graph, metric, self.instructions_tx.clone()) } - /// Returns a receiver that streams samples for the given `metric` for all - /// producers in the microgrid. - pub fn producer( + /// Returns a receiver that streams samples for the given `metric` for the + /// logical `producer` in the microgrid. + pub fn producer( &mut self, metric: M, ) -> Result { M::FormulaType::producer(&self.graph, metric, self.instructions_tx.clone()) } - pub fn coalesce( + /// Returns a receiver that streams samples for the given `metric` for the + /// given component ID. + pub fn component( &mut self, - component_ids: BTreeSet, - metric: Metric, - ) -> Result { - let formula = self.graph.coalesce(component_ids).map_err(|e| { - Error::component_graph_error(format!("Could not derive coalesce formula: {e}")) - })?; - Ok(AggregationFormula::new( - formula, + component_id: u64, + metric: M, + ) -> Result { + M::FormulaType::component( + &self.graph, metric, self.instructions_tx.clone(), - )) + component_id, + ) } } diff --git a/src/logical_meter/metric.rs b/src/logical_meter/metric.rs index cc65f63..16ec880 100644 --- a/src/logical_meter/metric.rs +++ b/src/logical_meter/metric.rs @@ -3,12 +3,15 @@ //! Metrics supported by the logical meter. -pub(crate) mod metric_trait; - use crate::proto::common::v1::metrics::Metric as MetricPb; use super::formula; -use metric_trait::AcMetric; + +pub trait Metric: std::fmt::Display + std::fmt::Debug + Clone + Copy + PartialEq + Eq { + type FormulaType: formula::Formula + formula::graph_formula_provider::GraphFormulaProvider; + + const METRIC: MetricPb; +} macro_rules! define_metric { ($({name: $metric_name:ident, formula: $formula:ident}),+ $(,)?) => { @@ -18,7 +21,7 @@ macro_rules! define_metric { pub struct $metric_name; // Implement the AcMetric trait for the metric - impl AcMetric for $metric_name { + impl Metric for $metric_name { type FormulaType = formula::$formula; const METRIC: MetricPb = MetricPb::$metric_name; diff --git a/src/logical_meter/metric/metric_trait.rs b/src/logical_meter/metric/metric_trait.rs deleted file mode 100644 index cdc5371..0000000 --- a/src/logical_meter/metric/metric_trait.rs +++ /dev/null @@ -1,14 +0,0 @@ -// License: MIT -// Copyright © 2025 Frequenz Energy-as-a-Service GmbH - -//! A trait specifying the output formula type and the corresponding PB metric, -//! for all metrics supported by the logical meter. - -use super::formula; -use crate::proto::common::v1::metrics::Metric as MetricPb; - -pub trait AcMetric: std::fmt::Display { - type FormulaType: formula::Formula + formula::graph_formula_provider::GraphFormulaProvider; - - const METRIC: MetricPb; -}