diff --git a/Cargo.toml b/Cargo.toml index fcae139..b972800 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ path = "src/lib.rs" [dependencies] async-trait = "0.1.89" chrono = "0.4" -frequenz-microgrid-component-graph = "0.5.0" +frequenz-microgrid-component-graph = { git = "https://github.com/shsms/frequenz-microgrid-component-graph-rs", branch = "meter-subtraction" } frequenz-microgrid-formula-engine = "0.1.0" frequenz-resampling = "0.2" futures = "0.3.31" diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index 217e7cd..0ba326a 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -6,9 +6,8 @@ use std::marker::PhantomData; use async_trait::async_trait; -pub(crate) mod aggregation_formula; mod async_formula; -pub(crate) mod coalesce_formula; +pub(crate) mod graph_formula; pub(crate) mod graph_formula_provider; pub use async_formula::Formula; @@ -25,11 +24,6 @@ use tokio::sync::{ use super::logical_meter_actor; -/// Connects logical meter formulas to the component graph formulas. -pub(crate) trait GraphFormulaConnector: std::fmt::Display { - type GraphFormulaType: frequenz_microgrid_component_graph::Formula; -} - #[async_trait] pub trait FormulaSubscriber: std::fmt::Display + Sync + Send { type QuantityType: Quantity; @@ -37,15 +31,15 @@ pub trait FormulaSubscriber: std::fmt::Display + Sync + Send { } /// Parameters for creating a logical meter formula. -pub(super) struct FormulaParams { - pub(super) formula: F::GraphFormulaType, +pub(super) struct FormulaParams { + pub(super) formula: frequenz_microgrid_component_graph::Formula, pub(super) instructions_tx: mpsc::Sender, phantom: PhantomData, } -impl FormulaParams { +impl FormulaParams { pub(super) fn new( - formula: F::GraphFormulaType, + formula: frequenz_microgrid_component_graph::Formula, instructions_tx: mpsc::Sender, ) -> Self { Self { diff --git a/src/logical_meter/formula/aggregation_formula.rs b/src/logical_meter/formula/aggregation_formula.rs deleted file mode 100644 index 25d10ec..0000000 --- a/src/logical_meter/formula/aggregation_formula.rs +++ /dev/null @@ -1,75 +0,0 @@ -// License: MIT -// Copyright © 2025 Frequenz Energy-as-a-Service GmbH - -//! An formula that supports aggregation operations. - -use std::marker::PhantomData; - -use super::{FormulaParams, FormulaSubscriber, GraphFormulaConnector}; -use crate::{ - Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity, -}; -use async_trait::async_trait; -use tokio::sync::{broadcast, mpsc, oneshot}; - -#[derive(Clone)] -pub struct AggregationFormula { - formula: frequenz_microgrid_component_graph::AggregationFormula, - instructions_tx: mpsc::Sender, - phantom: PhantomData, -} - -impl std::fmt::Display for AggregationFormula { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}::({})", M::METRIC.as_str_name(), self.formula) - } -} - -impl GraphFormulaConnector for AggregationFormula { - type GraphFormulaType = frequenz_microgrid_component_graph::AggregationFormula; -} - -#[async_trait] -impl + Sync + Send> FormulaSubscriber - for AggregationFormula -{ - type QuantityType = Q; - - async fn subscribe(&self) -> Result>, Error> { - let (tx, rx) = oneshot::channel(); - - self.instructions_tx - .send(logical_meter_actor::Instruction::SubscribeFormula { - formula: self.formula.to_string(), - metric: M::METRIC, - response_tx: tx.try_into()?, - }) - .await - .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; - let receiver = rx.await.map_err(|e| { - Error::connection_failure(format!("Could not receive instruction: {e}")) - })?; - - Ok(receiver) - } -} - -impl From, M>> for AggregationFormula { - fn from(params: FormulaParams, M>) -> Self { - Self { - formula: params.formula, - instructions_tx: params.instructions_tx, - phantom: PhantomData, - } - } -} - -impl From> for FormulaParams, M> { - fn from(formula: AggregationFormula) -> Self { - FormulaParams { - formula: formula.formula, - instructions_tx: formula.instructions_tx, - phantom: PhantomData, - } - } -} diff --git a/src/logical_meter/formula/coalesce_formula.rs b/src/logical_meter/formula/coalesce_formula.rs deleted file mode 100644 index ee70aaf..0000000 --- a/src/logical_meter/formula/coalesce_formula.rs +++ /dev/null @@ -1,75 +0,0 @@ -// License: MIT -// Copyright © 2025 Frequenz Energy-as-a-Service GmbH - -//! An coalesce formula. - -use std::marker::PhantomData; - -use super::{FormulaParams, FormulaSubscriber, GraphFormulaConnector}; -use crate::{ - Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity, -}; -use async_trait::async_trait; -use tokio::sync::{broadcast, mpsc, oneshot}; - -#[derive(Clone)] -pub struct CoalesceFormula { - formula: frequenz_microgrid_component_graph::CoalesceFormula, - instructions_tx: mpsc::Sender, - phantom: PhantomData, -} - -impl std::fmt::Display for CoalesceFormula { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}::({})", M::METRIC.as_str_name(), self.formula) - } -} - -impl GraphFormulaConnector for CoalesceFormula { - type GraphFormulaType = frequenz_microgrid_component_graph::CoalesceFormula; -} - -#[async_trait] -impl + Sync + Send> FormulaSubscriber - for CoalesceFormula -{ - type QuantityType = Q; - - async fn subscribe(&self) -> Result>, Error> { - let (tx, rx) = oneshot::channel(); - - self.instructions_tx - .send(logical_meter_actor::Instruction::SubscribeFormula { - formula: self.formula.to_string(), - metric: M::METRIC, - response_tx: tx.try_into()?, - }) - .await - .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; - let receiver = rx.await.map_err(|e| { - Error::connection_failure(format!("Could not receive instruction: {e}")) - })?; - - Ok(receiver) - } -} - -impl From, M>> for CoalesceFormula { - fn from(params: FormulaParams, M>) -> Self { - Self { - formula: params.formula, - instructions_tx: params.instructions_tx, - phantom: PhantomData, - } - } -} - -impl From> for FormulaParams, M> { - fn from(formula: CoalesceFormula) -> Self { - FormulaParams { - formula: formula.formula, - phantom: formula.phantom, - instructions_tx: formula.instructions_tx, - } - } -} diff --git a/src/logical_meter/formula/graph_formula.rs b/src/logical_meter/formula/graph_formula.rs new file mode 100644 index 0000000..9116f6a --- /dev/null +++ b/src/logical_meter/formula/graph_formula.rs @@ -0,0 +1,94 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! A formula generated from the component graph, subscribable through the +//! logical-meter actor. +//! +//! The wrapper body is identical for every formula; the marker type `K` only +//! selects which component-graph methods generate it (the aggregation +//! `*_formula` methods vs. the `*_coalesce_formula` ones), via the +//! [`GraphFormulaProvider`](super::graph_formula_provider::GraphFormulaProvider) +//! impls. The [`AggregationFormula`] / [`CoalesceFormula`] aliases name the two +//! kinds. + +use std::marker::PhantomData; + +use super::{FormulaParams, FormulaSubscriber}; +use crate::{ + Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity, +}; +use async_trait::async_trait; +use tokio::sync::{broadcast, mpsc, oneshot}; + +/// Marker for formulas from the aggregation (`*_formula`) graph methods. +pub enum Aggregation {} + +/// Marker for formulas from the coalesce (`*_coalesce_formula`) graph methods. +pub enum Coalesce {} + +/// A component-graph formula for metric `M`, tagged by the kind `K` that +/// selects the graph methods generating it. +pub struct GraphFormula { + formula: frequenz_microgrid_component_graph::Formula, + instructions_tx: mpsc::Sender, + phantom: PhantomData<(M, K)>, +} + +/// A formula that supports aggregation operations. +pub type AggregationFormula = GraphFormula; + +/// A formula built from the component graph's coalesce methods. +pub type CoalesceFormula = GraphFormula; + +// Manual `Clone`: a derive would demand `K: Clone`, but the marker types are +// uninhabited and carry no data. +impl Clone for GraphFormula { + fn clone(&self) -> Self { + Self { + formula: self.formula.clone(), + instructions_tx: self.instructions_tx.clone(), + phantom: PhantomData, + } + } +} + +impl std::fmt::Display for GraphFormula { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}::({})", M::METRIC.as_str_name(), self.formula) + } +} + +#[async_trait] +impl + Sync + Send, K: Sync + Send> + FormulaSubscriber for GraphFormula +{ + type QuantityType = Q; + + async fn subscribe(&self) -> Result>, Error> { + let (tx, rx) = oneshot::channel(); + + self.instructions_tx + .send(logical_meter_actor::Instruction::SubscribeFormula { + formula: self.formula.to_string(), + metric: M::METRIC, + response_tx: tx.try_into()?, + }) + .await + .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; + let receiver = rx.await.map_err(|e| { + Error::connection_failure(format!("Could not receive instruction: {e}")) + })?; + + Ok(receiver) + } +} + +impl From> for GraphFormula { + fn from(params: FormulaParams) -> Self { + Self { + formula: params.formula, + instructions_tx: params.instructions_tx, + phantom: PhantomData, + } + } +} diff --git a/src/logical_meter/formula/graph_formula_provider.rs b/src/logical_meter/formula/graph_formula_provider.rs index 5fdcbfd..8847e1c 100644 --- a/src/logical_meter/formula/graph_formula_provider.rs +++ b/src/logical_meter/formula/graph_formula_provider.rs @@ -8,8 +8,7 @@ use crate::client::proto::common::microgrid::electrical_components::{ ElectricalComponent, ElectricalComponentConnection, }; use crate::logical_meter::formula::FormulaParams; -use crate::logical_meter::formula::aggregation_formula::AggregationFormula; -use crate::logical_meter::formula::coalesce_formula::CoalesceFormula; +use crate::logical_meter::formula::graph_formula::{Aggregation, Coalesce, GraphFormula}; use crate::logical_meter::logical_meter_actor; use crate::metric::Metric; @@ -84,7 +83,7 @@ macro_rules! impl_graph_formula_provider { )+}; } -impl GraphFormulaProvider for AggregationFormula { +impl GraphFormulaProvider for GraphFormula { type MetricType = M; impl_graph_formula_provider!( @@ -99,7 +98,7 @@ impl GraphFormulaProvider for AggregationFormula { ); } -impl GraphFormulaProvider for CoalesceFormula { +impl GraphFormulaProvider for GraphFormula { type MetricType = M; impl_graph_formula_provider!( diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index df3379b..ac3a51f 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -639,6 +639,42 @@ mod tests { quantity::Power, }; + /// The component-graph meter-minus-sibling subtraction formulas the actor + /// now receives (e.g. `COALESCE(#8, #5 - #6, 0.0)`) parse and evaluate + /// through the formula engine: the component's own reading wins, else the + /// parent meter minus its sibling, else zero. + #[test] + fn subtraction_formula_evaluates_through_engine() { + let engine = FormulaEngine::::try_new("COALESCE(#8, #5 - #6, 0.0)") + .expect("formula should parse"); + + // Own reading present: used directly. + assert_eq!( + engine + .calculate(&HashMap::from([ + (8, Some(7.0)), + (5, Some(10.0)), + (6, Some(3.0)) + ])) + .unwrap(), + Some(7.0), + ); + // Own reading missing: parent meter minus its sibling (10 - 3). + assert_eq!( + engine + .calculate(&HashMap::from([(8, None), (5, Some(10.0)), (6, Some(3.0))])) + .unwrap(), + Some(7.0), + ); + // Own reading and meter missing: the difference is null, so zero wins. + assert_eq!( + engine + .calculate(&HashMap::from([(8, None), (5, None), (6, Some(3.0))])) + .unwrap(), + Some(0.0), + ); + } + async fn new_handle( meter: MockComponent, config: LogicalMeterConfig, diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index 139c096..bb813cc 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -303,7 +303,7 @@ mod tests { .unwrap(); assert_eq!( formula.to_string(), - "METRIC_AC_POWER_ACTIVE::(COALESCE(#8, 0.0))" + "METRIC_AC_POWER_ACTIVE::(COALESCE(#8, #5 - #6, 0.0))" ); let formula = lm diff --git a/src/metric.rs b/src/metric.rs index 2d32b6b..13f8a4f 100644 --- a/src/metric.rs +++ b/src/metric.rs @@ -3,8 +3,7 @@ //! Metrics supported by the logical meter. -use crate::logical_meter::formula::aggregation_formula::AggregationFormula; -use crate::logical_meter::formula::coalesce_formula::CoalesceFormula; +use crate::logical_meter::formula::graph_formula::{AggregationFormula, CoalesceFormula}; use crate::{ client::proto::common::metrics::Metric as MetricPb, logical_meter::formula, logical_meter::formula::FormulaSubscriber, diff --git a/src/microgrid/pv_pool.rs b/src/microgrid/pv_pool.rs index 8d59f4f..7b79488 100644 --- a/src/microgrid/pv_pool.rs +++ b/src/microgrid/pv_pool.rs @@ -265,7 +265,7 @@ mod tests { let formula = pool.power().unwrap(); assert_eq!( formula.to_string(), - "METRIC_AC_POWER_ACTIVE::(COALESCE(#3, COALESCE(#5, 0.0) + COALESCE(#4, 0.0)))" + "METRIC_AC_POWER_ACTIVE::(COALESCE(#5 + #4, #3, COALESCE(#5, 0.0) + COALESCE(#4, 0.0)))" ); } @@ -276,7 +276,7 @@ mod tests { let formula = pool.power().unwrap(); assert_eq!( formula.to_string(), - "METRIC_AC_POWER_ACTIVE::(COALESCE(#3, COALESCE(#5, 0.0) + COALESCE(#4, 0.0)))" + "METRIC_AC_POWER_ACTIVE::(COALESCE(#5 + #4, #3, COALESCE(#5, 0.0) + COALESCE(#4, 0.0)))" ); } }