Skip to content

Commit 3f549ba

Browse files
authored
Implement Quantity types in logical meter streams (#11)
This PR introduces `Quantity` types like `Current`, `Power`, `Voltage`, and updates the logical meter formulas to return values of these types, instead of floats. This PR also includes a number of other small improvements.
2 parents 1cb99a6 + a181981 commit 3f549ba

17 files changed

Lines changed: 1439 additions & 226 deletions

src/error.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ macro_rules! ErrorKind {
4343
}
4444
}
4545
)*
46+
47+
/// Returns the kind of error that occurred.
48+
pub fn kind(&self) -> ErrorKind {
49+
self.kind.clone()
50+
}
4651
}
4752
};
4853
}
@@ -51,8 +56,8 @@ ErrorKind!(
5156
(ComponentGraphError, component_graph_error),
5257
(ConnectionFailure, connection_failure),
5358
(ChronoError, chrono_error),
59+
(DroppedUnusedFormulas, dropped_unused_formulas),
5460
(FormulaEngineError, formula_engine_error),
55-
(InvalidMetric, invalid_metric),
5661
(Internal, internal)
5762
);
5863

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub use client::MicrogridClientHandle;
99
mod error;
1010
pub use error::{Error, ErrorKind};
1111

12+
mod quantity;
13+
1214
mod proto;
1315

1416
mod sample;

src/logical_meter/formula.rs

Lines changed: 38 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub(crate) mod graph_formula_provider;
1010
pub use aggregation_formula::AggregationFormula;
1111
pub use coalesce_formula::CoalesceFormula;
1212

13-
use crate::{Error, Sample, proto::common::v1alpha8::metrics::Metric};
13+
use crate::{Error, Sample, metric::Metric, quantity::Quantity};
1414
use tokio::sync::{broadcast, mpsc};
1515

