Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
856981e
Update resampler time at the start of resampling
shsms Aug 25, 2025
c4d4abb
Rename timer field to resampler_timer for clarity
shsms Aug 25, 2025
bb82ad5
Refactor resampling logic into a dedicated function
shsms Aug 25, 2025
196e222
Add `kind` method to `Error` for retrieving error kind
shsms Aug 25, 2025
e260df0
Refactor resampler cleanup
shsms Aug 25, 2025
fa24e91
Rename `do_next` to `evaluate_formulas`
shsms Aug 25, 2025
7b666b6
Add quantity module with support for various physical quantities
shsms Aug 25, 2025
98613ed
Add tests for the quantity types
shsms Sep 30, 2025
ab13386
Make `Sample` struct generic over value type
shsms Aug 25, 2025
aa2d311
Add QuantityType to Metric trait as an associated type
shsms Aug 25, 2025
112c200
Update Formula trait to support generic Quantity type
shsms Aug 25, 2025
0b61acb
Update LogicalMeterFormula to support generic Quantity type
shsms Aug 25, 2025
7201570
Update `evaluate_formulas` to support generic transformation
shsms Aug 25, 2025
fd57f5e
Add TypedFormulaResponseSender enum and TryFrom implementation
shsms Aug 25, 2025
9ce335f
Add `Formulas` struct to manage logical meter formulas
shsms Aug 25, 2025
cb25391
Change log level to debug for missing metric data in LogicalMeterActor
shsms Aug 25, 2025
082ee30
Refactor resampler initialization into a dedicated method
shsms Aug 25, 2025
af95e6e
Implement `Quantity` types in logical meter streams
shsms Aug 25, 2025
254f0bf
Remove invalid metric checks from formula operations
shsms Aug 25, 2025
a181981
Remove default generic type for `Formula` and `Sample`
shsms Aug 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ macro_rules! ErrorKind {
}
}
)*

/// Returns the kind of error that occurred.
pub fn kind(&self) -> ErrorKind {
self.kind.clone()
}
}
};
}
Expand All @@ -51,8 +56,8 @@ ErrorKind!(
(ComponentGraphError, component_graph_error),
(ConnectionFailure, connection_failure),
(ChronoError, chrono_error),
(DroppedUnusedFormulas, dropped_unused_formulas),
(FormulaEngineError, formula_engine_error),
(InvalidMetric, invalid_metric),
(Internal, internal)
);

Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub use client::MicrogridClientHandle;
mod error;
pub use error::{Error, ErrorKind};

mod quantity;

mod proto;

mod sample;
Expand Down
81 changes: 38 additions & 43 deletions src/logical_meter/formula.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub(crate) mod graph_formula_provider;
pub use aggregation_formula::AggregationFormula;
pub use coalesce_formula::CoalesceFormula;

use crate::{Error, Sample, proto::common::v1alpha8::metrics::Metric};
use crate::{Error, Sample, metric::Metric, quantity::Quantity};
use tokio::sync::{broadcast, mpsc};

