diff --git a/src/error.rs b/src/error.rs index 2efc228..7e78681 100644 --- a/src/error.rs +++ b/src/error.rs @@ -43,6 +43,11 @@ macro_rules! ErrorKind { } } )* + + /// Returns the kind of error that occurred. + pub fn kind(&self) -> ErrorKind { + self.kind.clone() + } } }; } @@ -51,8 +56,8 @@ ErrorKind!( (ComponentGraphError, component_graph_error), (ConnectionFailure, connection_failure), (ChronoError, chrono_error), + (DroppedUnusedFormulas, dropped_unused_formulas), (FormulaEngineError, formula_engine_error), - (InvalidMetric, invalid_metric), (Internal, internal) ); diff --git a/src/lib.rs b/src/lib.rs index 07da519..77dfae6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,8 @@ pub use client::MicrogridClientHandle; mod error; pub use error::{Error, ErrorKind}; +mod quantity; + mod proto; mod sample; diff --git a/src/logical_meter/formula.rs b/src/logical_meter/formula.rs index e4110fb..9e827df 100644 --- a/src/logical_meter/formula.rs +++ b/src/logical_meter/formula.rs @@ -10,7 +10,7 @@ pub(crate) mod graph_formula_provider; pub use aggregation_formula::AggregationFormula; pub use coalesce_formula::CoalesceFormula; -use crate::{Error, Sample, proto::common::v1alpha8::metrics::Metric}; +use crate::{Error, Sample, metric::Metric, quantity::Quantity}; use tokio::sync::{broadcast, mpsc}; use super::logical_meter_actor; @@ -22,20 +22,29 @@ pub(crate) trait GraphFormulaProvider: std::fmt::Display { /// Defines a formula that can be subscribed to for receiving samples. pub(crate) trait FormulaSubscriber: std::fmt::Display { - fn subscribe(&self) -> impl Future, Error>> + Send; + type MetricType: Metric; + + fn subscribe( + &self, + ) -> impl Future< + Output = Result< + broadcast::Receiver::QuantityType>>, + Error, + >, + > + Send; } /// Parameters for creating a logical meter formula. -pub(super) struct FormulaParams { +pub(super) struct FormulaParams { pub(super) formula: F::GraphFormulaType, - pub(super) metric: Metric, + pub(super) metric: M, pub(super) instructions_tx: mpsc::Sender, } -impl FormulaParams { +impl FormulaParams { pub(super) fn new( formula: F::GraphFormulaType, - metric: Metric, + metric: M, instructions_tx: mpsc::Sender, ) -> Self { Self { @@ -47,67 +56,53 @@ impl FormulaParams { } /// A trait that defines generic formula operations. -pub trait Formula: std::fmt::Display + Sized { +pub trait Formula: std::fmt::Display + Sized { fn coalesce(self, other: Self) -> Result; fn min(self, other: Self) -> Result; fn max(self, other: Self) -> Result; - fn subscribe(&self) -> impl Future, Error>> + Send; + fn subscribe( + &self, + ) -> impl Future>, Error>> + Send; } -impl Formula for T +impl Formula for T where - T: FormulaSubscriber + T: FormulaSubscriber + GraphFormulaProvider - + From> - + Into> + + From> + + Into> + std::fmt::Display, + Q: Quantity, + M: Metric, { fn coalesce(self, other: Self) -> Result { - let mut params_self: FormulaParams = self.into(); - let params_other: FormulaParams = other.into(); - - if params_self.metric != params_other.metric { - return Err(Error::invalid_metric(format!( - "Cannot coalesce formulas with different metrics: {} and {}", - params_self.metric.as_str_name(), - params_other.metric.as_str_name() - ))); - } + let mut params_self: FormulaParams = self.into(); + let params_other: FormulaParams = other.into(); + params_self.formula = params_self.formula.coalesce(params_other.formula); Ok(params_self.into()) } fn min(self, other: Self) -> Result { - let mut params_self: FormulaParams = self.into(); - let params_other: FormulaParams = other.into(); - - if params_self.metric != params_other.metric { - return Err(Error::invalid_metric(format!( - "Cannot take min of formulas with different metrics: {} and {}", - params_self.metric.as_str_name(), - params_other.metric.as_str_name() - ))); - } + let mut params_self: FormulaParams = self.into(); + let params_other: FormulaParams = other.into(); + params_self.formula = params_self.formula.min(params_other.formula); Ok(params_self.into()) } fn max(self, other: Self) -> Result { - let mut params_self: FormulaParams = self.into(); - let params_other: FormulaParams = other.into(); - - if params_self.metric != params_other.metric { - return Err(Error::invalid_metric(format!( - "Cannot take max of formulas with different metrics: {} and {}", - params_self.metric.as_str_name(), - params_other.metric.as_str_name() - ))); - } + let mut params_self: FormulaParams = self.into(); + let params_other: FormulaParams = other.into(); + params_self.formula = params_self.formula.max(params_other.formula); Ok(params_self.into()) } - fn subscribe(&self) -> impl Future, Error>> + Send { + fn subscribe( + &self, + ) -> impl Future>, Error>> + Send + { ::subscribe(self) } } diff --git a/src/logical_meter/formula/aggregation_formula.rs b/src/logical_meter/formula/aggregation_formula.rs index 8d145b8..dd7d7c6 100644 --- a/src/logical_meter/formula/aggregation_formula.rs +++ b/src/logical_meter/formula/aggregation_formula.rs @@ -5,36 +5,40 @@ use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider}; use crate::{ - Error, Sample, logical_meter::logical_meter_actor, proto::common::v1alpha8::metrics::Metric, + Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity, }; use tokio::sync::{broadcast, mpsc, oneshot}; #[derive(Clone)] -pub struct AggregationFormula { +pub struct AggregationFormula { formula: frequenz_microgrid_component_graph::AggregationFormula, - metric: Metric, + metric: M, instructions_tx: mpsc::Sender, } -impl std::fmt::Display for AggregationFormula { +impl std::fmt::Display for AggregationFormula { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.formula.fmt(f) } } -impl GraphFormulaProvider for AggregationFormula { +impl GraphFormulaProvider for AggregationFormula { type GraphFormulaType = frequenz_microgrid_component_graph::AggregationFormula; } -impl FormulaSubscriber for AggregationFormula { - async fn subscribe(&self) -> Result, Error> { +impl + Sync> FormulaSubscriber + for AggregationFormula +{ + type MetricType = M; + + async fn subscribe(&self) -> Result>, Error> { let (tx, rx) = oneshot::channel(); self.instructions_tx .send(logical_meter_actor::Instruction::SubscribeFormula { formula: self.formula.to_string(), - metric: self.metric, - response_tx: tx, + metric: M::METRIC, + response_tx: tx.try_into()?, }) .await .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; @@ -46,8 +50,8 @@ impl FormulaSubscriber for AggregationFormula { } } -impl From> for AggregationFormula { - fn from(params: FormulaParams) -> Self { +impl From, M>> for AggregationFormula { + fn from(params: FormulaParams, M>) -> Self { Self { formula: params.formula, metric: params.metric, @@ -56,8 +60,8 @@ impl From> for AggregationFormula { } } -impl From for FormulaParams { - fn from(formula: AggregationFormula) -> Self { +impl From> for FormulaParams, M> { + fn from(formula: AggregationFormula) -> Self { FormulaParams { formula: formula.formula, metric: formula.metric, @@ -66,40 +70,28 @@ impl From for FormulaParams { } } -impl std::ops::Add for AggregationFormula { +impl std::ops::Add for AggregationFormula { type Output = Result; fn add(self, other: Self) -> Self::Output { - if self.metric != other.metric { - return Err(Error::invalid_metric(format!( - "Cannot add formulas with different metrics: {} and {}", - self.metric as isize, other.metric as isize - ))); - } let new_formula = self.formula + other.formula; Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into()) } } -impl std::ops::Sub for AggregationFormula { +impl std::ops::Sub for AggregationFormula { type Output = Result; fn sub(self, other: Self) -> Self::Output { - if self.metric != other.metric { - return Err(Error::invalid_metric(format!( - "Cannot subtract formulas with different metrics: {} and {}", - self.metric as isize, other.metric as isize - ))); - } let new_formula = self.formula - other.formula; Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into()) } } -impl std::ops::Add for Result { - type Output = Result; +impl std::ops::Add> for Result, Error> { + type Output = Result, Error>; - fn add(self, other: AggregationFormula) -> Self::Output { + fn add(self, other: AggregationFormula) -> Self::Output { match self { Ok(left) => left + other, Err(e) => Err(e), @@ -107,10 +99,10 @@ impl std::ops::Add for Result { } } -impl std::ops::Sub for Result { - type Output = Result; +impl std::ops::Sub> for Result, Error> { + type Output = Result, Error>; - fn sub(self, other: AggregationFormula) -> Self::Output { + fn sub(self, other: AggregationFormula) -> Self::Output { match self { Ok(left) => left - other, Err(e) => Err(e), @@ -118,10 +110,10 @@ impl std::ops::Sub for Result { } } -impl std::ops::Add> for AggregationFormula { - type Output = Result; +impl std::ops::Add, Error>> for AggregationFormula { + type Output = Result, Error>; - fn add(self, other: Result) -> Self::Output { + fn add(self, other: Result, Error>) -> Self::Output { match other { Ok(right) => self + right, Err(e) => Err(e), @@ -129,10 +121,10 @@ impl std::ops::Add> for AggregationFormula { } } -impl std::ops::Sub> for AggregationFormula { - type Output = Result; +impl std::ops::Sub, Error>> for AggregationFormula { + type Output = Result, Error>; - fn sub(self, other: Result) -> Self::Output { + fn sub(self, other: Result, Error>) -> Self::Output { match other { Ok(right) => self - right, Err(e) => Err(e), diff --git a/src/logical_meter/formula/coalesce_formula.rs b/src/logical_meter/formula/coalesce_formula.rs index c52f1dc..59492ca 100644 --- a/src/logical_meter/formula/coalesce_formula.rs +++ b/src/logical_meter/formula/coalesce_formula.rs @@ -5,36 +5,40 @@ use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider}; use crate::{ - Error, Sample, logical_meter::logical_meter_actor, proto::common::v1alpha8::metrics::Metric, + Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity, }; use tokio::sync::{broadcast, mpsc, oneshot}; #[derive(Clone)] -pub struct CoalesceFormula { +pub struct CoalesceFormula { formula: frequenz_microgrid_component_graph::CoalesceFormula, - metric: Metric, + metric: M, instructions_tx: mpsc::Sender, } -impl std::fmt::Display for CoalesceFormula { +impl std::fmt::Display for CoalesceFormula { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.formula.fmt(f) } } -impl GraphFormulaProvider for CoalesceFormula { +impl GraphFormulaProvider for CoalesceFormula { type GraphFormulaType = frequenz_microgrid_component_graph::CoalesceFormula; } -impl FormulaSubscriber for CoalesceFormula { - async fn subscribe(&self) -> Result, Error> { +impl + Sync> FormulaSubscriber + for CoalesceFormula +{ + type MetricType = M; + + async fn subscribe(&self) -> Result>, Error> { let (tx, rx) = oneshot::channel(); self.instructions_tx .send(logical_meter_actor::Instruction::SubscribeFormula { formula: self.formula.to_string(), - metric: self.metric, - response_tx: tx, + metric: M::METRIC, + response_tx: tx.try_into()?, }) .await .map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?; @@ -46,8 +50,8 @@ impl FormulaSubscriber for CoalesceFormula { } } -impl From> for CoalesceFormula { - fn from(params: FormulaParams) -> Self { +impl From, M>> for CoalesceFormula { + fn from(params: FormulaParams, M>) -> Self { Self { formula: params.formula, metric: params.metric, @@ -56,8 +60,8 @@ impl From> for CoalesceFormula { } } -impl From for FormulaParams { - fn from(formula: CoalesceFormula) -> Self { +impl From> for FormulaParams, M> { + fn from(formula: CoalesceFormula) -> Self { FormulaParams { formula: formula.formula, metric: formula.metric, diff --git a/src/logical_meter/formula/graph_formula_provider.rs b/src/logical_meter/formula/graph_formula_provider.rs index 0d816a7..6828406 100644 --- a/src/logical_meter/formula/graph_formula_provider.rs +++ b/src/logical_meter/formula/graph_formula_provider.rs @@ -6,6 +6,7 @@ use crate::Error; use crate::logical_meter::formula::FormulaParams; use crate::logical_meter::logical_meter_actor; +use crate::metric::Metric; use crate::proto::common::v1alpha8::microgrid::electrical_components::{ ElectricalComponent, ElectricalComponentConnection, }; @@ -19,9 +20,9 @@ use super::{AggregationFormula, CoalesceFormula}; macro_rules! graph_formula_provider { ($(($fnname:ident $(, ids:$idsparam:ident)? $(, id:$idparam:ident)?)),+ $(,)?) => {$( - fn $fnname( + fn $fnname( _graph: &ComponentGraph, - _metric: M, + _metric: Self::MetricType, _instructions_tx: mpsc::Sender, $($idsparam: Option>,)? $($idparam: u64,)? @@ -45,6 +46,8 @@ macro_rules! graph_formula_provider { /// `CoalesceFormula`s for each of these metrics. This trait provides a /// way to generalize them. pub trait GraphFormulaProvider: Sized { + type MetricType: Metric; + graph_formula_provider!( (grid), (consumer), @@ -65,9 +68,9 @@ macro_rules! impl_graph_formula_provider { $(, id:$idparam:ident)? )),+ $(,)?) => {$( - fn $fnname( + fn $fnname( graph: &ComponentGraph, - _metric: M, + _metric: Self::MetricType, instructions_tx: mpsc::Sender, $($idsparam: Option>,)? $($idparam: u64,)? @@ -77,13 +80,15 @@ macro_rules! impl_graph_formula_provider { format!("Could not get {} formula: {e}", stringify!($fnname)) ) })?; - Ok(FormulaParams::new(formula, M::METRIC, instructions_tx).into()) + Ok(FormulaParams::new(formula, _metric, instructions_tx).into()) } )+}; } -impl GraphFormulaProvider for AggregationFormula { +impl GraphFormulaProvider for AggregationFormula { + type MetricType = M; + impl_graph_formula_provider!( (grid, grid_formula), (consumer, consumer_formula), @@ -96,7 +101,9 @@ impl GraphFormulaProvider for AggregationFormula { ); } -impl GraphFormulaProvider for CoalesceFormula { +impl GraphFormulaProvider for CoalesceFormula { + type MetricType = M; + impl_graph_formula_provider!( (grid, grid_coalesce_formula), (battery, battery_ac_coalesce_formula, ids: battery_ids), diff --git a/src/logical_meter/logical_meter_actor.rs b/src/logical_meter/logical_meter_actor.rs index 60bcdd8..496aa36 100644 --- a/src/logical_meter/logical_meter_actor.rs +++ b/src/logical_meter/logical_meter_actor.rs @@ -12,7 +12,9 @@ use std::collections::{HashMap, HashSet}; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{MissedTickBehavior, interval}; +use crate::ErrorKind; use crate::proto::common::v1alpha8::metrics::{Metric, metric_value_variant::MetricValueVariant}; +use crate::quantity::{Current, Power, Quantity, ReactivePower, Voltage}; use crate::{ Error, MicrogridClientHandle, Sample, proto::common::v1alpha8::microgrid::electrical_components::ElectricalComponentTelemetry, @@ -20,23 +22,71 @@ use crate::{ use super::config::LogicalMeterConfig; -struct LogicalMeterFormula { +struct LogicalMeterFormula { formula: FormulaEngine, - sender: broadcast::Sender, + sender: broadcast::Sender>, } struct ComponentDataResampler { component_id: u64, metric: Metric, - resampler: frequenz_resampling::Resampler, + resampler: frequenz_resampling::Resampler>, receiver: broadcast::Receiver, } +/// Used to send strongly-typed formula streams from the LogicalMeterActor back +/// to the Handle. +pub(crate) enum TypedFormulaResponseSender { + Power(oneshot::Sender>>), + Voltage(oneshot::Sender>>), + ReactivePower(oneshot::Sender>>), + Current(oneshot::Sender>>), +} + +impl TryFrom>>> + for TypedFormulaResponseSender +{ + type Error = Error; + + fn try_from( + sender: oneshot::Sender>>, + ) -> Result { + let sender: Box = Box::new(sender); + + let sender = match sender.downcast::>>>() + { + Ok(sender) => return Ok(TypedFormulaResponseSender::Power(*sender)), + Err(sender) => sender, + }; + + let sender = + match sender.downcast::>>>() { + Ok(sender) => return Ok(TypedFormulaResponseSender::Voltage(*sender)), + Err(sender) => sender, + }; + + let sender = match sender + .downcast::>>>() + { + Ok(sender) => return Ok(TypedFormulaResponseSender::ReactivePower(*sender)), + Err(sender) => sender, + }; + + match sender.downcast::>>>() { + Ok(sender) => Ok(TypedFormulaResponseSender::Current(*sender)), + _ => Err(Error::internal(format!( + "Can't create TypedFormulaResponseSender for `{}`", + std::any::type_name::() + ))), + } + } +} + pub(crate) enum Instruction { SubscribeFormula { formula: String, metric: Metric, - response_tx: oneshot::Sender>, + response_tx: TypedFormulaResponseSender, }, } @@ -44,8 +94,154 @@ pub(super) struct LogicalMeterActor { instructions_rx: mpsc::Receiver, client: MicrogridClientHandle, config: LogicalMeterConfig, - next_ts: DateTime, - timer: tokio::time::Interval, + resampler_ts: DateTime, + resampler_timer: tokio::time::Interval, +} + +/// Holds all active formulas, grouped by quantity type. +#[derive(Default)] +struct Formulas { + power: HashMap<(String, Metric), LogicalMeterFormula>, + voltage: HashMap<(String, Metric), LogicalMeterFormula>, + reactive_power: HashMap<(String, Metric), LogicalMeterFormula>, + current: HashMap<(String, Metric), LogicalMeterFormula>, +} + +impl Formulas { + /// Checks if a formula with the given key exists. + fn contains_key(&self, key: &(String, Metric)) -> bool { + self.power.contains_key(key) + || self.voltage.contains_key(key) + || self.reactive_power.contains_key(key) + || self.current.contains_key(key) + } + + /// Sends an existing subscription receiver for the formula with the given key. + fn send_subscription( + &self, + key: &(String, Metric), + receiver_tx: TypedFormulaResponseSender, + ) -> Result<(), Error> { + match receiver_tx { + TypedFormulaResponseSender::Power(sender) => { + if self.power.contains_key(key) { + sender + .send(self.power[key].sender.subscribe()) + .map_err(|_| { + Error::internal("Failed to send receiver for formula".to_string()) + })?; + return Ok(()); + } + } + TypedFormulaResponseSender::Voltage(sender) => { + if self.voltage.contains_key(key) { + sender + .send(self.voltage[key].sender.subscribe()) + .map_err(|_| { + Error::internal("Failed to send receiver for formula".to_string()) + })?; + return Ok(()); + } + } + TypedFormulaResponseSender::ReactivePower(sender) => { + if self.reactive_power.contains_key(key) { + sender + .send(self.reactive_power[key].sender.subscribe()) + .map_err(|_| { + Error::internal("Failed to send receiver for formula".to_string()) + })?; + return Ok(()); + } + } + TypedFormulaResponseSender::Current(sender) => { + if self.current.contains_key(key) { + sender + .send(self.current[key].sender.subscribe()) + .map_err(|_| { + Error::internal("Failed to send receiver for formula".to_string()) + })?; + return Ok(()); + } + } + } + Err(Error::internal(format!( + "Formula exists, but can't find it: {}:({})", + key.1.as_str_name(), + key.0 + ))) + } + + /// Starts a new formula with the given formula string, metric, and sends a receiver + /// back to the handle. + fn start_formulas( + &mut self, + formula: String, + metric: Metric, + response_tx: TypedFormulaResponseSender, + ) -> Result, Error> { + let formula_key = (formula, metric); + + let formula_engine = FormulaEngine::try_new(&formula_key.0) + .map_err(|e| Error::formula_engine_error(format!("Failed to parse formula: {e}")))?; + let components = formula_engine.components().clone(); + + match response_tx { + TypedFormulaResponseSender::Power(receiver_tx) => { + let (sender, receiver) = broadcast::channel(100); + self.power.insert( + formula_key, + LogicalMeterFormula { + formula: formula_engine, + sender, + }, + ); + receiver_tx.send(receiver).map_err(|_| { + Error::internal("Failed to send receiver for formula".to_string()) + })?; + } + TypedFormulaResponseSender::Voltage(receiver_tx) => { + let (sender, receiver) = broadcast::channel(100); + self.voltage.insert( + formula_key, + LogicalMeterFormula { + formula: formula_engine, + sender, + }, + ); + receiver_tx.send(receiver).map_err(|_| { + Error::internal("Failed to send receiver for formula".to_string()) + })?; + } + TypedFormulaResponseSender::ReactivePower(receiver_tx) => { + let (sender, receiver) = broadcast::channel(100); + self.reactive_power.insert( + formula_key, + LogicalMeterFormula { + formula: formula_engine, + sender, + }, + ); + receiver_tx.send(receiver).map_err(|_| { + Error::internal("Failed to send receiver for formula".to_string()) + })?; + } + TypedFormulaResponseSender::Current(receiver_tx) => { + let (sender, receiver) = broadcast::channel(100); + self.current.insert( + formula_key, + LogicalMeterFormula { + formula: formula_engine, + sender, + }, + ); + receiver_tx.send(receiver).map_err(|_| { + Error::internal("Failed to send receiver for formula".to_string()) + })?; + } + } + + Ok(components) + } } /// Returns the next timestamp aligned to the epoch based on the given interval. @@ -58,8 +254,7 @@ pub(crate) fn epoch_align(timestamp: DateTime, interval: TimeDelta) -> Opti let aligned_timestamp = DateTime::from_timestamp_millis(aligned_millis_since_epoch)?; - let next_aligned_ts = aligned_timestamp + interval; - Some(next_aligned_ts) + Some(aligned_timestamp) } impl LogicalMeterActor { @@ -69,7 +264,7 @@ impl LogicalMeterActor { config: LogicalMeterConfig, ) -> Result { let now = Utc::now(); - let next_ts = epoch_align(now, config.resampling_interval).ok_or_else(|| { + let last_aligned_ts = epoch_align(now, config.resampling_interval).ok_or_else(|| { Error::chrono_error("Failed to align current time to the epoch".to_string()) })?; let mut timer = @@ -80,7 +275,7 @@ impl LogicalMeterActor { // The next tick should be at the next aligned timestamp. timer.reset_after( - (next_ts - now) + (last_aligned_ts + config.resampling_interval - now) .to_std() .map_err(|e| Error::chrono_error(format!("Failed to calculate time delta: {e}")))?, ); @@ -89,20 +284,49 @@ impl LogicalMeterActor { instructions_rx, client, config, - next_ts, - timer, + resampler_ts: last_aligned_ts, + resampler_timer: timer, }) } pub async fn run(mut self) { let mut resamplers: HashMap<(u64, Metric), ComponentDataResampler> = HashMap::new(); - let mut formulas: HashMap<(String, Metric), LogicalMeterFormula> = HashMap::new(); + let mut formulas = Formulas::default(); loop { tokio::select! { - _ = self.timer.tick() => { - if let Err(err) = self.do_next(&mut resamplers, &mut formulas) { - tracing::error!("Error resampling: {}", err); + _ = self.resampler_timer.tick() => { + self.resampler_ts += self.config.resampling_interval; + + let mut resampled = match self.resample_metrics(&mut resamplers) { + Ok(resampled) => resampled, + Err(err) => { + tracing::error!("Error resampling metrics: {}", err); + continue; + } + }; + if let Some(err) = { + self.evaluate_formulas( + &mut resampled, &mut formulas.power, Power::from_watts + ) + .err() + .or(self.evaluate_formulas( + &mut resampled, &mut formulas.voltage, Voltage::from_volts + ).err()) + .or(self.evaluate_formulas( + &mut resampled, &mut formulas.current, Current::from_amperes + ).err()) + .or(self.evaluate_formulas( + &mut resampled, + &mut formulas.reactive_power, + ReactivePower::from_volt_amperes_reactive + ).err()) + } { + if err.kind() == ErrorKind::DroppedUnusedFormulas { + self.cleanup_resamplers(&formulas, &mut resamplers); + } else { + tracing::error!("Error evaluating formulas: {}", err); + } }; } instruction = self.instructions_rx.recv() => { @@ -133,39 +357,15 @@ impl LogicalMeterActor { } } - /// Handles SubscribeFormula instructions. - /// - /// If the formula already exists, it sends the existing receiver to the - /// `response_tx`. - /// - /// If the formula does not exist, it creates a new `LogicalMeterFormula` with - /// the given formula, metric, and a new broadcast channel. - /// - /// It also initializes the necessary `ComponentDataResampler` for each component - /// in the formula, if it does not already exist. - async fn handle_subscribe_formula( + async fn start_resamplers( &mut self, - formula: String, + components: &HashSet, metric: Metric, - receiver_tx: oneshot::Sender>, - formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>, resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>, ) -> Result<(), Error> { - let formula_key = (formula, metric); - if formulas.contains_key(&formula_key) { - receiver_tx - .send(formulas[&formula_key].sender.subscribe()) - .map_err(|_| Error::internal("Failed to send receiver for formula".to_string()))?; - return Ok(()); - } - - let formula_engine = FormulaEngine::try_new(&formula_key.0) - .map_err(|e| Error::formula_engine_error(format!("Failed to parse formula: {e}")))?; - let (sender, receiver) = broadcast::channel(8); - - for component_id in formula_engine.components() { - let resampler_key = (*component_id, metric); - if resamplers.contains_key(&resampler_key) { + for component_id in components { + let resampler_key = &(*component_id, metric); + if resamplers.contains_key(resampler_key) { continue; } let resampler = ComponentDataResampler { @@ -185,48 +385,45 @@ impl LogicalMeterActor { .receive_electrical_component_telemetry_stream(*component_id) .await?, }; - resamplers.insert(resampler_key, resampler); + resamplers.insert(*resampler_key, resampler); } - - formulas.insert( - formula_key, - LogicalMeterFormula { - formula: formula_engine, - sender, - }, - ); - receiver_tx - .send(receiver) - .map_err(|_| Error::internal("Failed to send receiver for formula".to_string()))?; - Ok(()) } - /// Resamples component data and evaluates formulas for the next timestamp. - fn do_next( + /// Handles SubscribeFormula instructions. + /// + /// If the formula already exists, it sends the existing receiver to the + /// `response_tx`. + /// + /// If the formula does not exist, it creates a new `LogicalMeterFormula` with + /// the given formula, metric, and a new broadcast channel. + /// + /// It also initializes the necessary `ComponentDataResampler` for each component + /// in the formula, if it does not already exist. + async fn handle_subscribe_formula( &mut self, + formula: String, + metric: Metric, + receiver_tx: TypedFormulaResponseSender, + all_formulas: &mut Formulas, resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>, - formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>, ) -> Result<(), Error> { - let mut resampled_metrics: HashMap>> = HashMap::new(); - - for (_, resampler) in resamplers.iter_mut() { - while let Ok(data) = resampler.receiver.try_recv() { - self.push_to_resampler(resampler, data, resampler.metric); - } - let resampled = resampler.resampler.resample(self.next_ts); - if resampled.len() != 1 { - return Err(Error::connection_failure(format!( - "Resampling produced {} values", - resampled.len() - ))); - } - resampled_metrics - .entry(resampler.metric) - .or_default() - .insert(resampler.component_id, resampled[0].clone().value()); + let formula_key = (formula.clone(), metric); + if all_formulas.contains_key(&formula_key) { + all_formulas.send_subscription(&formula_key, receiver_tx) + } else { + let components = all_formulas.start_formulas(formula, metric, receiver_tx)?; + self.start_resamplers(&components, metric, resamplers).await } + } + /// Resamples component data and evaluates formulas for the next timestamp. + fn evaluate_formulas( + &mut self, + resampled_metrics: &mut HashMap>>, + formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>, + transform: impl Fn(f32) -> Q, + ) -> Result<(), Error> { let mut formulas_to_drop = vec![]; for (formula_key, formula) in formulas.iter_mut() { let result = formula @@ -236,7 +433,10 @@ impl LogicalMeterActor { Error::formula_engine_error(format!("Failed to evaluate formula: {e}")) })?; - if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) { + if let Err(e) = formula + .sender + .send(Sample::new(self.resampler_ts, result.map(&transform))) + { tracing::debug!( "No remaining subscribers for formula: {}:({}). Err: {e}", formula_key.1.as_str_name(), @@ -257,28 +457,72 @@ impl LogicalMeterActor { } } if !formulas_to_drop.is_empty() { - let mut components = HashSet::<(u64, Metric)>::new(); - for ((_, metric), formula) in formulas.iter() { - components.extend(formula.formula.components().iter().map(|&id| (id, *metric))); - } - resamplers.retain(|component_id, _| { - if components.contains(component_id) { - true - } else { - tracing::debug!( - "Dropping resampler for component {}:{}", - component_id.0, - component_id.1.as_str_name() - ); - false - } - }); + return Err(Error::dropped_unused_formulas("Dropped unused formulas")); } - self.next_ts += self.config.resampling_interval; Ok(()) } + /// Resamples component telemetry + fn resample_metrics( + &mut self, + resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>, + ) -> Result>>, Error> { + let mut resampled_metrics: HashMap>> = HashMap::new(); + + for (_, resampler) in resamplers.iter_mut() { + while let Ok(data) = resampler.receiver.try_recv() { + self.push_to_resampler(resampler, data, resampler.metric); + } + let resampled = resampler.resampler.resample(self.resampler_ts); + if resampled.len() != 1 { + return Err(Error::connection_failure(format!( + "Resampling produced {} values", + resampled.len() + ))); + } + resampled_metrics + .entry(resampler.metric) + .or_default() + .insert(resampler.component_id, resampled[0].clone().value()); + } + + Ok(resampled_metrics) + } + + /// Cleans up resamplers that are no longer needed by any formula. + fn cleanup_resamplers( + &mut self, + formulas: &Formulas, + resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>, + ) { + let mut components = HashSet::<(u64, Metric)>::new(); + for ((_, metric), formula) in formulas.power.iter() { + components.extend(formula.formula.components().iter().map(|&id| (id, *metric))); + } + for ((_, metric), formula) in formulas.voltage.iter() { + components.extend(formula.formula.components().iter().map(|&id| (id, *metric))); + } + for ((_, metric), formula) in formulas.reactive_power.iter() { + components.extend(formula.formula.components().iter().map(|&id| (id, *metric))); + } + for ((_, metric), formula) in formulas.current.iter() { + components.extend(formula.formula.components().iter().map(|&id| (id, *metric))); + } + resamplers.retain(|component_id, _| { + if components.contains(component_id) { + true + } else { + tracing::debug!( + "Dropping resampler for component {}:{}", + component_id.0, + component_id.1.as_str_name() + ); + false + } + }); + } + /// Extracts the given metric from the given ComponentData and pushes it to /// the resampler's internal buffer. fn push_to_resampler( @@ -292,7 +536,7 @@ impl LogicalMeterActor { .iter() .find(|s| s.metric == metric as i32) else { - tracing::warn!( + tracing::debug!( "No data for metric {:?} in component {}", metric, resampler.component_id diff --git a/src/logical_meter/metric.rs b/src/logical_meter/metric.rs index c42c50b..d519c2d 100644 --- a/src/logical_meter/metric.rs +++ b/src/logical_meter/metric.rs @@ -8,13 +8,20 @@ use crate::proto::common::v1alpha8::metrics::Metric as MetricPb; use super::formula; pub trait Metric: std::fmt::Display + std::fmt::Debug + Clone + Copy + PartialEq + Eq { - type FormulaType: formula::Formula + formula::graph_formula_provider::GraphFormulaProvider; + type FormulaType: formula::Formula + + formula::graph_formula_provider::GraphFormulaProvider; + + type QuantityType: crate::quantity::Quantity; const METRIC: MetricPb; } macro_rules! define_metric { - ($({name: $metric_name:ident, formula: $formula:ident}),+ $(,)?) => { + ($({ + name: $metric_name:ident, + formula: $formula:ident, + quantity: $quantity:ident + }),+ $(,)?) => { $( // Define a metric #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -22,7 +29,8 @@ macro_rules! define_metric { // Implement the AcMetric trait for the metric impl Metric for $metric_name { - type FormulaType = formula::$formula; + type FormulaType = formula::$formula<$metric_name>; + type QuantityType = crate::quantity::$quantity; const METRIC: MetricPb = MetricPb::$metric_name; } @@ -38,20 +46,20 @@ macro_rules! define_metric { } define_metric! { - {name: AcPowerActive, formula: AggregationFormula}, - {name: AcPowerReactive, formula: AggregationFormula}, - {name: AcCurrent, formula: AggregationFormula}, - {name: AcCurrentPhase1, formula: AggregationFormula}, - {name: AcCurrentPhase2, formula: AggregationFormula}, - {name: AcCurrentPhase3, formula: AggregationFormula}, - - {name: AcVoltage, formula: CoalesceFormula}, - {name: AcVoltagePhase1N, formula: CoalesceFormula}, - {name: AcVoltagePhase2N, formula: CoalesceFormula}, - {name: AcVoltagePhase3N, formula: CoalesceFormula}, - {name: AcVoltagePhase1Phase2, formula: CoalesceFormula}, - {name: AcVoltagePhase2Phase3, formula: CoalesceFormula}, - {name: AcVoltagePhase3Phase1, formula: CoalesceFormula}, - - {name: AcFrequency, formula: CoalesceFormula}, + { name: AcPowerActive, formula: AggregationFormula, quantity: Power }, + { name: AcPowerReactive, formula: AggregationFormula, quantity: ReactivePower }, + { name: AcCurrent, formula: AggregationFormula, quantity: Current }, + { name: AcCurrentPhase1, formula: AggregationFormula, quantity: Current }, + { name: AcCurrentPhase2, formula: AggregationFormula, quantity: Current }, + { name: AcCurrentPhase3, formula: AggregationFormula, quantity: Current }, + + { name: AcVoltage, formula: CoalesceFormula, quantity: Voltage }, + { name: AcVoltagePhase1N, formula: CoalesceFormula, quantity: Voltage }, + { name: AcVoltagePhase2N, formula: CoalesceFormula, quantity: Voltage }, + { name: AcVoltagePhase3N, formula: CoalesceFormula, quantity: Voltage }, + { name: AcVoltagePhase1Phase2, formula: CoalesceFormula, quantity: Voltage }, + { name: AcVoltagePhase2Phase3, formula: CoalesceFormula, quantity: Voltage }, + { name: AcVoltagePhase3Phase1, formula: CoalesceFormula, quantity: Voltage }, + + { name: AcFrequency, formula: CoalesceFormula, quantity: Frequency }, } diff --git a/src/quantity.rs b/src/quantity.rs new file mode 100644 index 0000000..1368aa0 --- /dev/null +++ b/src/quantity.rs @@ -0,0 +1,230 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! This module defines various physical quantities and their operations. + +/// A trait for physical quantities that supports basic arithmetic operations. +pub trait Quantity: + std::ops::Add + + std::ops::Sub + + std::ops::Mul + + std::ops::Mul + + std::ops::Div + + std::ops::Div + + std::cmp::PartialOrd + + std::fmt::Display + + Copy + + Clone + + std::fmt::Debug + + Default + + Sized + + Send + + Sync +{ + fn zero() -> Self { + Self::default() + } +} + +impl std::ops::Mul for f32 { + type Output = f32; + + fn mul(self, other: Percentage) -> Self::Output { + self * other.as_fraction() + } +} + +impl Quantity for f32 {} + +/// Formats an f32 with a given precision and removes trailing zeros +fn format_float(value: f32, precision: usize) -> String { + let mut s = format!("{:.1$}", value, precision); + if s.contains('.') { + s = s.trim_end_matches('0').to_string(); + } + if s.ends_with('.') { + s.pop(); + } + s +} + +macro_rules! qty_format { + (@impl $self:ident, $f:ident, $prec:ident, + ($ctor:ident, $getter:ident, $unit:literal, $exp:literal), + ) => { + write!($f, "{} {}", format_float( $self.$getter(), $prec), $unit) + }; + + (@impl $self:ident, $f:ident, $prec:ident, + ($ctor1:ident, $getter1:ident, $unit1:literal, $exp1:literal), + ($ctor2:ident, $getter2:ident, $unit2:literal, $exp2:literal), $($rest:tt)* + ) => {{ + const {assert!($exp1 < $exp2, "Units must be in increasing order of magnitude.")}; + + if $exp1 <= $self.value.abs() && $self.value.abs() < $exp2 { + write!($f, "{} {}", format_float( $self.$getter1(), $prec), $unit1) + } else { + qty_format!(@impl $self, $f, $prec, ($ctor2, $getter2, $unit2, $exp2), $($rest)*) + }} + }; + + (@impl $self:ident, $f:ident, $prec:ident, + ($ctor1:ident, $getter1:ident, $unit1:literal, $exp1:literal), + ($ctor2:ident, $getter2:ident, None, $exp2:literal), + ) => { + write!($f, "{} {}", format_float( $self.$getter1(), $prec), $unit1) + }; + + (@start $self:ident, $f:ident, $prec:ident, + ($ctor:ident, $getter:ident, $unit:literal, $exp:literal), $($rest:tt)* + ) => { + if $self.value.abs() <= $exp { + write!($f, "{} {}", format_float( $self.$getter(), $prec), $unit) + } else { + qty_format!(@impl $self, $f, $prec, ($ctor, $getter, $unit, $exp), $($rest)*) + } + }; + + ($typename:ident => {$($rest:tt)*}) => { + use super::format_float; + impl std::fmt::Display for $typename { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let prec = if let Some(prec) = f.precision() { + prec + } else { + 3 + }; + qty_format!(@start self, f, prec, $($rest)*) + } + + } + }; +} + +macro_rules! qty_ctor { + (@impl ($ctor:ident, $getter:ident, $unit:tt, $exp:literal) $(,)?) => { + pub fn $ctor(value: f32) -> Self { + Self { value: value * $exp } + } + pub fn $getter(&self) -> f32 { + self.value / $exp + } + }; + (@impl ($ctor:ident, $getter:ident, $unit:tt, $exp:literal), $($rest:tt)*) => { + qty_ctor!(@impl ($ctor, $getter, $unit, $exp)); + qty_ctor!(@impl $($rest)*); + }; + (@impl_arith_ops $typename:ident) => { + impl std::ops::Add for $typename { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + Self { + value: self.value + rhs.value, + } + } + } + + impl std::ops::Sub for $typename { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + Self { + value: self.value - rhs.value, + } + } + } + + impl std::ops::Mul for $typename { + type Output = Self; + + fn mul(self, other: super::Percentage) -> Self::Output { + Self { + value: self.value * other.as_fraction(), + } + } + } + + impl std::ops::Mul for $typename { + type Output = Self; + + fn mul(self, other: f32) -> Self::Output { + Self { + value: self.value * other, + } + } + } + + impl std::ops::Div for $typename { + type Output = Self; + + fn div(self, other: f32) -> Self::Output { + Self { + value: self.value / other, + } + } + } + + impl std::ops::Div<$typename> for $typename { + type Output = f32; + + fn div(self, other: Self) -> Self::Output { + self.value / other.value + } + } + + impl std::cmp::PartialOrd for $typename { + fn partial_cmp(&self, other: &Self) -> Option { + self.value.partial_cmp(&other.value) + } + } + + }; + (#[$meta:meta] $typename:ident => {$($rest:tt)*}) => { + #[$meta] + #[derive(Copy, Clone, Debug, Default, PartialEq)] + pub struct $typename { + value: f32, + } + + impl $typename { + qty_ctor!(@impl $($rest)*); + } + + qty_ctor!{@impl_arith_ops $typename} + qty_format!{$typename => {$($rest)*}} + + impl super::Quantity for $typename {} + }; +} + +mod current; +mod energy; +mod frequency; +mod percentage; +mod power; +mod reactive_power; +mod voltage; + +pub use current::Current; +pub use energy::Energy; +pub use frequency::Frequency; +pub use percentage::Percentage; +pub use power::Power; +pub use reactive_power::ReactivePower; +pub use voltage::Voltage; + +#[cfg(test)] +mod test_utils { + /// Asserts that two f32 values are approximately equal within a small epsilon. + #[track_caller] + pub(crate) fn assert_f32_eq(a: f32, b: f32) { + let epsilon: f32 = 10.0_f32.powf(a.log10().min(b.log10())) * 1e-6; + if (a - b).abs() > epsilon { + panic!( + "assertion failed: `(left ~= right)` (epsilon: {})\n left: `{}`,\n right: `{}`", + epsilon, a, b + ); + } + } +} diff --git a/src/quantity/current.rs b/src/quantity/current.rs new file mode 100644 index 0000000..cda4ad4 --- /dev/null +++ b/src/quantity/current.rs @@ -0,0 +1,89 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! This module defines the `Current` quantity and its operations. + +use super::{Power, Voltage}; + +qty_ctor! { + #[doc = "A physical quantity representing electric current."] + Current => { + (from_milliamperes, as_milliamperes, "mA", 1e-3), + (from_amperes, as_amperes, "A", 1e0), + } +} + +impl std::ops::Mul for Current { + type Output = Power; + + fn mul(self, voltage: Voltage) -> Self::Output { + Power::from_watts(self.as_amperes() * voltage.as_volts()) + } +} + +#[cfg(test)] +mod tests { + use crate::quantity::{Percentage, Quantity as _, Voltage, test_utils::assert_f32_eq}; + + use super::Current; + + #[test] + fn test_current() { + let current_1 = Current::from_milliamperes(1000.0); + let current_2 = Current::from_amperes(1.2); + assert_f32_eq(current_1.as_amperes(), 1.0); + assert_f32_eq(current_2.as_milliamperes(), 1200.0); + + assert!(current_1 < current_2); + assert!(current_2 > current_1); + + assert_f32_eq((current_1 + current_2).as_amperes(), 2.2); + assert_f32_eq((current_2 - current_1).as_amperes(), 0.2); + assert_f32_eq((current_2 * 2.0).as_amperes(), 2.4); + assert_f32_eq( + (current_2 * Percentage::from_percentage(50.0)).as_amperes(), + 0.6, + ); + assert_f32_eq((current_2 / 3.0).as_amperes(), 0.4); + assert_f32_eq(current_2 / current_1, 1.2); + + assert_f32_eq(Current::zero().as_amperes(), 0.0); + } + + #[test] + fn test_current_power_voltage() { + let current = Current::from_amperes(2.0); + let voltage = Voltage::from_volts(230.0); + let power = current * voltage; + assert_f32_eq(power.as_watts(), 460.0); + } + + #[test] + fn test_current_formatting() { + let s = |value| Current::from_amperes(value).to_string(); + let p = |value, prec| format!("{:.prec$}", Current::from_amperes(value), prec = prec); + assert_eq!(s(0.0), "0 mA"); + + assert_eq!(s(1.558), "1.558 A"); + assert_eq!(p(1.558, 1), "1.6 A"); + + assert_eq!(s(0.001558), "1.558 mA"); + assert_eq!(p(0.001558, 1), "1.6 mA"); + + assert_eq!(s(1.5508), "1.551 A"); + assert_eq!(p(1.5508, 5), "1.5508 A"); + + assert_eq!(s(0.0015508), "1.551 mA"); + assert_eq!(p(0.0015508, 5), "1.5508 mA"); + + assert_eq!(s(-1.558), "-1.558 A"); + assert_eq!(p(-1.558, 1), "-1.6 A"); + + assert_eq!(s(-0.001558), "-1.558 mA"); + assert_eq!(p(-0.001558, 1), "-1.6 mA"); + + assert_eq!(s(-2030.04487), "-2030.045 A"); + assert_eq!(p(-2030.04487, 1), "-2030 A"); + assert_eq!(p(-2030.04487, 2), "-2030.04 A"); + } +} diff --git a/src/quantity/energy.rs b/src/quantity/energy.rs new file mode 100644 index 0000000..1550163 --- /dev/null +++ b/src/quantity/energy.rs @@ -0,0 +1,129 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! This module defines the `Energy` quantity and its operations. + +use super::Power; + +qty_ctor! { + #[doc = "A physical quantity representing energy."] + Energy => { + (from_milliwatthours, as_milliwatthours, "mWh", 1e-3), + (from_watthours, as_watthours, "Wh", 1e0), + (from_kilowatthours, as_kilowatthours, "kWh", 1e3), + (from_megawatthours, as_megawatthours, "MWh", 1e6), + (from_gigawatthours, as_gigawatthours, "GWh", 1e9), + } +} + +impl std::ops::Div for Energy { + type Output = std::time::Duration; + + fn div(self, power: Power) -> Self::Output { + let seconds = (self.as_watthours() / power.as_watts()) * 3600.0; + std::time::Duration::from_secs_f32(seconds) + } +} + +impl std::ops::Div for Energy { + type Output = Power; + + fn div(self, duration: std::time::Duration) -> Self::Output { + Power::from_watts(self.as_watthours() * 3600.0 / duration.as_secs_f32()) + } +} + +#[cfg(test)] +mod tests { + use crate::quantity::{Percentage, Power, Quantity as _, test_utils::assert_f32_eq}; + + use super::Energy; + + #[test] + fn test_energy() { + let energy_1 = Energy::from_watthours(1000.0); + + assert_f32_eq(energy_1.as_milliwatthours(), 1_000_000.0); + assert_f32_eq(energy_1.as_watthours(), 1000.0); + assert_f32_eq(energy_1.as_kilowatthours(), 1.0); + assert_f32_eq(energy_1.as_megawatthours(), 0.001); + assert_f32_eq(energy_1.as_gigawatthours(), 0.000_001); + + let energy_2 = Energy::from_milliwatthours(1_200_000.0); + assert_f32_eq(energy_2.as_watthours(), 1200.0); + + let energy_2 = Energy::from_kilowatthours(1.2); + assert_f32_eq(energy_2.as_watthours(), 1200.0); + + let energy_2 = Energy::from_megawatthours(0.0012); + assert_f32_eq(energy_2.as_watthours(), 1200.0); + + let energy_2 = Energy::from_gigawatthours(0.000_001_2); + assert_f32_eq(energy_2.as_watthours(), 1200.0); + + assert!(energy_1 < energy_2); + assert!(energy_2 > energy_1); + + assert_f32_eq((energy_1 + energy_2).as_watthours(), 2200.0); + assert_f32_eq((energy_2 - energy_1).as_watthours(), 200.0); + assert_f32_eq((energy_2 * 2.0).as_watthours(), 2400.0); + assert_f32_eq( + (energy_2 * Percentage::from_percentage(50.0)).as_watthours(), + 600.0, + ); + assert_f32_eq((energy_2 / 3.0).as_watthours(), 400.0); + assert_f32_eq(energy_2 / energy_1, 1.2); + + assert_f32_eq(Energy::zero().as_watthours(), 0.0); + } + + #[test] + fn test_energy_power_duration() { + let energy = Energy::from_kilowatthours(1.0); + let power = Power::from_kilowatts(0.5); + + let duration = energy / power; + assert_f32_eq(duration.as_secs_f32(), 7200.0); + + let power_calculated = energy / duration; + assert_f32_eq(power_calculated.as_kilowatts(), 0.5); + } + + #[test] + fn test_energy_formatting() { + let s = |value| Energy::from_watthours(value).to_string(); + let p = |value, prec| format!("{:.prec$}", Energy::from_watthours(value), prec = prec); + + assert_eq!(s(0.0), "0 mWh"); + assert_eq!(s(1.558), "1.558 Wh"); + assert_eq!(p(1.558, 1), "1.6 Wh"); + + assert_eq!(s(0.001558), "1.558 mWh"); + assert_eq!(p(0.001558, 1), "1.6 mWh"); + + assert_eq!(s(1.5508), "1.551 Wh"); + assert_eq!(p(1.5508, 5), "1.5508 Wh"); + + assert_eq!(s(0.0015508), "1.551 mWh"); + assert_eq!(p(0.0015508, 5), "1.5508 mWh"); + + assert_eq!(s(1030.04487), "1.03 kWh"); + assert_eq!(p(1030.04487, 1), "1 kWh"); + + assert_eq!(s(2_030_022.0), "2.03 MWh"); + assert_eq!(s(2_030_022_123.0), "2.03 GWh"); + assert_eq!(p(2_030_022_123.0, 6), "2.030022 GWh"); + + assert_eq!(s(-1.558), "-1.558 Wh"); + assert_eq!(p(-1.558, 1), "-1.6 Wh"); + + assert_eq!(s(-1030.04487), "-1.03 kWh"); + assert_eq!(p(-1030.04487, 1), "-1 kWh"); + + assert_eq!(s(-2_030_022.0), "-2.03 MWh"); + assert_eq!(p(-2_030_022.0, 1), "-2 MWh"); + + assert_eq!(s(-2_030_022_123.0), "-2.03 GWh"); + assert_eq!(p(-2_030_022_123.0, 6), "-2.030022 GWh"); + } +} diff --git a/src/quantity/frequency.rs b/src/quantity/frequency.rs new file mode 100644 index 0000000..ef9e63a --- /dev/null +++ b/src/quantity/frequency.rs @@ -0,0 +1,83 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! This module defines the `Frequency` quantity and its operations. + +qty_ctor! { + #[doc = "A physical quantity representing frequency."] + Frequency => { + (from_hertz, as_hertz, "Hz", 1.0), + (from_kilohertz, as_kilohertz, "kHz", 1e3), + (from_megahertz, as_megahertz, "MHz", 1e6), + (from_gigahertz, as_gigahertz, "GHz", 1e9), + } +} + +#[cfg(test)] +mod tests { + use super::Frequency; + use crate::quantity::{Percentage, Quantity as _, test_utils::assert_f32_eq}; + + #[test] + fn test_frequency() { + let freq_1 = Frequency::from_hertz(1000.0); + + assert_f32_eq(freq_1.as_hertz(), 1000.0); + assert_f32_eq(freq_1.as_kilohertz(), 1.0); + assert_f32_eq(freq_1.as_megahertz(), 0.001); + assert_f32_eq(freq_1.as_gigahertz(), 0.000_001); + + let freq_2 = Frequency::from_kilohertz(1.2); + assert_f32_eq(freq_2.as_hertz(), 1200.0); + + let freq_2 = Frequency::from_megahertz(0.0012); + assert_f32_eq(freq_2.as_hertz(), 1200.0); + + let freq_2 = Frequency::from_gigahertz(0.000_0012); + assert_f32_eq(freq_2.as_hertz(), 1200.0); + + assert!(freq_1 < freq_2); + assert!(freq_2 > freq_1); + + assert_f32_eq((freq_1 + freq_2).as_hertz(), 2200.0); + assert_f32_eq((freq_2 - freq_1).as_hertz(), 200.0); + assert_f32_eq((freq_1 * 2.0).as_hertz(), 2000.0); + assert_f32_eq((freq_2 / 2.0).as_hertz(), 600.0); + assert_f32_eq( + (freq_2 * Percentage::from_percentage(50.0)).as_hertz(), + 600.0, + ); + assert_f32_eq(freq_2 / freq_1, 1.2); + + assert_f32_eq(Frequency::zero().as_hertz(), 0.0); + } + + #[test] + fn test_frequency_formatting() { + let s = |value| Frequency::from_hertz(value).to_string(); + let p = |value, prec| format!("{:.prec$}", Frequency::from_hertz(value), prec = prec); + assert_eq!(s(0.0), "0 Hz"); + + assert_eq!(s(1.558), "1.558 Hz"); + assert_eq!(p(1.558, 1), "1.6 Hz"); + + assert_eq!(s(1.5508), "1.551 Hz"); + assert_eq!(p(1.5508, 5), "1.5508 Hz"); + + assert_eq!(s(2030.0), "2.03 kHz"); + + assert_eq!(s(2_030_022.0), "2.03 MHz"); + assert_eq!(s(2_030_022_123.0), "2.03 GHz"); + assert_eq!(p(2_030_022_123.0, 6), "2.030022 GHz"); + + assert_eq!(s(-1.558), "-1.558 Hz"); + assert_eq!(p(-1.558, 1), "-1.6 Hz"); + + assert_eq!(s(-2030.0), "-2.03 kHz"); + assert_eq!(p(-2030.0, 1), "-2 kHz"); + + assert_eq!(s(-2_030_022.0), "-2.03 MHz"); + assert_eq!(s(-2_030_022_123.0), "-2.03 GHz"); + assert_eq!(p(-2_030_022_123.0, 6), "-2.030022 GHz"); + } +} diff --git a/src/quantity/percentage.rs b/src/quantity/percentage.rs new file mode 100644 index 0000000..df4f171 --- /dev/null +++ b/src/quantity/percentage.rs @@ -0,0 +1,54 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! This module defines the `Percentage` quantity and its operations. + +qty_ctor! { + #[doc = "A quantity representing a percentage (0% to 100%)."] + Percentage => { + (from_percentage, as_percentage, "%", 1.0), + (from_fraction, as_fraction, None, 100.0), + } +} + +#[cfg(test)] +mod tests { + use super::Percentage; + use crate::quantity::{Quantity as _, test_utils::assert_f32_eq}; + + #[test] + fn test_percentage() { + let perc_1 = Percentage::from_percentage(50.0); + + assert_f32_eq(perc_1.as_percentage(), 50.0); + assert_f32_eq(perc_1.as_fraction(), 0.5); + + let perc_2 = Percentage::from_fraction(0.8); + assert_f32_eq(perc_2.as_percentage(), 80.0); + assert_f32_eq(perc_2.as_fraction(), 0.8); + + assert!(perc_1 < perc_2); + assert!(perc_2 > perc_1); + + assert_f32_eq((perc_1 + perc_2).as_percentage(), 130.0); + assert_f32_eq((perc_2 - perc_1).as_percentage(), 30.0); + assert_f32_eq((perc_1 * 2.0).as_percentage(), 100.0); + assert_f32_eq((perc_2 / 2.0).as_percentage(), 40.0); + assert_f32_eq((perc_1 * perc_2).as_percentage(), 40.0); + assert_f32_eq(perc_2 / perc_1, 1.6); + + assert_f32_eq(Percentage::zero().as_percentage(), 0.0); + } + + #[test] + fn test_percentage_formatting() { + let s = |value| Percentage::from_percentage(value).to_string(); + let p = |value, prec| format!("{:.prec$}", Percentage::from_percentage(value), prec = prec); + assert_eq!(s(0.0), "0 %"); + assert_eq!(s(12.3456), "12.346 %"); + assert_eq!(p(12.3456, 2), "12.35 %"); + assert_eq!(p(12.3456, 4), "12.3456 %"); + assert_eq!(p(12.3456, 5), "12.3456 %"); + assert_eq!(s(100.0), "100 %"); + } +} diff --git a/src/quantity/power.rs b/src/quantity/power.rs new file mode 100644 index 0000000..8edae43 --- /dev/null +++ b/src/quantity/power.rs @@ -0,0 +1,138 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! This module defines the `Power` quantity and its operations. + +use super::{Current, Energy, Voltage}; + +qty_ctor! { + #[doc = "A physical quantity representing active power."] + Power => { + (from_milliwatts, as_milliwatts, "mW", 1e-3), + (from_watts, as_watts, "W", 1e0), + (from_kilowatts, as_kilowatts, "kW", 1e3), + (from_megawatts, as_megawatts, "MW", 1e6), + (from_gigawatts, as_gigawatts, "GW", 1e9), + } +} + +impl std::ops::Div for Power { + type Output = Current; + + fn div(self, voltage: Voltage) -> Self::Output { + Current::from_amperes(self.as_watts() / voltage.as_volts()) + } +} + +impl std::ops::Div for Power { + type Output = Voltage; + + fn div(self, current: Current) -> Self::Output { + Voltage::from_volts(self.as_watts() / current.as_amperes()) + } +} + +impl std::ops::Mul for Power { + type Output = Energy; + + fn mul(self, duration: std::time::Duration) -> Self::Output { + Energy::from_watthours(self.as_watts() * duration.as_secs_f32() / 3600.0) + } +} + +#[cfg(test)] +mod tests { + use super::Power; + use super::{Current, Energy, Voltage}; + use crate::quantity::Percentage; + use crate::quantity::{Quantity as _, test_utils::assert_f32_eq}; + + #[test] + fn test_power() { + let power_1 = Power::from_watts(1000.0); + assert_f32_eq(power_1.as_milliwatts(), 1_000_000.0); + assert_f32_eq(power_1.as_watts(), 1000.0); + assert_f32_eq(power_1.as_kilowatts(), 1.0); + assert_f32_eq(power_1.as_megawatts(), 0.001); + assert_f32_eq(power_1.as_gigawatts(), 0.000_001); + + let power_2 = Power::from_milliwatts(1_200_000.0); + assert_f32_eq(power_2.as_watts(), 1200.0); + let power_2 = Power::from_kilowatts(1.2); + assert_f32_eq(power_2.as_watts(), 1200.0); + let power_2 = Power::from_megawatts(0.001_2); + assert_f32_eq(power_2.as_watts(), 1200.0); + let power_2 = Power::from_gigawatts(0.000_001_2); + assert_f32_eq(power_2.as_watts(), 1200.0); + + assert!(power_1 < power_2); + assert!(power_2 > power_1); + + assert_f32_eq((power_1 + power_2).as_watts(), 2200.0); + assert_f32_eq((power_2 - power_1).as_watts(), 200.0); + assert_f32_eq((power_2 * 2.0).as_watts(), 2400.0); + assert_f32_eq((power_2 / 2.0).as_watts(), 600.0); + assert_f32_eq( + (power_2 * Percentage::from_percentage(80.0)).as_watts(), + 960.0, + ); + assert_f32_eq(power_2 / power_1, 1.2); + + assert_f32_eq(Power::zero().as_watts(), 0.0); + } + + #[test] + fn test_power_voltage_current() { + let power = Power::from_kilowatts(2.0); + let voltage = Voltage::from_volts(400.0); + let current = Current::from_amperes(5.0); + + let computed_current = power / voltage; + assert_f32_eq(computed_current.as_amperes(), 5.0); + + let computed_voltage = power / current; + assert_f32_eq(computed_voltage.as_volts(), 400.0); + } + + #[test] + fn test_power_energy_duration() { + let power = Power::from_kilowatts(0.5); + let duration = std::time::Duration::from_secs(7200); + let energy = Energy::from_kilowatthours(1.0); + + let computed_energy = power * duration; + assert_f32_eq(computed_energy.as_kilowatthours(), 1.0); + + let computed_power = energy / duration; + assert_f32_eq(computed_power.as_kilowatts(), 0.5); + } + + #[test] + fn test_power_formatting() { + let s = |value| Power::from_watts(value).to_string(); + let p = |value, prec| format!("{:.prec$}", Power::from_watts(value), prec = prec); + assert_eq!(s(0.0), "0 mW"); + + assert_eq!(s(1.558), "1.558 W"); + assert_eq!(p(1.558, 1), "1.6 W"); + + assert_eq!(s(1.5508), "1.551 W"); + assert_eq!(p(1.5508, 5), "1.5508 W"); + + assert_eq!(s(2030.0), "2.03 kW"); + + assert_eq!(s(2_030_022.0), "2.03 MW"); + assert_eq!(s(2_030_022_123.0), "2.03 GW"); + assert_eq!(p(2_030_022_123.0, 6), "2.030022 GW"); + + assert_eq!(s(-1.558), "-1.558 W"); + assert_eq!(p(-1.558, 1), "-1.6 W"); + + assert_eq!(s(-2030.0), "-2.03 kW"); + assert_eq!(p(-2030.0, 1), "-2 kW"); + + assert_eq!(s(-2_030_022.0), "-2.03 MW"); + assert_eq!(s(-2_030_022_123.0), "-2.03 GW"); + assert_eq!(p(-2_030_022_123.0, 6), "-2.030022 GW"); + } +} diff --git a/src/quantity/reactive_power.rs b/src/quantity/reactive_power.rs new file mode 100644 index 0000000..0c81434 --- /dev/null +++ b/src/quantity/reactive_power.rs @@ -0,0 +1,131 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! This module defines the `ReactivePower` quantity and its operations. + +use super::{Current, Voltage}; + +qty_ctor! { + #[doc = "A physical quantity representing reactive power."] + ReactivePower => { + (from_millivolt_amperes_reactive, as_millivolt_amperes_reactive, "mVAR", 1e-3), + (from_volt_amperes_reactive, as_volt_amperes_reactive, "VAR", 1e0), + (from_kilovolt_amperes_reactive, as_kilovolt_amperes_reactive, "kVAR", 1e3), + (from_megavolt_amperes_reactive, as_megavolt_amperes_reactive, "MVAR", 1e6), + (from_gigavolt_amperes_reactive, as_gigavolt_amperes_reactive, "GVAR", 1e9), + } +} + +impl std::ops::Div for ReactivePower { + type Output = Current; + + fn div(self, voltage: Voltage) -> Self::Output { + Current::from_amperes(self.as_volt_amperes_reactive() / voltage.as_volts()) + } +} + +impl std::ops::Div for ReactivePower { + type Output = Voltage; + + fn div(self, current: Current) -> Self::Output { + Voltage::from_volts(self.as_volt_amperes_reactive() / current.as_amperes()) + } +} + +#[cfg(test)] +mod tests { + use crate::quantity::{ + Percentage, Quantity as _, ReactivePower, Voltage, test_utils::assert_f32_eq, + }; + + #[test] + fn test_reactive_power() { + let reactive_power_1 = ReactivePower::from_volt_amperes_reactive(1000.0); + assert_f32_eq( + reactive_power_1.as_millivolt_amperes_reactive(), + 1_000_000.0, + ); + assert_f32_eq(reactive_power_1.as_volt_amperes_reactive(), 1000.0); + assert_f32_eq(reactive_power_1.as_kilovolt_amperes_reactive(), 1.0); + assert_f32_eq(reactive_power_1.as_megavolt_amperes_reactive(), 0.001); + assert_f32_eq(reactive_power_1.as_gigavolt_amperes_reactive(), 0.000_001); + + let reactive_power_2 = ReactivePower::from_millivolt_amperes_reactive(1_200_000.0); + assert_f32_eq(reactive_power_2.as_volt_amperes_reactive(), 1200.0); + let reactive_power_2 = ReactivePower::from_kilovolt_amperes_reactive(1.2); + assert_f32_eq(reactive_power_2.as_volt_amperes_reactive(), 1200.0); + let reactive_power_2 = ReactivePower::from_megavolt_amperes_reactive(0.0012); + assert_f32_eq(reactive_power_2.as_volt_amperes_reactive(), 1200.0); + let reactive_power_2 = ReactivePower::from_gigavolt_amperes_reactive(0.000_001_2); + assert_f32_eq(reactive_power_2.as_volt_amperes_reactive(), 1200.0); + + assert!(reactive_power_1 < reactive_power_2); + assert!(reactive_power_2 > reactive_power_1); + + assert_f32_eq( + (reactive_power_1 + reactive_power_2).as_volt_amperes_reactive(), + 2200.0, + ); + assert_f32_eq( + (reactive_power_2 - reactive_power_1).as_volt_amperes_reactive(), + 200.0, + ); + assert_f32_eq((reactive_power_1 * 2.0).as_volt_amperes_reactive(), 2000.0); + assert_f32_eq((reactive_power_2 / 2.0).as_volt_amperes_reactive(), 600.0); + assert_f32_eq( + (reactive_power_2 * Percentage::from_percentage(50.0)).as_volt_amperes_reactive(), + 600.0, + ); + assert_f32_eq(reactive_power_2 / reactive_power_1, 1.2); + assert_f32_eq(ReactivePower::zero().as_volt_amperes_reactive(), 0.0); + } + + #[test] + fn test_reactive_power_voltage_current() { + let reactive_power = ReactivePower::from_kilovolt_amperes_reactive(1.0); + let voltage = Voltage::from_volts(1000.0); + + let current = reactive_power / voltage; + assert_f32_eq(current.as_amperes(), 1.0); + + let voltage_calculated = reactive_power / current; + assert_f32_eq(voltage_calculated.as_volts(), 1000.0); + } + + #[test] + fn test_reactive_power_formatting() { + let s = |value| ReactivePower::from_volt_amperes_reactive(value).to_string(); + let p = |value, prec| { + format!( + "{:.prec$}", + ReactivePower::from_volt_amperes_reactive(value), + prec = prec + ) + }; + assert_eq!(s(0.0), "0 mVAR"); + + assert_eq!(s(1.558), "1.558 VAR"); + assert_eq!(p(1.558, 1), "1.6 VAR"); + + assert_eq!(s(1.5508), "1.551 VAR"); + assert_eq!(p(1.5508, 5), "1.5508 VAR"); + + assert_eq!(s(2030.0), "2.03 kVAR"); + + assert_eq!(s(2_030_022.0), "2.03 MVAR"); + assert_eq!(s(2_030_022_123.0), "2.03 GVAR"); + assert_eq!(p(2_030_022_123.0, 6), "2.030022 GVAR"); + + assert_eq!(s(-1.558), "-1.558 VAR"); + assert_eq!(p(-1.558, 1), "-1.6 VAR"); + + assert_eq!(s(-2030.0), "-2.03 kVAR"); + assert_eq!(p(-2030.0, 1), "-2 kVAR"); + + assert_eq!(s(-2_030_022.0), "-2.03 MVAR"); + assert_eq!(p(-2_030_022.0, 6), "-2.030022 MVAR"); + + assert_eq!(s(-2_030_022_123.0), "-2.03 GVAR"); + assert_eq!(p(-2_030_022_123.0, 6), "-2.030022 GVAR"); + } +} diff --git a/src/quantity/voltage.rs b/src/quantity/voltage.rs new file mode 100644 index 0000000..55b3953 --- /dev/null +++ b/src/quantity/voltage.rs @@ -0,0 +1,102 @@ +// License: MIT +// Copyright © 2025 Frequenz Energy-as-a-Service GmbH + +//! This module defines the `Voltage` quantity and its operations. + +use super::{Current, Power}; + +qty_ctor! { + #[doc = "A physical quantity representing voltage."] + Voltage => { + (from_millivolts, as_millivolts, "mV", 1e-3), + (from_volts, as_volts, "V", 1e0), + (from_kilovolts, as_kilovolts, "kV", 1e3), + } +} + +impl std::ops::Mul for Voltage { + type Output = Power; + + fn mul(self, current: Current) -> Self::Output { + Power::from_watts(self.as_volts() * current.as_amperes()) + } +} + +#[cfg(test)] +mod tests { + use crate::quantity::{ + Current, Percentage, Power, Quantity as _, Voltage, test_utils::assert_f32_eq, + }; + + #[test] + fn test_voltage() { + let voltage_1 = Voltage::from_volts(1000.0); + + assert_f32_eq(voltage_1.as_millivolts(), 1_000_000.0); + assert_f32_eq(voltage_1.as_volts(), 1000.0); + assert_f32_eq(voltage_1.as_kilovolts(), 1.0); + + let voltage_2 = Voltage::from_millivolts(1_200_000.0); + assert_f32_eq(voltage_2.as_volts(), 1200.0); + + let voltage_2 = Voltage::from_kilovolts(1.2); + assert_f32_eq(voltage_2.as_volts(), 1200.0); + + assert!(voltage_1 < voltage_2); + assert!(voltage_2 > voltage_1); + + assert_f32_eq((voltage_1 + voltage_2).as_volts(), 2200.0); + assert_f32_eq((voltage_2 - voltage_1).as_volts(), 200.0); + assert_f32_eq((voltage_1 * 2.0).as_volts(), 2000.0); + assert_f32_eq((voltage_2 / 2.0).as_volts(), 600.0); + assert_f32_eq( + (voltage_2 * Percentage::from_percentage(50.0)).as_volts(), + 600.0, + ); + assert_f32_eq(voltage_2 / voltage_1, 1.2); + + assert_f32_eq(Voltage::zero().as_volts(), 0.0); + } + + #[test] + fn test_voltage_current_power() { + let voltage = Voltage::from_volts(230.0); + let current = Current::from_amperes(10.0); + let power = Power::from_watts(2300.0); + + assert_f32_eq((voltage * current).as_watts(), 2300.0); + assert_f32_eq((power / voltage).as_amperes(), 10.0); + assert_f32_eq((power / current).as_volts(), 230.0); + } + + #[test] + fn test_voltage_formatting() { + let s = |value| Voltage::from_volts(value).to_string(); + let p = |value, prec| format!("{:.prec$}", Voltage::from_volts(value), prec = prec); + assert_eq!(s(0.0), "0 mV"); + + assert_eq!(s(0.000_5), "0.5 mV"); + assert_eq!(p(0.000_5, 1), "0.5 mV"); + assert_eq!(s(0.5), "500 mV"); + + assert_eq!(s(1.558), "1.558 V"); + assert_eq!(p(1.558, 1), "1.6 V"); + assert_eq!(s(1_558.0), "1.558 kV"); + assert_eq!(p(1_558.0, 1), "1.6 kV"); + + assert_eq!(s(1.5508), "1.551 V"); + assert_eq!(p(1.5508, 5), "1.5508 V"); + + assert_eq!(s(2_030_000.0), "2030 kV"); + + assert_eq!(s(-0.000_5), "-0.5 mV"); + assert_eq!(p(-0.000_5, 1), "-0.5 mV"); + assert_eq!(s(-0.5), "-500 mV"); + + assert_eq!(s(-1.558), "-1.558 V"); + assert_eq!(p(-1.558, 1), "-1.6 V"); + assert_eq!(s(-1_558.0), "-1.558 kV"); + assert_eq!(p(-1_558.0, 1), "-1.6 kV"); + assert_eq!(s(-2_030_000.0), "-2030 kV"); + } +} diff --git a/src/sample.rs b/src/sample.rs index f118c03..37f432f 100644 --- a/src/sample.rs +++ b/src/sample.rs @@ -5,13 +5,13 @@ use chrono::{DateTime, Utc}; /// Represents a measurement of a microgrid metric, made at a specific time. #[derive(Clone, Debug, Default)] -pub struct Sample { +pub struct Sample { timestamp: DateTime, - value: Option, + value: Option, } -impl frequenz_resampling::Sample for Sample { - type Value = f32; +impl frequenz_resampling::Sample for Sample { + type Value = Q; fn new(timestamp: DateTime, value: Option) -> Self { Self { timestamp, value } @@ -26,9 +26,9 @@ impl frequenz_resampling::Sample for Sample { } } -impl Sample { +impl Sample { /// Creates a new `Sample` with the given timestamp and value. - pub fn new(timestamp: DateTime, value: Option) -> Self { + pub fn new(timestamp: DateTime, value: Option) -> Self { Self { timestamp, value } } @@ -38,7 +38,7 @@ impl Sample { } /// Returns the value of the sample. - pub fn value(&self) -> Option { + pub fn value(&self) -> Option { self.value } }