Skip to content

Commit 2c1d2b3

Browse files
committed
Implement Quantity types in logical meter streams
This is done as follows: - Add `QuantityType` as an associated type of the `Metric` trait. - Make `AggregationFormula` and `CoalesceFormula` generic over `Metric`, so that they get access to the `QuantityType`. - Add a lot more generics to intermediate types to achieve this. - Evaluate formulas in all 4 quantity groups in the active `Formulas`. - Cleanup resamplers only if they're not used in formulas in any of the 4 groups in `Formulas`. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
1 parent 7634490 commit 2c1d2b3

6 files changed

Lines changed: 136 additions & 101 deletions

File tree

src/logical_meter/formula.rs

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub(crate) mod graph_formula_provider;
1010
pub use aggregation_formula::AggregationFormula;
1111
pub use coalesce_formula::CoalesceFormula;
1212

13-
use crate::{Error, Sample, proto::common::v1alpha8::metrics::Metric, quantity::Quantity};
13+
use crate::{Error, Sample, metric::Metric, quantity::Quantity};
1414
use tokio::sync::{broadcast, mpsc};
1515

1616
use super::logical_meter_actor;
@@ -22,20 +22,29 @@ pub(crate) trait GraphFormulaProvider: std::fmt::Display {
2222

2323
/// Defines a formula that can be subscribed to for receiving samples.
2424
pub(crate) trait FormulaSubscriber: std::fmt::Display {
25-
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
25+
type MetricType: Metric;
26+
27+
fn subscribe(
28+
&self,
29+
) -> impl Future<
30+
Output = Result<
31+
broadcast::Receiver<Sample<<Self::MetricType as Metric>::QuantityType>>,
32+
Error,
33+
>,
34+
> + Send;
2635
}
2736

2837
/// Parameters for creating a logical meter formula.
29-
pub(super) struct FormulaParams<F: GraphFormulaProvider> {
38+
pub(super) struct FormulaParams<F: GraphFormulaProvider, M: Metric> {
3039
pub(super) formula: F::GraphFormulaType,
31-
pub(super) metric: Metric,
40+
pub(super) metric: M,
3241
pub(super) instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
3342
}
3443

35-
impl<F: GraphFormulaProvider> FormulaParams<F> {
44+
impl<F: GraphFormulaProvider, M: Metric> FormulaParams<F, M> {
3645
pub(super) fn new(
3746
formula: F::GraphFormulaType,
38-
metric: Metric,
47+
metric: M,
3948
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
4049
) -> Self {
4150
Self {
@@ -51,21 +60,24 @@ pub trait Formula<Q: Quantity = f32>: std::fmt::Display + Sized {
5160
fn coalesce(self, other: Self) -> Result<Self, Error>;
5261
fn min(self, other: Self) -> Result<Self, Error>;
5362
fn max(self, other: Self) -> Result<Self, Error>;
54-
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
63+
fn subscribe(
64+
&self,
65+
) -> impl Future<Output = Result<broadcast::Receiver<Sample<Q>>, Error>> + Send;
5566
}
5667

57-
impl<T, Q> Formula<Q> for T
68+
impl<T, Q, M> Formula<Q> for T
5869
where
59-
T: FormulaSubscriber
70+
T: FormulaSubscriber<MetricType = M>
6071
+ GraphFormulaProvider
61-
+ From<FormulaParams<T>>
62-
+ Into<FormulaParams<T>>
72+
+ From<FormulaParams<T, M>>
73+
+ Into<FormulaParams<T, M>>
6374
+ std::fmt::Display,
6475
Q: Quantity,
76+
M: Metric<QuantityType = Q>,
6577
{
6678
fn coalesce(self, other: Self) -> Result<Self, Error> {
67-
let mut params_self: FormulaParams<T> = self.into();
68-
let params_other: FormulaParams<T> = other.into();
79+
let mut params_self: FormulaParams<T, M> = self.into();
80+
let params_other: FormulaParams<T, M> = other.into();
6981

7082
if params_self.metric != params_other.metric {
7183
return Err(Error::invalid_metric(format!(
@@ -79,8 +91,8 @@ where
7991
}
8092

8193
fn min(self, other: Self) -> Result<Self, Error> {
82-
let mut params_self: FormulaParams<T> = self.into();
83-
let params_other: FormulaParams<T> = other.into();
94+
let mut params_self: FormulaParams<T, M> = self.into();
95+
let params_other: FormulaParams<T, M> = other.into();
8496

8597
if params_self.metric != params_other.metric {
8698
return Err(Error::invalid_metric(format!(
@@ -94,8 +106,8 @@ where
94106
}
95107

96108
fn max(self, other: Self) -> Result<Self, Error> {
97-
let mut params_self: FormulaParams<T> = self.into();
98-
let params_other: FormulaParams<T> = other.into();
109+
let mut params_self: FormulaParams<T, M> = self.into();
110+
let params_other: FormulaParams<T, M> = other.into();
99111

100112
if params_self.metric != params_other.metric {
101113
return Err(Error::invalid_metric(format!(
@@ -108,7 +120,10 @@ where
108120
Ok(params_self.into())
109121
}
110122

111-
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send {
123+
fn subscribe(
124+
&self,
125+
) -> impl Future<Output = Result<broadcast::Receiver<Sample<M::QuantityType>>, Error>> + Send
126+
{
112127
<T as FormulaSubscriber>::subscribe(self)
113128
}
114129
}

src/logical_meter/formula/aggregation_formula.rs

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,40 @@
55
66
use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider};
77
use crate::{
8-
Error, Sample, logical_meter::logical_meter_actor, proto::common::v1alpha8::metrics::Metric,
8+
Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity,
99
};
1010
use tokio::sync::{broadcast, mpsc, oneshot};
1111

1212
#[derive(Clone)]
13-
pub struct AggregationFormula {
13+
pub struct AggregationFormula<M: Metric> {
1414
formula: frequenz_microgrid_component_graph::AggregationFormula,
15-
metric: Metric,
15+
metric: M,
1616
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
1717
}
1818

19-
impl std::fmt::Display for AggregationFormula {
19+
impl<M: Metric> std::fmt::Display for AggregationFormula<M> {
2020
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2121
self.formula.fmt(f)
2222
}
2323
}
2424

25-
impl GraphFormulaProvider for AggregationFormula {
25+
impl<M: Metric> GraphFormulaProvider for AggregationFormula<M> {
2626
type GraphFormulaType = frequenz_microgrid_component_graph::AggregationFormula;
2727
}
2828

29-
impl FormulaSubscriber for AggregationFormula {
30-
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
29+
impl<Q: Quantity + 'static, M: Metric<QuantityType = Q> + Sync> FormulaSubscriber
30+
for AggregationFormula<M>
31+
{
32+
type MetricType = M;
33+
34+
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<Q>>, Error> {
3135
let (tx, rx) = oneshot::channel();
3236

3337
self.instructions_tx
3438
.send(logical_meter_actor::Instruction::SubscribeFormula {
3539
formula: self.formula.to_string(),
36-
metric: self.metric,
37-
response_tx: tx,
40+
metric: M::METRIC,
41+
response_tx: tx.try_into()?,
3842
})
3943
.await
4044
.map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?;
@@ -46,8 +50,8 @@ impl FormulaSubscriber for AggregationFormula {
4650
}
4751
}
4852

49-
impl From<FormulaParams<AggregationFormula>> for AggregationFormula {
50-
fn from(params: FormulaParams<AggregationFormula>) -> Self {
53+
impl<M: Metric> From<FormulaParams<AggregationFormula<M>, M>> for AggregationFormula<M> {
54+
fn from(params: FormulaParams<AggregationFormula<M>, M>) -> Self {
5155
Self {
5256
formula: params.formula,
5357
metric: params.metric,
@@ -56,8 +60,8 @@ impl From<FormulaParams<AggregationFormula>> for AggregationFormula {
5660
}
5761
}
5862

59-
impl From<AggregationFormula> for FormulaParams<AggregationFormula> {
60-
fn from(formula: AggregationFormula) -> Self {
63+
impl<M: Metric> From<AggregationFormula<M>> for FormulaParams<AggregationFormula<M>, M> {
64+
fn from(formula: AggregationFormula<M>) -> Self {
6165
FormulaParams {
6266
formula: formula.formula,
6367
metric: formula.metric,
@@ -66,7 +70,7 @@ impl From<AggregationFormula> for FormulaParams<AggregationFormula> {
6670
}
6771
}
6872

69-
impl std::ops::Add for AggregationFormula {
73+
impl<M: Metric> std::ops::Add for AggregationFormula<M> {
7074
type Output = Result<Self, Error>;
7175

7276
fn add(self, other: Self) -> Self::Output {
@@ -81,7 +85,7 @@ impl std::ops::Add for AggregationFormula {
8185
}
8286
}
8387

84-
impl std::ops::Sub for AggregationFormula {
88+
impl<M: Metric> std::ops::Sub for AggregationFormula<M> {
8589
type Output = Result<Self, Error>;
8690

8791
fn sub(self, other: Self) -> Self::Output {
@@ -96,43 +100,43 @@ impl std::ops::Sub for AggregationFormula {
96100
}
97101
}
98102

99-
impl std::ops::Add<AggregationFormula> for Result<AggregationFormula, Error> {
100-
type Output = Result<AggregationFormula, Error>;
103+
impl<M: Metric> std::ops::Add<AggregationFormula<M>> for Result<AggregationFormula<M>, Error> {
104+
type Output = Result<AggregationFormula<M>, Error>;
101105

102-
fn add(self, other: AggregationFormula) -> Self::Output {
106+
fn add(self, other: AggregationFormula<M>) -> Self::Output {
103107
match self {
104108
Ok(left) => left + other,
105109
Err(e) => Err(e),
106110
}
107111
}
108112
}
109113

110-
impl std::ops::Sub<AggregationFormula> for Result<AggregationFormula, Error> {
111-
type Output = Result<AggregationFormula, Error>;
114+
impl<M: Metric> std::ops::Sub<AggregationFormula<M>> for Result<AggregationFormula<M>, Error> {
115+
type Output = Result<AggregationFormula<M>, Error>;
112116

113-
fn sub(self, other: AggregationFormula) -> Self::Output {
117+
fn sub(self, other: AggregationFormula<M>) -> Self::Output {
114118
match self {
115119
Ok(left) => left - other,
116120
Err(e) => Err(e),
117121
}
118122
}
119123
}
120124

121-
impl std::ops::Add<Result<AggregationFormula, Error>> for AggregationFormula {
122-
type Output = Result<AggregationFormula, Error>;
125+
impl<M: Metric> std::ops::Add<Result<AggregationFormula<M>, Error>> for AggregationFormula<M> {
126+
type Output = Result<AggregationFormula<M>, Error>;
123127

124-
fn add(self, other: Result<AggregationFormula, Error>) -> Self::Output {
128+
fn add(self, other: Result<AggregationFormula<M>, Error>) -> Self::Output {
125129
match other {
126130
Ok(right) => self + right,
127131
Err(e) => Err(e),
128132
}
129133
}
130134
}
131135

132-
impl std::ops::Sub<Result<AggregationFormula, Error>> for AggregationFormula {
133-
type Output = Result<AggregationFormula, Error>;
136+
impl<M: Metric> std::ops::Sub<Result<AggregationFormula<M>, Error>> for AggregationFormula<M> {
137+
type Output = Result<AggregationFormula<M>, Error>;
134138

135-
fn sub(self, other: Result<AggregationFormula, Error>) -> Self::Output {
139+
fn sub(self, other: Result<AggregationFormula<M>, Error>) -> Self::Output {
136140
match other {
137141
Ok(right) => self - right,
138142
Err(e) => Err(e),

src/logical_meter/formula/coalesce_formula.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,40 @@
55
66
use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider};
77
use crate::{
8-
Error, Sample, logical_meter::logical_meter_actor, proto::common::v1alpha8::metrics::Metric,
8+
Error, Sample, logical_meter::logical_meter_actor, metric::Metric, quantity::Quantity,
99
};
1010
use tokio::sync::{broadcast, mpsc, oneshot};
1111

1212
#[derive(Clone)]
13-
pub struct CoalesceFormula {
13+
pub struct CoalesceFormula<M: Metric> {
1414
formula: frequenz_microgrid_component_graph::CoalesceFormula,
15-
metric: Metric,
15+
metric: M,
1616
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
1717
}
1818

19-
impl std::fmt::Display for CoalesceFormula {
19+
impl<M: Metric> std::fmt::Display for CoalesceFormula<M> {
2020
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2121
self.formula.fmt(f)
2222
}
2323
}
2424

25-
impl GraphFormulaProvider for CoalesceFormula {
25+
impl<M: Metric> GraphFormulaProvider for CoalesceFormula<M> {
2626
type GraphFormulaType = frequenz_microgrid_component_graph::CoalesceFormula;
2727
}
2828

29-
impl FormulaSubscriber for CoalesceFormula {
30-
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
29+
impl<Q: Quantity + 'static, M: Metric<QuantityType = Q> + Sync> FormulaSubscriber
30+
for CoalesceFormula<M>
31+
{
32+
type MetricType = M;
33+
34+
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<Q>>, Error> {
3135
let (tx, rx) = oneshot::channel();
3236

3337
self.instructions_tx
3438
.send(logical_meter_actor::Instruction::SubscribeFormula {
3539
formula: self.formula.to_string(),
36-
metric: self.metric,
37-
response_tx: tx,
40+
metric: M::METRIC,
41+
response_tx: tx.try_into()?,
3842
})
3943
.await
4044
.map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?;
@@ -46,8 +50,8 @@ impl FormulaSubscriber for CoalesceFormula {
4650
}
4751
}
4852

49-
impl From<FormulaParams<CoalesceFormula>> for CoalesceFormula {
50-
fn from(params: FormulaParams<CoalesceFormula>) -> Self {
53+
impl<M: Metric> From<FormulaParams<CoalesceFormula<M>, M>> for CoalesceFormula<M> {
54+
fn from(params: FormulaParams<CoalesceFormula<M>, M>) -> Self {
5155
Self {
5256
formula: params.formula,
5357
metric: params.metric,
@@ -56,8 +60,8 @@ impl From<FormulaParams<CoalesceFormula>> for CoalesceFormula {
5660
}
5761
}
5862

59-
impl From<CoalesceFormula> for FormulaParams<CoalesceFormula> {
60-
fn from(formula: CoalesceFormula) -> Self {
63+
impl<M: Metric> From<CoalesceFormula<M>> for FormulaParams<CoalesceFormula<M>, M> {
64+
fn from(formula: CoalesceFormula<M>) -> Self {
6165
FormulaParams {
6266
formula: formula.formula,
6367
metric: formula.metric,

src/logical_meter/formula/graph_formula_provider.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use crate::Error;
77
use crate::logical_meter::formula::FormulaParams;
88
use crate::logical_meter::logical_meter_actor;
9+
use crate::metric::Metric;
910
use crate::proto::common::v1alpha8::microgrid::electrical_components::{
1011
ElectricalComponent, ElectricalComponentConnection,
1112
};
@@ -19,9 +20,9 @@ use super::{AggregationFormula, CoalesceFormula};
1920
macro_rules! graph_formula_provider {
2021
($(($fnname:ident $(, ids:$idsparam:ident)? $(, id:$idparam:ident)?)),+ $(,)?) => {$(
2122

22-
fn $fnname<M: crate::metric::Metric>(
23+
fn $fnname(
2324
_graph: &ComponentGraph<ElectricalComponent, ElectricalComponentConnection>,
24-
_metric: M,
25+
_metric: Self::MetricType,
2526
_instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
2627
$($idsparam: Option<BTreeSet<u64>>,)?
2728
$($idparam: u64,)?
@@ -45,6 +46,8 @@ macro_rules! graph_formula_provider {
4546
/// `CoalesceFormula`s for each of these metrics. This trait provides a
4647
/// way to generalize them.
4748
pub trait GraphFormulaProvider: Sized {
49+
type MetricType: Metric;
50+
4851
graph_formula_provider!(
4952
(grid),
5053
(consumer),
@@ -65,9 +68,9 @@ macro_rules! impl_graph_formula_provider {
6568
$(, id:$idparam:ident)?
6669
)),+ $(,)?) => {$(
6770

68-
fn $fnname<M: crate::metric::Metric>(
71+
fn $fnname(
6972
graph: &ComponentGraph<ElectricalComponent, ElectricalComponentConnection>,
70-
_metric: M,
73+
_metric: Self::MetricType,
7174
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
7275
$($idsparam: Option<BTreeSet<u64>>,)?
7376
$($idparam: u64,)?
@@ -77,13 +80,15 @@ macro_rules! impl_graph_formula_provider {
7780
format!("Could not get {} formula: {e}", stringify!($fnname))
7881
)
7982
})?;
80-
Ok(FormulaParams::new(formula, M::METRIC, instructions_tx).into())
83+
Ok(FormulaParams::new(formula, _metric, instructions_tx).into())
8184
}
8285

8386
)+};
8487
}
8588

86-
impl GraphFormulaProvider for AggregationFormula {
89+
impl<M: Metric> GraphFormulaProvider for AggregationFormula<M> {
90+
type MetricType = M;
91+
8792
impl_graph_formula_provider!(
8893
(grid, grid_formula),
8994
(consumer, consumer_formula),
@@ -96,7 +101,9 @@ impl GraphFormulaProvider for AggregationFormula {
96101
);
97102
}
98103

99-
impl GraphFormulaProvider for CoalesceFormula {
104+
impl<M: Metric> GraphFormulaProvider for CoalesceFormula<M> {
105+
type MetricType = M;
106+
100107
impl_graph_formula_provider!(
101108
(grid, grid_coalesce_formula),
102109
(battery, battery_ac_coalesce_formula, ids: battery_ids),

0 commit comments

Comments
 (0)