diff --git a/Cargo.toml b/Cargo.toml index 8e7149b..33b469d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ path = "src/lib.rs" [dependencies] chrono = "0.4" -frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "ab3b998" } +frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "94dc826" } frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "e0567c0" } frequenz-resampling = { git = "https://github.com/frequenz-floss/frequenz-resampling-rs.git", rev = "ce84d66" } prost = "0.13" diff --git a/examples/logical_meter.rs b/examples/logical_meter.rs index 85445c8..6e97f4a 100644 --- a/examples/logical_meter.rs +++ b/examples/logical_meter.rs @@ -3,7 +3,7 @@ use chrono::TimeDelta; use frequenz_microgrid::{ - Error, LogicalMeterConfig, LogicalMeterHandle, Metric, MicrogridClientHandle, + Error, Formula, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, metric, }; #[tokio::main] @@ -23,20 +23,20 @@ async fn main() -> Result<(), Error> { .await?; // Create a formula that calculates `grid_power - battery_power`. - let formula_grid = logical_meter.grid(Metric::AcActivePower)?; - let formula_battery = logical_meter.battery(None, Metric::AcActivePower)?; - let formula_consumer = logical_meter.consumer(Metric::AcActivePower)?; + let formula_grid = logical_meter.grid(metric::AcActivePower)?; + let formula_battery = logical_meter.battery(None, metric::AcActivePower)?; + let formula_consumer = logical_meter.consumer(metric::AcActivePower)?; - let formula = (logical_meter.grid(Metric::AcActivePower)? - - logical_meter.battery(None, Metric::AcActivePower)? - + logical_meter.consumer(Metric::AcActivePower)?)?; + let formula = (logical_meter.grid(metric::AcActivePower)? + - logical_meter.battery(None, metric::AcActivePower)? + + logical_meter.consumer(metric::AcActivePower)?)?; let mut rx = formula.subscribe().await?; let mut grid_rx = formula_grid.subscribe().await?; let mut battery_rx = formula_battery.subscribe().await?; let mut consumer_rx = formula_consumer.subscribe().await?; - loop { + for _ in 0..10 { let sample = rx.recv().await.unwrap(); let grid_sample = grid_rx.recv().await.unwrap(); let battery_sample = battery_rx.recv().await.unwrap(); @@ -49,4 +49,11 @@ async fn main() -> Result<(), Error> { sample.value().unwrap() ); } + + let formula_grid_voltage = logical_meter.grid(metric::AcVoltagePhase1N)?; + let mut grid_voltage_rx = formula_grid_voltage.subscribe().await?; + loop { + let sample = grid_voltage_rx.recv().await.unwrap(); + tracing::info!("grid voltage: {}", sample.value().unwrap()); + } } diff --git a/src/client/microgrid_client_actor.rs b/src/client/microgrid_client_actor.rs index c0085a8..04d7a34 100644 --- a/src/client/microgrid_client_actor.rs +++ b/src/client/microgrid_client_actor.rs @@ -59,7 +59,7 @@ impl MicrogridClientActor { let mut component_streams: HashMap> = HashMap::new(); - let (stream_stopped_tx, mut stream_stopped_rx) = mpsc::channel(50); + let (stream_status_tx, mut stream_status_rx) = mpsc::channel(50); let mut retry_timer = tokio::time::interval(std::time::Duration::from_secs(1)); retry_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mut components_to_retry = HashMap::new(); @@ -71,12 +71,12 @@ impl MicrogridClientActor { &mut client, &mut component_streams, instruction, - stream_stopped_tx.clone(), + stream_status_tx.clone(), ).await { tracing::error!("MicrogridClientActor: Error handling instruction: {e}"); } } - stream_status = stream_stopped_rx.recv() => { + stream_status = stream_status_rx.recv() => { match stream_status { Some(StreamStatus::Failed(component_id)) => { components_to_retry.entry(component_id).or_insert_with( @@ -100,7 +100,7 @@ impl MicrogridClientActor { &mut client, &mut component_streams, &mut components_to_retry, - stream_stopped_tx.clone(), + stream_status_tx.clone(), now, ).await { tracing::error!("MicrogridClientActor: Error handling retry timer: {e}"); @@ -116,7 +116,7 @@ async fn handle_instruction( client: &mut MicrogridClient, component_streams: &mut HashMap>, instruction: Option, - stream_stopped_tx: mpsc::Sender, + stream_status_tx: mpsc::Sender, ) -> Result<(), Error> { match instruction { Some(Instruction::GetComponentDataStream { @@ -137,7 +137,7 @@ async fn handle_instruction( // API service into the channel. let (tx, rx) = broadcast::channel::(100); component_streams.insert(component_id, tx.clone()); - start_component_data_stream(client, component_id, tx, stream_stopped_tx).await?; + start_component_data_stream(client, component_id, tx, stream_status_tx).await?; response_tx.send(rx).map_err(|_| { tracing::error!("failed to send response"); @@ -189,7 +189,7 @@ async fn handle_retry_timer( client: &mut MicrogridClient, component_streams: &mut HashMap>, components_to_retry: &mut HashMap, - stream_stopped_tx: mpsc::Sender, + stream_status_tx: mpsc::Sender, now: tokio::time::Instant, ) -> Result<(), Error> { for item in components_to_retry.iter_mut() { @@ -200,7 +200,7 @@ async fn handle_retry_timer( item.1.mark_new_retry(); let (component_id, _) = item; if let Some(tx) = component_streams.get(component_id).cloned() { - start_component_data_stream(client, *component_id, tx, stream_stopped_tx.clone()) + start_component_data_stream(client, *component_id, tx, stream_status_tx.clone()) .await?; } else { tracing::error!("Component stream not found for retry: {component_id}"); @@ -213,13 +213,13 @@ async fn handle_retry_timer( Ok(()) } -/// Creates anew data stream for the given component ID and starts a task to +/// Creates a new data stream for the given component ID and starts a task to /// fetch data from it in a loop. async fn start_component_data_stream( client: &mut MicrogridClient, component_id: u64, tx: broadcast::Sender, - stream_stopped_tx: mpsc::Sender, + stream_status_tx: mpsc::Sender, ) -> Result<(), Error> { let stream = match client .receive_component_data_stream(ReceiveComponentDataStreamRequest { @@ -230,7 +230,7 @@ async fn start_component_data_stream( { Ok(s) => s.into_inner(), Err(e) => { - stream_stopped_tx + stream_status_tx .send(StreamStatus::Failed(component_id)) .await .map_err(|e| { @@ -244,7 +244,7 @@ async fn start_component_data_stream( } }; - stream_stopped_tx + stream_status_tx .send(StreamStatus::Connected(component_id)) .await .map_err(|e| { @@ -255,7 +255,7 @@ async fn start_component_data_stream( // create a task to fetch data from the stream in a loop and put into a channel. tokio::spawn( - run_component_data_stream(stream, component_id, tx, stream_stopped_tx).in_current_span(), + run_component_data_stream(stream, component_id, tx, stream_status_tx).in_current_span(), ); Ok(()) } @@ -264,7 +264,7 @@ async fn run_component_data_stream( mut stream: tonic::Streaming, component_id: u64, tx: broadcast::Sender, - stream_stopped_tx: mpsc::Sender, + stream_status_tx: mpsc::Sender, ) { loop { if tx.receiver_count() == 0 { @@ -272,7 +272,7 @@ async fn run_component_data_stream( "Dropping ComponentData stream for component_id:{:?}", component_id ); - stream_stopped_tx + stream_status_tx .send(StreamStatus::Ended(component_id)) .await .unwrap_or_else(|e| { @@ -315,7 +315,7 @@ async fn run_component_data_stream( }; } - if let Err(e) = stream_stopped_tx + if let Err(e) = stream_status_tx .send(StreamStatus::Failed(component_id)) .await { diff --git a/src/lib.rs b/src/lib.rs index c8f1c1e..07da519 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,4 +15,6 @@ mod sample; pub use sample::Sample; mod logical_meter; -pub use logical_meter::{Formula, LogicalMeterConfig, LogicalMeterHandle, Metric}; +pub use logical_meter::{ + AggregationFormula, Formula, LogicalMeterConfig, LogicalMeterHandle, metric, +}; diff --git a/src/logical_meter.rs b/src/logical_meter.rs index 3fe3130..7bc7251 100644 --- a/src/logical_meter.rs +++ b/src/logical_meter.rs @@ -8,8 +8,7 @@ mod formula; mod logical_meter_actor; mod logical_meter_handle; pub use logical_meter_handle::LogicalMeterHandle; -mod metric; -pub use metric::Metric; +pub mod metric; pub use config::LogicalMeterConfig; -pub use formula::Formula; +pub use formula::{AggregationFormula, Formula}; diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index ff4e285..5429df6 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -1,129 +1,18 @@ // License: MIT // Copyright © 2025 Frequenz Energy-as-a-Service GmbH -//! A composable formula type, that can be subscribed to. +//! Formula module for the logical meter. -use tokio::sync::{broadcast, mpsc, oneshot}; +mod aggregation_formula; +mod coalesce_formula; +pub(crate) mod graph_formula_provider; +pub use aggregation_formula::AggregationFormula; +pub use coalesce_formula::CoalesceFormula; -use crate::{Error, Metric, Sample}; +use crate::{Error, Sample}; +use tokio::sync::broadcast; -use super::logical_meter_actor; - -#[derive(Clone)] -pub struct Formula { - formula: frequenz_microgrid_component_graph::Formula, - metric: Metric, - instructions_tx: mpsc::Sender, -} - -impl std::fmt::Display for Formula { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.formula.fmt(f) - } -} - -impl Formula { - pub(super) fn new( - formula: frequenz_microgrid_component_graph::Formula, - metric: Metric, - instructions_tx: mpsc::Sender, - ) -> Self { - Self { - formula, - metric, - instructions_tx, - } - } - - pub async fn subscribe(&self) -> Result, Error> { - let (tx, rx) = oneshot::channel(); - - self.instructions_tx - .send(logical_meter_actor::Instruction::SubscribeFormula { - formula: self.formula.to_string(), - metric: self.metric, - response_tx: tx, - }) - .await - .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; - let receiver = rx.await.map_err(|e| { - Error::connection_failure(format!("Could not receive instruction: {e}")) - })?; - - Ok(receiver) - } -} - -impl std::ops::Add for Formula { - type Output = Result; - - fn add(self, other: Self) -> Self::Output { - if self.metric != other.metric { - return Err(Error::invalid_metric(format!( - "Cannot add formulas with different metrics: {} and {}", - self.metric as isize, other.metric as isize - ))); - } - let new_formula = self.formula + other.formula; - Ok(Self::new(new_formula, self.metric, self.instructions_tx)) - } -} - -impl std::ops::Sub for Formula { - type Output = Result; - - fn sub(self, other: Self) -> Self::Output { - if self.metric != other.metric { - return Err(Error::invalid_metric(format!( - "Cannot subtract formulas with different metrics: {} and {}", - self.metric as isize, other.metric as isize - ))); - } - let new_formula = self.formula - other.formula; - Ok(Self::new(new_formula, self.metric, self.instructions_tx)) - } -} - -impl std::ops::Add for Result { - type Output = Result; - - fn add(self, other: Formula) -> Self::Output { - match self { - Ok(left) => left + other, - Err(e) => Err(e), - } - } -} - -impl std::ops::Sub for Result { - type Output = Result; - - fn sub(self, other: Formula) -> Self::Output { - match self { - Ok(left) => left - other, - Err(e) => Err(e), - } - } -} - -impl std::ops::Add> for Formula { - type Output = Result; - - fn add(self, other: Result) -> Self::Output { - match other { - Ok(right) => self + right, - Err(e) => Err(e), - } - } -} - -impl std::ops::Sub> for Formula { - type Output = Result; - - fn sub(self, other: Result) -> Self::Output { - match other { - Ok(right) => self - right, - Err(e) => Err(e), - } - } +/// Defines a formula that can be subscribed to for receiving samples. +pub trait Formula: std::fmt::Display { + fn subscribe(&self) -> impl Future, Error>> + Send; } diff --git a/src/logical_meter/formula/aggregation_formula.rs b/src/logical_meter/formula/aggregation_formula.rs new file mode 100644 index 0000000..f343d5e --- /dev/null +++ b/src/logical_meter/formula/aggregation_formula.rs @@ -0,0 +1,131 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! An formula that supports aggregation operations. + +use super::Formula; +use crate::{ + Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric, +}; +use tokio::sync::{broadcast, mpsc, oneshot}; + +#[derive(Clone)] +pub struct AggregationFormula { + formula: frequenz_microgrid_component_graph::AggregationFormula, + metric: Metric, + instructions_tx: mpsc::Sender, +} + +impl std::fmt::Display for AggregationFormula { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.formula.fmt(f) + } +} + +impl AggregationFormula { + pub(crate) fn new( + formula: frequenz_microgrid_component_graph::AggregationFormula, + metric: Metric, + instructions_tx: mpsc::Sender, + ) -> Self { + Self { + formula, + metric, + instructions_tx, + } + } +} + +impl Formula for AggregationFormula { + async fn subscribe(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + + self.instructions_tx + .send(logical_meter_actor::Instruction::SubscribeFormula { + formula: self.formula.to_string(), + metric: self.metric, + response_tx: tx, + }) + .await + .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; + let receiver = rx.await.map_err(|e| { + Error::connection_failure(format!("Could not receive instruction: {e}")) + })?; + + Ok(receiver) + } +} + +impl std::ops::Add for AggregationFormula { + type Output = Result; + + fn add(self, other: Self) -> Self::Output { + if self.metric != other.metric { + return Err(Error::invalid_metric(format!( + "Cannot add formulas with different metrics: {} and {}", + self.metric as isize, other.metric as isize + ))); + } + let new_formula = self.formula + other.formula; + Ok(Self::new(new_formula, self.metric, self.instructions_tx)) + } +} + +impl std::ops::Sub for AggregationFormula { + type Output = Result; + + fn sub(self, other: Self) -> Self::Output { + if self.metric != other.metric { + return Err(Error::invalid_metric(format!( + "Cannot subtract formulas with different metrics: {} and {}", + self.metric as isize, other.metric as isize + ))); + } + let new_formula = self.formula - other.formula; + Ok(Self::new(new_formula, self.metric, self.instructions_tx)) + } +} + +impl std::ops::Add for Result { + type Output = Result; + + fn add(self, other: AggregationFormula) -> Self::Output { + match self { + Ok(left) => left + other, + Err(e) => Err(e), + } + } +} + +impl std::ops::Sub for Result { + type Output = Result; + + fn sub(self, other: AggregationFormula) -> Self::Output { + match self { + Ok(left) => left - other, + Err(e) => Err(e), + } + } +} + +impl std::ops::Add> for AggregationFormula { + type Output = Result; + + fn add(self, other: Result) -> Self::Output { + match other { + Ok(right) => self + right, + Err(e) => Err(e), + } + } +} + +impl std::ops::Sub> for AggregationFormula { + type Output = Result; + + fn sub(self, other: Result) -> Self::Output { + match other { + Ok(right) => self - right, + Err(e) => Err(e), + } + } +} diff --git a/src/logical_meter/formula/coalesce_formula.rs b/src/logical_meter/formula/coalesce_formula.rs new file mode 100644 index 0000000..877c741 --- /dev/null +++ b/src/logical_meter/formula/coalesce_formula.rs @@ -0,0 +1,57 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! An coalesce formula. + +use super::Formula; +use crate::{ + Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric, +}; +use tokio::sync::{broadcast, mpsc, oneshot}; + +#[derive(Clone)] +pub struct CoalesceFormula { + formula: frequenz_microgrid_component_graph::CoalesceFormula, + metric: Metric, + instructions_tx: mpsc::Sender, +} + +impl std::fmt::Display for CoalesceFormula { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.formula.fmt(f) + } +} + +impl CoalesceFormula { + pub(crate) fn new( + formula: frequenz_microgrid_component_graph::CoalesceFormula, + metric: Metric, + instructions_tx: mpsc::Sender, + ) -> Self { + Self { + formula, + metric, + instructions_tx, + } + } +} + +impl Formula for CoalesceFormula { + async fn subscribe(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + + self.instructions_tx + .send(logical_meter_actor::Instruction::SubscribeFormula { + formula: self.formula.to_string(), + metric: self.metric, + response_tx: tx, + }) + .await + .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; + let receiver = rx.await.map_err(|e| { + Error::connection_failure(format!("Could not receive instruction: {e}")) + })?; + + Ok(receiver) + } +} diff --git a/src/logical_meter/formula/graph_formula_provider.rs b/src/logical_meter/formula/graph_formula_provider.rs new file mode 100644 index 0000000..ba3d9bf --- /dev/null +++ b/src/logical_meter/formula/graph_formula_provider.rs @@ -0,0 +1,92 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! A composable formula type, that can be subscribed to. + +use crate::Error; +use crate::logical_meter::logical_meter_actor; +use crate::proto::common::v1::microgrid::components::Component; +use crate::proto::common::v1::microgrid::components::ComponentConnection; +use frequenz_microgrid_component_graph::ComponentGraph; +use std::collections::BTreeSet; +use tokio::sync::mpsc; + +use super::{AggregationFormula, CoalesceFormula}; + +macro_rules! graph_formula_provider { + ($(($fnname:ident $(, $idsparam:ident)?)),+ $(,)?) => {$( + + fn $fnname( + _graph: &ComponentGraph, + _metric: M, + _instructions_tx: mpsc::Sender, + $($idsparam: Option>,)? + ) -> Result { + return Err(Error::component_graph_error( + format!( + "The component graph does not support {} formula generation for {}.", + stringify!($fnname), + _metric.to_string() + ) + )); + } + + )+}; +} + +/// Provides methods for generating corresponding formulas from the component +/// graph. +/// +/// The component graph exposes methods to retrieve `AggregationFormula`s and +/// `CoalesceFormula`s for each of these metrics. This trait provides a +/// way to generalize them. +pub trait GraphFormulaProvider: Sized { + graph_formula_provider!( + (grid), + (consumer), + (producer), + (battery, _battery_ids), + (chp, _chp_ids), + (pv, _pv_inverter_ids), + (ev_charger, _ev_charger_ids), + ); +} + +macro_rules! impl_graph_formula_provider { + ($(($fnname:ident, $graphfnname:ident$(, $idsparam:ident)?)),+ $(,)?) => {$( + + fn $fnname( + graph: &ComponentGraph, + _metric: M, + instructions_tx: mpsc::Sender, + $($idsparam: Option>,)? + ) -> Result { + let formula = graph.$graphfnname($($idsparam)?).map_err(|e| { + Error::component_graph_error( + format!("Could not get {} formula: {e}", stringify!($fnname)) + ) + })?; + Ok(Self::new(formula, M::METRIC, instructions_tx)) + } + )+}; +} + +impl GraphFormulaProvider for AggregationFormula { + impl_graph_formula_provider!( + (grid, grid_formula), + (consumer, consumer_formula), + (producer, producer_formula), + (battery, battery_formula, battery_ids), + (chp, chp_formula, chp_ids), + (pv, pv_formula, pv_inverter_ids), + (ev_charger, ev_charger_formula, ev_charger_ids), + ); +} + +impl GraphFormulaProvider for CoalesceFormula { + impl_graph_formula_provider!( + (grid, grid_coalesce_formula), + (battery, battery_ac_coalesce_formula, battery_ids), + (pv, pv_ac_coalesce_formula, pv_inverter_ids), + ); +} diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index c6c0023..2a1424e 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -12,10 +12,10 @@ use std::collections::{HashMap, HashSet}; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{MissedTickBehavior, interval}; +use crate::proto::common::v1::metrics::Metric; use crate::proto::common::v1::metrics::metric_value_variant::MetricValueVariant; use crate::{ - Error, Metric, MicrogridClientHandle, Sample, - proto::common::v1::microgrid::components::ComponentData, + Error, MicrogridClientHandle, Sample, proto::common::v1::microgrid::components::ComponentData, }; use super::config::LogicalMeterConfig; @@ -32,7 +32,7 @@ struct ComponentDataResampler { receiver: broadcast::Receiver, } -pub(super) enum Instruction { +pub(crate) enum Instruction { SubscribeFormula { formula: String, metric: Metric, @@ -95,8 +95,8 @@ impl LogicalMeterActor { } pub async fn run(mut self) { - let mut resamplers: HashMap = HashMap::new(); - let mut formulas: HashMap = HashMap::new(); + let mut resamplers: HashMap<(u64, Metric), ComponentDataResampler> = HashMap::new(); + let mut formulas: HashMap<(String, Metric), LogicalMeterFormula> = HashMap::new(); loop { tokio::select! { @@ -109,7 +109,7 @@ impl LogicalMeterActor { match instruction { Some(Instruction::SubscribeFormula{formula, metric, response_tx}) => { if let Err(err) = self.handle_subscribe_formula( - &formula, + formula, metric, response_tx, &mut formulas, @@ -145,25 +145,27 @@ impl LogicalMeterActor { /// in the formula, if it does not already exist. async fn handle_subscribe_formula( &mut self, - formula: &str, + formula: String, metric: Metric, receiver_tx: oneshot::Sender>, - formulas: &mut HashMap, - resamplers: &mut HashMap, + formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>, + resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>, ) -> Result<(), Error> { - if formulas.contains_key(formula) { + let formula_key = (formula, metric); + if formulas.contains_key(&formula_key) { receiver_tx - .send(formulas[formula].sender.subscribe()) + .send(formulas[&formula_key].sender.subscribe()) .map_err(|_| Error::internal("Failed to send receiver for formula".to_string()))?; return Ok(()); } - let formula_engine = FormulaEngine::try_new(formula) + let formula_engine = FormulaEngine::try_new(&formula_key.0) .map_err(|e| Error::formula_engine_error(format!("Failed to parse formula: {e}")))?; let (sender, receiver) = broadcast::channel(8); for component_id in formula_engine.components() { - if resamplers.contains_key(component_id) { + let resampler_key = (*component_id, metric); + if resamplers.contains_key(&resampler_key) { continue; } let resampler = ComponentDataResampler { @@ -180,11 +182,11 @@ impl LogicalMeterActor { ), receiver: self.client.get_component_data_stream(*component_id).await?, }; - resamplers.insert(*component_id, resampler); + resamplers.insert(resampler_key, resampler); } formulas.insert( - formula.to_string(), + formula_key, LogicalMeterFormula { formula: formula_engine, sender, @@ -200,10 +202,11 @@ impl LogicalMeterActor { /// Resamples component data and evaluates formulas for the next timestamp. fn do_next( &mut self, - resamplers: &mut HashMap, - formulas: &mut HashMap, + resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>, + formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>, ) -> Result<(), Error> { - let mut comp_data = HashMap::new(); + let mut resampled_metrics: HashMap>> = HashMap::new(); + for (_, resampler) in resamplers.iter_mut() { while let Ok(data) = resampler.receiver.try_recv() { self.push_to_resampler(resampler, data, resampler.metric); @@ -215,37 +218,55 @@ impl LogicalMeterActor { resampled.len() ))); } - comp_data.insert(resampler.component_id, resampled[0].clone().value()); + resampled_metrics + .entry(resampler.metric) + .or_default() + .insert(resampler.component_id, resampled[0].clone().value()); } let mut formulas_to_drop = vec![]; - for (formula_str, formula) in formulas.iter_mut() { - let result = formula.formula.calculate(&comp_data).map_err(|e| { - Error::formula_engine_error(format!("Failed to evaluate formula: {e}")) - })?; + for (formula_key, formula) in formulas.iter_mut() { + let result = formula + .formula + .calculate(resampled_metrics.entry(formula_key.1).or_default()) + .map_err(|e| { + Error::formula_engine_error(format!("Failed to evaluate formula: {e}")) + })?; if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) { - tracing::debug!("No remaining subscribers for formula: {formula_str}. Err: {e}"); - formulas_to_drop.push(formula_str.to_string()); + tracing::debug!( + "No remaining subscribers for formula: {}:({}). Err: {e}", + formula_key.1.as_str_name(), + formula_key.0 + ); + formulas_to_drop.push(formula_key.clone()); } } - for formula_str in &formulas_to_drop { - if let Some(formula) = formulas.remove(formula_str) { - tracing::debug!("Dropping formula: {}", formula_str); + for formula_key in &formulas_to_drop { + if let Some(formula) = formulas.remove(formula_key) { + tracing::debug!( + "Dropping formula: {}:({})", + formula_key.1.as_str_name(), + formula_key.0 + ); drop(formula); } } if !formulas_to_drop.is_empty() { - let mut components = HashSet::::new(); - for (_, formula) in formulas.iter() { - components.extend(formula.formula.components()); + let mut components = HashSet::<(u64, Metric)>::new(); + for ((_, metric), formula) in formulas.iter() { + components.extend(formula.formula.components().iter().map(|&id| (id, *metric))); } resamplers.retain(|component_id, _| { if components.contains(component_id) { true } else { - tracing::debug!("Dropping resampler for component {}", component_id); + tracing::debug!( + "Dropping resampler for component {}:{}", + component_id.0, + component_id.1.as_str_name() + ); false } }); @@ -268,6 +289,11 @@ impl LogicalMeterActor { .iter() .find(|s| s.metric == metric as i32) else { + tracing::warn!( + "No data for metric {:?} in component {}", + metric, + resampler.component_id + ); return; }; let timestamp = if let Some(timestamp) = dd.sampled_at { diff --git a/src/logical_meter/logical_meter_handle.rs b/src/logical_meter/logical_meter_handle.rs index 5e76235..16208a8 100644 --- a/src/logical_meter/logical_meter_handle.rs +++ b/src/logical_meter/logical_meter_handle.rs @@ -1,6 +1,8 @@ // License: MIT // Copyright © 2025 Frequenz Energy-as-a-Service GmbH +use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider; +use crate::proto::common::v1::metrics::Metric; use crate::{ client::MicrogridClientHandle, error::Error, @@ -10,7 +12,7 @@ use frequenz_microgrid_component_graph::{self, ComponentGraph}; use std::collections::BTreeSet; use tokio::sync::mpsc; -use super::{Formula, LogicalMeterConfig, Metric, logical_meter_actor::LogicalMeterActor}; +use super::{AggregationFormula, LogicalMeterConfig, logical_meter_actor::LogicalMeterActor}; /// This provides an interface stream high-level metrics from a microgrid. #[derive(Clone)] @@ -54,77 +56,62 @@ impl LogicalMeterHandle { /// Returns a receiver that streams samples for the given `metric` at the grid /// connection point. - pub fn grid(&mut self, metric: Metric) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The grid formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.grid_formula().map_err(|e| { - Error::component_graph_error(format!("Could not derive grid formula: {e}")) - })?; - - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + pub fn grid( + &mut self, + metric: M, + ) -> Result { + M::FormulaType::grid(&self.graph, metric, self.instructions_tx.clone()) } /// Returns a receiver that streams samples for the given `metric` for the /// given battery IDs. /// /// When `component_ids` is `None`, all batteries in the microgrid are used. - pub fn battery( + pub fn battery( &mut self, component_ids: Option>, - metric: Metric, - ) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The battery formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.battery_formula(component_ids).map_err(|e| { - Error::component_graph_error(format!("Could not derive battery formula: {e}")) - })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + metric: M, + ) -> Result { + M::FormulaType::battery( + &self.graph, + metric, + self.instructions_tx.clone(), + component_ids, + ) } /// Returns a receiver that streams samples for the given `metric` for the /// given CHP IDs. /// /// When `component_ids` is `None`, all CHPs in the microgrid are used. - pub fn chp( + pub fn chp( &mut self, component_ids: Option>, - metric: Metric, - ) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The CHP formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.chp_formula(component_ids).map_err(|e| { - Error::component_graph_error(format!("Could not derive CHP formula: {e}")) - })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + metric: M, + ) -> Result { + M::FormulaType::chp( + &self.graph, + metric, + self.instructions_tx.clone(), + component_ids, + ) } /// Returns a receiver that streams samples for the given `metric` for the /// given PV IDs. /// /// When `component_ids` is `None`, all PVs in the microgrid are used. - pub fn pv( + pub fn pv( &mut self, component_ids: Option>, - metric: Metric, - ) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The PV formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.pv_formula(component_ids).map_err(|e| { - Error::component_graph_error(format!("Could not derive PV formula: {e}")) - })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + metric: M, + ) -> Result { + M::FormulaType::pv( + &self.graph, + metric, + self.instructions_tx.clone(), + component_ids, + ) } /// Returns a receiver that streams samples for the given `metric` for the @@ -132,58 +119,49 @@ impl LogicalMeterHandle { /// /// When `component_ids` is `None`, all EV chargers in the microgrid are /// used. - pub fn ev_charger( + pub fn ev_charger( &mut self, component_ids: Option>, - metric: Metric, - ) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The EV charger formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.ev_charger_formula(component_ids).map_err(|e| { - Error::component_graph_error(format!("Could not derive EV charger formula: {e}")) - })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + metric: M, + ) -> Result { + M::FormulaType::ev_charger( + &self.graph, + metric, + self.instructions_tx.clone(), + component_ids, + ) } /// Returns a receiver that streams samples for the given `metric` for all /// the consumers in the microgrid. - pub fn consumer(&mut self, metric: Metric) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The consumer formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.consumer_formula().map_err(|e| { - Error::component_graph_error(format!("Could not derive consumer formula: {e}")) - })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + pub fn consumer( + &mut self, + metric: M, + ) -> Result { + M::FormulaType::consumer(&self.graph, metric, self.instructions_tx.clone()) } /// Returns a receiver that streams samples for the given `metric` for all /// producers in the microgrid. - pub fn producer(&mut self, metric: Metric) -> Result { - if !metric.power() && !metric.current() { - return Err(Error::invalid_metric(format!( - "The producer formula only supports power or current metrics, but got: {metric:?}" - ))); - } - let formula = self.graph.producer_formula().map_err(|e| { - Error::component_graph_error(format!("Could not derive producer formula: {e}")) - })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + pub fn producer( + &mut self, + metric: M, + ) -> Result { + M::FormulaType::producer(&self.graph, metric, self.instructions_tx.clone()) } pub fn coalesce( &mut self, component_ids: BTreeSet, metric: Metric, - ) -> Result { + ) -> Result { let formula = self.graph.coalesce(component_ids).map_err(|e| { Error::component_graph_error(format!("Could not derive coalesce formula: {e}")) })?; - Ok(Formula::new(formula, metric, self.instructions_tx.clone())) + Ok(AggregationFormula::new( + formula, + metric, + self.instructions_tx.clone(), + )) } } diff --git a/src/logical_meter/metric.rs b/src/logical_meter/metric.rs index 8978091..cc65f63 100644 --- a/src/logical_meter/metric.rs +++ b/src/logical_meter/metric.rs @@ -3,42 +3,52 @@ //! Metrics supported by the logical meter. +pub(crate) mod metric_trait; + use crate::proto::common::v1::metrics::Metric as MetricPb; -/// Metrics supported by the logical meter. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Metric { - // AC Power - AcActivePower = MetricPb::AcActivePower as isize, - AcReactivePower = MetricPb::AcReactivePower as isize, - // AC Current - AcCurrent = MetricPb::AcCurrent as isize, - AcCurrentPhase1 = MetricPb::AcCurrentPhase1 as isize, - AcCurrentPhase2 = MetricPb::AcCurrentPhase2 as isize, - AcCurrentPhase3 = MetricPb::AcCurrentPhase3 as isize, - // AC Voltage - AcVoltage = MetricPb::AcVoltage as isize, - AcVoltagePhase1N = MetricPb::AcVoltagePhase1N as isize, - AcVoltagePhase2N = MetricPb::AcVoltagePhase2N as isize, - AcVoltagePhase3N = MetricPb::AcVoltagePhase3N as isize, - AcVoltagePhase1Phase2 = MetricPb::AcVoltagePhase1Phase2 as isize, - AcVoltagePhase2Phase3 = MetricPb::AcVoltagePhase2Phase3 as isize, - AcVoltagePhase3Phase1 = MetricPb::AcVoltagePhase3Phase1 as isize, - // AC Frequency - AcFrequency = MetricPb::AcFrequency as isize, +use super::formula; +use metric_trait::AcMetric; + +macro_rules! define_metric { + ($({name: $metric_name:ident, formula: $formula:ident}),+ $(,)?) => { + $( + // Define a metric + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub struct $metric_name; + + // Implement the AcMetric trait for the metric + impl AcMetric for $metric_name { + type FormulaType = formula::$formula; + + const METRIC: MetricPb = MetricPb::$metric_name; + } + + impl std::fmt::Display for $metric_name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", stringify!($metric_name)) + } + } + + )+ + }; } -impl Metric { - pub(super) fn power(self) -> bool { - matches!(self, Metric::AcActivePower | Metric::AcReactivePower) - } - pub(super) fn current(self) -> bool { - matches!( - self, - Metric::AcCurrent - | Metric::AcCurrentPhase1 - | Metric::AcCurrentPhase2 - | Metric::AcCurrentPhase3 - ) - } +define_metric! { + {name: AcActivePower, formula: AggregationFormula}, + {name: AcReactivePower, formula: AggregationFormula}, + {name: AcCurrent, formula: AggregationFormula}, + {name: AcCurrentPhase1, formula: AggregationFormula}, + {name: AcCurrentPhase2, formula: AggregationFormula}, + {name: AcCurrentPhase3, formula: AggregationFormula}, + + {name: AcVoltage, formula: CoalesceFormula}, + {name: AcVoltagePhase1N, formula: CoalesceFormula}, + {name: AcVoltagePhase2N, formula: CoalesceFormula}, + {name: AcVoltagePhase3N, formula: CoalesceFormula}, + {name: AcVoltagePhase1Phase2, formula: CoalesceFormula}, + {name: AcVoltagePhase2Phase3, formula: CoalesceFormula}, + {name: AcVoltagePhase3Phase1, formula: CoalesceFormula}, + + {name: AcFrequency, formula: CoalesceFormula}, } diff --git a/src/logical_meter/metric/metric_trait.rs b/src/logical_meter/metric/metric_trait.rs new file mode 100644 index 0000000..cdc5371 --- /dev/null +++ b/src/logical_meter/metric/metric_trait.rs @@ -0,0 +1,14 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! A trait specifying the output formula type and the corresponding PB metric, +//! for all metrics supported by the logical meter. + +use super::formula; +use crate::proto::common::v1::metrics::Metric as MetricPb; + +pub trait AcMetric: std::fmt::Display { + type FormulaType: formula::Formula + formula::graph_formula_provider::GraphFormulaProvider; + + const METRIC: MetricPb; +}