1616
use super::logical_meter_actor;
@@ -22,20 +22,29 @@ pub(crate) trait GraphFormulaProvider: std::fmt::Display {
2222

2323
/// Defines a formula that can be subscribed to for receiving samples.
2424
pub(crate) trait FormulaSubscriber: std::fmt::Display {
25-
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
25+
type MetricType: Metric;
26+
27+
fn subscribe(
28+
&self,
29+
) -> impl Future<
30+
Output = Result<
31+
broadcast::Receiver<Sample<<Self::MetricType as Metric>::QuantityType>>,
32+
Error,
33+
>,
34+
> + Send;
2635
}
2736

2837
/// Parameters for creating a logical meter formula.
29-
pub(super) struct FormulaParams<F: GraphFormulaProvider> {
38+
pub(super) struct FormulaParams<F: GraphFormulaProvider, M: Metric> {
3039
pub(super) formula: F::GraphFormulaType,
31-
pub(super) metric: Metric,
40+
pub(super) metric: M,
3241
pub(super) instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
3342
}
3443

35-
impl<F: GraphFormulaProvider> FormulaParams<F> {
44+
impl<F: GraphFormulaProvider, M: Metric> FormulaParams<F, M> {
3645
pub(super) fn new(
3746
formula: F::GraphFormulaType,
38-
metric: Metric,
47+
metric: M,
3948
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
4049
) -> Self {
4150
Self {
@@ -47,67 +56,53 @@ impl<F: GraphFormulaProvider> FormulaParams<F> {
4756
}
4857

4958
/// A trait that defines generic formula operations.
50-
pub trait Formula: std::fmt::Display + Sized {
59+
pub trait Formula<Q: Quantity>: std::fmt::Display + Sized {
5160
fn coalesce(self, other: Self) -> Result<Self, Error>;
5261
fn min(self, other: Self) -> Result<Self, Error>;
5362
fn max(self, other: Self) -> Result<Self, Error>;
54-
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
63+
fn subscribe(
64+
&self,
65+
) -> impl Future<Output = Result<broadcast::Receiver<Sample<Q>>, Error>> + Send;
5566
}
5667

57-
impl<T> Formula for T
68+
impl<T, Q, M> Formula<Q> for T
5869
where
59-
T: FormulaSubscriber
70+
T: FormulaSubscriber<MetricType = M>
6071
+ GraphFormulaProvider
61-
+ From<FormulaParams<T>>
62-
+ Into<FormulaParams<T>>
72+
+ From<FormulaParams<T, M>>
73+
+ Into<FormulaParams<T, M>>
6374
+ std::fmt::Display,
75+
Q: Quantity,
76+
M: Metric<QuantityType = Q>,
6477
{
6578
fn coalesce(self, other: Self) -> Result<Self, Error> {
66-
let mut params_self: FormulaParams<T> = self.into();
67-
let params_other: FormulaParams<T> = other.into();
68-
69-
if params_self.metric != params_other.metric {
70-
return Err(Error::invalid_metric(format!(
71-
"Cannot coalesce formulas with different metrics: {} and {}",
72-
params_self.metric.as_str_name(),
73-
params_other.metric.as_str_name()
74-
)));
75-
}
79+
let mut params_self: FormulaParams<T, M> = self.into();
80+
let params_other: FormulaParams<T, M> = other.into();
81+
7682
params_self.formula = params_self.formula.coalesce(params_other.formula);
7783
Ok(params_self.into())
7884
}
7985

8086
fn min(self, other: Self) -> Result<Self, Error> {
81-
let mut params_self: FormulaParams<T> = self.into();
82-
let params_other: FormulaParams<T> = other.into();
83-
84-
if params_self.metric != params_other.metric {
85-
return Err(Error::invalid_metric(format!(
86-
"Cannot take min of formulas with different metrics: {} and {}",
87-
params_self.metric.as_str_name(),
88-
params_other.metric.as_str_name()
89-
)));
90-
}
87+
let mut params_self: FormulaParams<T, M> = self.into();
88+
let params_other: FormulaParams<T, M> = other.into();
89+
9190
params_self.formula = params_self.formula.min(params_other.formula);
9291
Ok(params_self.into())
9392
}
9493

9594
fn max(self, other: Self) -> Result<Self, Error> {
96-
let mut params_self: FormulaParams<T> = self.into();
97-
let params_other: FormulaParams<T> = other.into();
98-
99-
if params_self.metric != params_other.metric {
100-
return Err(Error::invalid_metric(format!(
101-
"Cannot take max of formulas with different metrics: {} and {}",
102-
params_self.metric.as_str_name(),
103-
params_other.metric.as_str_name()
104-
)));
105-
}
95+
let mut params_self: FormulaParams<T, M> = self.into();
96+
let params_other: FormulaParams<T, M> = other.into();
97+
10698
params_self.formula = params_self.formula.max(params_other.formula);
10799
Ok(params_self.into())
108100
}
109101

110-
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send {
102+
fn subscribe(
103+
&self,
104+
) -> impl Future<Output = Result<broadcast::Receiver<Sample<M::QuantityType>>, Error>> + Send
105+
{
111106
<T as FormulaSubscriber>::subscribe(self)
112107
}
113108
}

src/logical_meter/formula/aggregation_formula.rs

Lines changed: 31 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,40 @@
55
66
use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider};
77
use crate::{
8-
Error, Sample, logical_meter::logical_meter_actor, proto::common::v1alpha8::metrics::Metric,
8+
Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity,
99
};
1010
use tokio::sync::{broadcast, mpsc, oneshot};
1111

1212
#[derive(Clone)]
13-
pub struct AggregationFormula {
13+
pub struct AggregationFormula<M: Metric> {
1414
formula: frequenz_microgrid_component_graph::AggregationFormula,
15-
metric: Metric,
15+
metric: M,
1616
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
1717
}
1818

19-
impl std::fmt::Display for AggregationFormula {
19+
impl<M: Metric> std::fmt::Display for AggregationFormula<M> {
2020
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2121
self.formula.fmt(f)
2222
}
2323
}
2424

25-
impl GraphFormulaProvider for AggregationFormula {
25+
impl<M: Metric> GraphFormulaProvider for AggregationFormula<M> {
2626
type GraphFormulaType = frequenz_microgrid_component_graph::AggregationFormula;
2727
}
2828

29-
impl FormulaSubscriber for AggregationFormula {
30-
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
29+
impl<Q: Quantity + 'static, M: Metric<QuantityType = Q> + Sync> FormulaSubscriber
30+
for AggregationFormula<M>
31+
{
32+
type MetricType = M;
33+
34+
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<Q>>, Error> {
3135
let (tx, rx) = oneshot::channel();
3236

3337
self.instructions_tx
3438
.send(logical_meter_actor::Instruction::SubscribeFormula {
3539
formula: self.formula.to_string(),
36-
metric: self.metric,
37-
response_tx: tx,
40+
metric: M::METRIC,
41+
response_tx: tx.try_into()?,
3842
})
3943
.await
4044
.map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?;
@@ -46,8 +50,8 @@ impl FormulaSubscriber for AggregationFormula {
4650
}
4751
}
4852

49-
impl From<FormulaParams<AggregationFormula>> for AggregationFormula {
50-
fn from(params: FormulaParams<AggregationFormula>) -> Self {
53+
impl<M: Metric> From<FormulaParams<AggregationFormula<M>, M>> for AggregationFormula<M> {
54+
fn from(params: FormulaParams<AggregationFormula<M>, M>) -> Self {
5155
Self {
5256
formula: params.formula,
5357
metric: params.metric,
@@ -56,8 +60,8 @@ impl From<FormulaParams<AggregationFormula>> for AggregationFormula {
5660
}
5761
}
5862

59-
impl From<AggregationFormula> for FormulaParams<AggregationFormula> {
60-
fn from(formula: AggregationFormula) -> Self {
63+
impl<M: Metric> From<AggregationFormula<M>> for FormulaParams<AggregationFormula<M>, M> {
64+
fn from(formula: AggregationFormula<M>) -> Self {
6165
FormulaParams {
6266
formula: formula.formula,
6367
metric: formula.metric,
@@ -66,73 +70,61 @@ impl From<AggregationFormula> for FormulaParams<AggregationFormula> {
6670
}
6771
}
6872

69-
impl std::ops::Add for AggregationFormula {
73+
impl<M: Metric> std::ops::Add for AggregationFormula<M> {
7074
type Output = Result<Self, Error>;
7175

7276
fn add(self, other: Self) -> Self::Output {
73-
if self.metric != other.metric {
74-
return Err(Error::invalid_metric(format!(
75-
"Cannot add formulas with different metrics: {} and {}",
76-
self.metric as isize, other.metric as isize
77-
)));
78-
}
7977
let new_formula = self.formula + other.formula;
8078
Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into())
8179
}
8280
}
8381

84-
impl std::ops::Sub for AggregationFormula {
82+
impl<M: Metric> std::ops::Sub for AggregationFormula<M> {
8583
type Output = Result<Self, Error>;
8684

8785
fn sub(self, other: Self) -> Self::Output {
88-
if self.metric != other.metric {
89-
return Err(Error::invalid_metric(format!(
90-
"Cannot subtract formulas with different metrics: {} and {}",
91-
self.metric as isize, other.metric as isize
92-
)));
93-
}
9486
let new_formula = self.formula - other.formula;
9587
Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into())
9688
}
9789
}
9890

99-
impl std::ops::Add<AggregationFormula> for Result<AggregationFormula, Error> {
100-
type Output = Result<AggregationFormula, Error>;
91+
impl<M: Metric> std::ops::Add<AggregationFormula<M>> for Result<AggregationFormula<M>, Error> {
92+
type Output = Result<AggregationFormula<M>, Error>;
10193

102-
fn add(self, other: AggregationFormula) -> Self::Output {
94+
fn add(self, other: AggregationFormula<M>) -> Self::Output {
10395
match self {
10496
Ok(left) => left + other,
10597
Err(e) => Err(e),
10698
}
10799
}
108100
}
109101

110-
impl std::ops::Sub<AggregationFormula> for Result<AggregationFormula, Error> {
111-
type Output = Result<AggregationFormula, Error>;
102+
impl<M: Metric> std::ops::Sub<AggregationFormula<M>> for Result<AggregationFormula<M>, Error> {
103+
type Output = Result<AggregationFormula<M>, Error>;
112104

113-
fn sub(self, other: AggregationFormula) -> Self::Output {
105+
fn sub(self, other: AggregationFormula<M>) -> Self::Output {
114106
match self {
115107
Ok(left) => left - other,
116108
Err(e) => Err(e),
117109
}
118110
}
119111
}
120112

121-
impl std::ops::Add<Result<AggregationFormula, Error>> for AggregationFormula {
122-
type Output = Result<AggregationFormula, Error>;
113+
impl<M: Metric> std::ops::Add<Result<AggregationFormula<M>, Error>> for AggregationFormula<M> {
114+
type Output = Result<AggregationFormula<M>, Error>;
123115

124-
fn add(self, other: Result<AggregationFormula, Error>) -> Self::Output {
116+
fn add(self, other: Result<AggregationFormula<M>, Error>) -> Self::Output {
125117
match other {
126118
Ok(right) => self + right,
127119
Err(e) => Err(e),
128120
}
129121
}
130122
}
131123

132-
impl std::ops::Sub<Result<AggregationFormula, Error>> for AggregationFormula {
133-
type Output = Result<AggregationFormula, Error>;
124+
impl<M: Metric> std::ops::Sub<Result<AggregationFormula<M>, Error>> for AggregationFormula<M> {
125+
type Output = Result<AggregationFormula<M>, Error>;
134126

135-
fn sub(self, other: Result<AggregationFormula, Error>) -> Self::Output {
127+
fn sub(self, other: Result<AggregationFormula<M>, Error>) -> Self::Output {
136128
match other {
137129
Ok(right) => self - right,
138130
Err(e) => Err(e),

src/logical_meter/formula/coalesce_formula.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,40 @@
55
66
use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider};
77
use crate::{
8-
Error, Sample, logical_meter::logical_meter_actor, proto::common::v1alpha8::metrics::Metric,
8+
Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity,
99
};
1010
use tokio::sync::{broadcast, mpsc, oneshot};
1111

1212
#[derive(Clone)]
13-
pub struct CoalesceFormula {
13+
pub struct CoalesceFormula<M: Metric> {
1414
formula: frequenz_microgrid_component_graph::CoalesceFormula,
15-
metric: Metric,
15+
metric: M,
1616
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
1717
}
1818

19-
impl std::fmt::Display for CoalesceFormula {
19+
impl<M: Metric> std::fmt::Display for CoalesceFormula<M> {
2020
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2121
self.formula.fmt(f)
2222
}
2323
}
2424

25-
impl GraphFormulaProvider for CoalesceFormula {
25+
impl<M: Metric> GraphFormulaProvider for CoalesceFormula<M> {
2626
type GraphFormulaType = frequenz_microgrid_component_graph::CoalesceFormula;
2727
}
2828

29-
impl FormulaSubscriber for CoalesceFormula {
30-
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
29+
impl<Q: Quantity + 'static, M: Metric<QuantityType = Q> + Sync> FormulaSubscriber
30+
for CoalesceFormula<M>
31+
{
32+
type MetricType = M;
33+
34+
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<Q>>, Error> {
3135
let (tx, rx) = oneshot::channel();
3236

3337
self.instructions_tx
3438
.send(logical_meter_actor::Instruction::SubscribeFormula {
3539
formula: self.formula.to_string(),
36-
metric: self.metric,
37-
response_tx: tx,
40+
metric: M::METRIC,
41+
response_tx: tx.try_into()?,
3842
})
3943
.await
4044
.map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?;
@@ -46,8 +50,8 @@ impl FormulaSubscriber for CoalesceFormula {
4650
}
4751
}
4852

49-
impl From<FormulaParams<CoalesceFormula>> for CoalesceFormula {
50-
fn from(params: FormulaParams<CoalesceFormula>) -> Self {
53+
impl<M: Metric> From<FormulaParams<CoalesceFormula<M>, M>> for CoalesceFormula<M> {
54+
fn from(params: FormulaParams<CoalesceFormula<M>, M>) -> Self {
5155
Self {
5256
formula: params.formula,
5357
metric: params.metric,
@@ -56,8 +60,8 @@ impl From<FormulaParams<CoalesceFormula>> for CoalesceFormula {
5660
}
5761
}
5862

59-
impl From<CoalesceFormula> for FormulaParams<CoalesceFormula> {
60-
fn from(formula: CoalesceFormula) -> Self {
63+
impl<M: Metric> From<CoalesceFormula<M>> for FormulaParams<CoalesceFormula<M>, M> {
64+
fn from(formula: CoalesceFormula<M>) -> Self {
6165
FormulaParams {
6266
formula: formula.formula,
6367
metric: formula.metric,

0 commit comments

Comments
 (0)