use super::logical_meter_actor;
Expand All @@ -22,20 +22,29 @@ pub(crate) trait GraphFormulaProvider: std::fmt::Display {

/// Defines a formula that can be subscribed to for receiving samples.
pub(crate) trait FormulaSubscriber: std::fmt::Display {
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
type MetricType: Metric;

fn subscribe(
&self,
) -> impl Future<
Output = Result<
broadcast::Receiver<Sample<<Self::MetricType as Metric>::QuantityType>>,
Error,
>,
> + Send;
Comment on lines +29 to +34
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably you have a good reason not to use async_trait here, but I tend to find these desugared async return types hard to read.
Can you maybe help me understand the decision against async_trait? Is it "fewer dependencies" (which is perfectly valid)?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but eventually this wasn't enough and we needed the dyn Future produced by async_trait, because of tokio requirements. So this changes in the next PR.

}

/// Parameters for creating a logical meter formula.
pub(super) struct FormulaParams<F: GraphFormulaProvider> {
pub(super) struct FormulaParams<F: GraphFormulaProvider, M: Metric> {
pub(super) formula: F::GraphFormulaType,
pub(super) metric: Metric,
pub(super) metric: M,
pub(super) instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
}

impl<F: GraphFormulaProvider> FormulaParams<F> {
impl<F: GraphFormulaProvider, M: Metric> FormulaParams<F, M> {
pub(super) fn new(
formula: F::GraphFormulaType,
metric: Metric,
metric: M,
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
) -> Self {
Self {
Expand All @@ -47,67 +56,53 @@ impl<F: GraphFormulaProvider> FormulaParams<F> {
}

/// A trait that defines generic formula operations.
pub trait Formula: std::fmt::Display + Sized {
pub trait Formula<Q: Quantity>: std::fmt::Display + Sized {
fn coalesce(self, other: Self) -> Result<Self, Error>;
fn min(self, other: Self) -> Result<Self, Error>;
fn max(self, other: Self) -> Result<Self, Error>;
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
fn subscribe(
&self,
) -> impl Future<Output = Result<broadcast::Receiver<Sample<Q>>, Error>> + Send;
}

impl<T> Formula for T
impl<T, Q, M> Formula<Q> for T
where
T: FormulaSubscriber
T: FormulaSubscriber<MetricType = M>
+ GraphFormulaProvider
+ From<FormulaParams<T>>
+ Into<FormulaParams<T>>
+ From<FormulaParams<T, M>>
+ Into<FormulaParams<T, M>>
+ std::fmt::Display,
Q: Quantity,
M: Metric<QuantityType = Q>,
{
fn coalesce(self, other: Self) -> Result<Self, Error> {
let mut params_self: FormulaParams<T> = self.into();
let params_other: FormulaParams<T> = other.into();

if params_self.metric != params_other.metric {
return Err(Error::invalid_metric(format!(
"Cannot coalesce formulas with different metrics: {} and {}",
params_self.metric.as_str_name(),
params_other.metric.as_str_name()
)));
}
let mut params_self: FormulaParams<T, M> = self.into();
let params_other: FormulaParams<T, M> = other.into();

params_self.formula = params_self.formula.coalesce(params_other.formula);
Ok(params_self.into())
}

fn min(self, other: Self) -> Result<Self, Error> {
let mut params_self: FormulaParams<T> = self.into();
let params_other: FormulaParams<T> = other.into();

if params_self.metric != params_other.metric {
return Err(Error::invalid_metric(format!(
"Cannot take min of formulas with different metrics: {} and {}",
params_self.metric.as_str_name(),
params_other.metric.as_str_name()
)));
}
let mut params_self: FormulaParams<T, M> = self.into();
let params_other: FormulaParams<T, M> = other.into();

params_self.formula = params_self.formula.min(params_other.formula);
Ok(params_self.into())
}

fn max(self, other: Self) -> Result<Self, Error> {
let mut params_self: FormulaParams<T> = self.into();
let params_other: FormulaParams<T> = other.into();

if params_self.metric != params_other.metric {
return Err(Error::invalid_metric(format!(
"Cannot take max of formulas with different metrics: {} and {}",
params_self.metric.as_str_name(),
params_other.metric.as_str_name()
)));
}
let mut params_self: FormulaParams<T, M> = self.into();
let params_other: FormulaParams<T, M> = other.into();

params_self.formula = params_self.formula.max(params_other.formula);
Ok(params_self.into())
}

fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send {
fn subscribe(
&self,
) -> impl Future<Output = Result<broadcast::Receiver<Sample<M::QuantityType>>, Error>> + Send
{
<T as FormulaSubscriber>::subscribe(self)
}
}
70 changes: 31 additions & 39 deletions src/logical_meter/formula/aggregation_formula.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,40 @@

use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider};
use crate::{
Error, Sample, logical_meter::logical_meter_actor, proto::common::v1alpha8::metrics::Metric,
Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity,
};
use tokio::sync::{broadcast, mpsc, oneshot};

#[derive(Clone)]
pub struct AggregationFormula {
pub struct AggregationFormula<M: Metric> {
formula: frequenz_microgrid_component_graph::AggregationFormula,
metric: Metric,
metric: M,
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
}

impl std::fmt::Display for AggregationFormula {
impl<M: Metric> std::fmt::Display for AggregationFormula<M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.formula.fmt(f)
}
}

impl GraphFormulaProvider for AggregationFormula {
impl<M: Metric> GraphFormulaProvider for AggregationFormula<M> {
type GraphFormulaType = frequenz_microgrid_component_graph::AggregationFormula;
}

impl FormulaSubscriber for AggregationFormula {
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
impl<Q: Quantity + 'static, M: Metric<QuantityType = Q> + Sync> FormulaSubscriber
for AggregationFormula<M>
{
type MetricType = M;

async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<Q>>, Error> {
let (tx, rx) = oneshot::channel();

self.instructions_tx
.send(logical_meter_actor::Instruction::SubscribeFormula {
formula: self.formula.to_string(),
metric: self.metric,
response_tx: tx,
metric: M::METRIC,
response_tx: tx.try_into()?,
})
.await
.map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?;
Expand All @@ -46,8 +50,8 @@ impl FormulaSubscriber for AggregationFormula {
}
}

impl From<FormulaParams<AggregationFormula>> for AggregationFormula {
fn from(params: FormulaParams<AggregationFormula>) -> Self {
impl<M: Metric> From<FormulaParams<AggregationFormula<M>, M>> for AggregationFormula<M> {
fn from(params: FormulaParams<AggregationFormula<M>, M>) -> Self {
Self {
formula: params.formula,
metric: params.metric,
Expand All @@ -56,8 +60,8 @@ impl From<FormulaParams<AggregationFormula>> for AggregationFormula {
}
}

impl From<AggregationFormula> for FormulaParams<AggregationFormula> {
fn from(formula: AggregationFormula) -> Self {
impl<M: Metric> From<AggregationFormula<M>> for FormulaParams<AggregationFormula<M>, M> {
fn from(formula: AggregationFormula<M>) -> Self {
FormulaParams {
formula: formula.formula,
metric: formula.metric,
Expand All @@ -66,73 +70,61 @@ impl From<AggregationFormula> for FormulaParams<AggregationFormula> {
}
}

impl std::ops::Add for AggregationFormula {
impl<M: Metric> std::ops::Add for AggregationFormula<M> {
type Output = Result<Self, Error>;

fn add(self, other: Self) -> Self::Output {
if self.metric != other.metric {
return Err(Error::invalid_metric(format!(
"Cannot add formulas with different metrics: {} and {}",
self.metric as isize, other.metric as isize
)));
}
let new_formula = self.formula + other.formula;
Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into())
}
}

impl std::ops::Sub for AggregationFormula {
impl<M: Metric> std::ops::Sub for AggregationFormula<M> {
type Output = Result<Self, Error>;

fn sub(self, other: Self) -> Self::Output {
if self.metric != other.metric {
return Err(Error::invalid_metric(format!(
"Cannot subtract formulas with different metrics: {} and {}",
self.metric as isize, other.metric as isize
)));
}
let new_formula = self.formula - other.formula;
Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into())
}
}

impl std::ops::Add<AggregationFormula> for Result<AggregationFormula, Error> {
type Output = Result<AggregationFormula, Error>;
impl<M: Metric> std::ops::Add<AggregationFormula<M>> for Result<AggregationFormula<M>, Error> {
type Output = Result<AggregationFormula<M>, Error>;

fn add(self, other: AggregationFormula) -> Self::Output {
fn add(self, other: AggregationFormula<M>) -> Self::Output {
match self {
Ok(left) => left + other,
Err(e) => Err(e),
}
}
}

impl std::ops::Sub<AggregationFormula> for Result<AggregationFormula, Error> {
type Output = Result<AggregationFormula, Error>;
impl<M: Metric> std::ops::Sub<AggregationFormula<M>> for Result<AggregationFormula<M>, Error> {
type Output = Result<AggregationFormula<M>, Error>;

fn sub(self, other: AggregationFormula) -> Self::Output {
fn sub(self, other: AggregationFormula<M>) -> Self::Output {
match self {
Ok(left) => left - other,
Err(e) => Err(e),
}
}
}

impl std::ops::Add<Result<AggregationFormula, Error>> for AggregationFormula {
type Output = Result<AggregationFormula, Error>;
impl<M: Metric> std::ops::Add<Result<AggregationFormula<M>, Error>> for AggregationFormula<M> {
type Output = Result<AggregationFormula<M>, Error>;

fn add(self, other: Result<AggregationFormula, Error>) -> Self::Output {
fn add(self, other: Result<AggregationFormula<M>, Error>) -> Self::Output {
match other {
Ok(right) => self + right,
Err(e) => Err(e),
}
}
}

impl std::ops::Sub<Result<AggregationFormula, Error>> for AggregationFormula {
type Output = Result<AggregationFormula, Error>;
impl<M: Metric> std::ops::Sub<Result<AggregationFormula<M>, Error>> for AggregationFormula<M> {
type Output = Result<AggregationFormula<M>, Error>;

fn sub(self, other: Result<AggregationFormula, Error>) -> Self::Output {
fn sub(self, other: Result<AggregationFormula<M>, Error>) -> Self::Output {
match other {
Ok(right) => self - right,
Err(e) => Err(e),
Expand Down
30 changes: 17 additions & 13 deletions src/logical_meter/formula/coalesce_formula.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,40 @@

use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider};
use crate::{
Error, Sample, logical_meter::logical_meter_actor, proto::common::v1alpha8::metrics::Metric,
Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity,
};
use tokio::sync::{broadcast, mpsc, oneshot};

#[derive(Clone)]
pub struct CoalesceFormula {
pub struct CoalesceFormula<M: Metric> {
formula: frequenz_microgrid_component_graph::CoalesceFormula,
metric: Metric,
metric: M,
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
}

impl std::fmt::Display for CoalesceFormula {
impl<M: Metric> std::fmt::Display for CoalesceFormula<M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.formula.fmt(f)
}
}

impl GraphFormulaProvider for CoalesceFormula {
impl<M: Metric> GraphFormulaProvider for CoalesceFormula<M> {
type GraphFormulaType = frequenz_microgrid_component_graph::CoalesceFormula;
}

impl FormulaSubscriber for CoalesceFormula {
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
impl<Q: Quantity + 'static, M: Metric<QuantityType = Q> + Sync> FormulaSubscriber
for CoalesceFormula<M>
{
type MetricType = M;

async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<Q>>, Error> {
let (tx, rx) = oneshot::channel();

self.instructions_tx
.send(logical_meter_actor::Instruction::SubscribeFormula {
formula: self.formula.to_string(),
metric: self.metric,
response_tx: tx,
metric: M::METRIC,
response_tx: tx.try_into()?,
})
.await
.map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?;
Expand All @@ -46,8 +50,8 @@ impl FormulaSubscriber for CoalesceFormula {
}
}

impl From<FormulaParams<CoalesceFormula>> for CoalesceFormula {
fn from(params: FormulaParams<CoalesceFormula>) -> Self {
impl<M: Metric> From<FormulaParams<CoalesceFormula<M>, M>> for CoalesceFormula<M> {
fn from(params: FormulaParams<CoalesceFormula<M>, M>) -> Self {
Self {
formula: params.formula,
metric: params.metric,
Expand All @@ -56,8 +60,8 @@ impl From<FormulaParams<CoalesceFormula>> for CoalesceFormula {
}
}

impl From<CoalesceFormula> for FormulaParams<CoalesceFormula> {
fn from(formula: CoalesceFormula) -> Self {
impl<M: Metric> From<CoalesceFormula<M>> for FormulaParams<CoalesceFormula<M>, M> {
fn from(formula: CoalesceFormula<M>) -> Self {
FormulaParams {
formula: formula.formula,
metric: formula.metric,
Expand Down
Loading