Skip to content

Commit 122626d

Browse files
authored
Implement an async composable Formula type (#12)
2 parents 3f549ba + ce65e8b commit 122626d

14 files changed

Lines changed: 1053 additions & 187 deletions

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ name = "frequenz_microgrid"
88
path = "src/lib.rs"
99

1010
[dependencies]
11+
async-trait = "0.1.89"
1112
chrono = "0.4"
1213
frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "0b28f99" }
1314
frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "463414a" }
@@ -23,5 +24,5 @@ tracing-subscriber = { version = "0.3" }
2324
tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] }
2425

2526
[build-dependencies]
27+
prost-build = { version = "0.13", features = ["cleanup-markdown"] }
2628
tonic-build = "0.13"
27-
prost-build = "0.13"

examples/logical_meter.rs

Lines changed: 83 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use chrono::TimeDelta;
55
use frequenz_microgrid::{
6-
Error, Formula, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, metric,
6+
Error, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, metric, quantity::Power,
77
};
88
use tracing_subscriber::{
99
EnvFilter,
@@ -14,7 +14,7 @@ use tracing_subscriber::{
1414
#[tokio::main]
1515
async fn main() -> Result<(), Error> {
1616
tracing_subscriber::registry()
17-
.with(EnvFilter::new("info,frequenz_microgrid=debug"))
17+
.with(EnvFilter::new("info,frequenz_microgrid=warn"))
1818
.with(fmt::layer().with_file(true).with_line_number(true))
1919
.init();
2020

@@ -29,49 +29,113 @@ async fn main() -> Result<(), Error> {
2929

3030
// Create a formula that calculates `grid_power - battery_power`.
3131
let formula_grid = logical_meter.grid(metric::AcPowerActive)?;
32-
let formula_battery = logical_meter.battery(None, metric::AcPowerActive)?;
32+
let formula_pv = logical_meter.pv(None, metric::AcPowerActive)?;
3333
let formula_consumer = logical_meter.consumer(metric::AcPowerActive)?;
3434

35-
let formula = (logical_meter.grid(metric::AcPowerActive)?
36-
- logical_meter.battery(None, metric::AcPowerActive)?
37-
+ logical_meter.consumer(metric::AcPowerActive)?)?;
35+
let formula = logical_meter.grid(metric::AcPowerActive)?
36+
- logical_meter.pv(None, metric::AcPowerActive)?
37+
+ logical_meter.consumer(metric::AcPowerActive)?
38+
+ Power::from_kilowatts(100.0);
3839

3940
let mut rx = formula.subscribe().await?;
4041
let mut grid_rx = formula_grid.subscribe().await?;
41-
let mut battery_rx = formula_battery.subscribe().await?;
42+
let mut pv_rx = formula_pv.subscribe().await?;
4243
let mut consumer_rx = formula_consumer.subscribe().await?;
4344

4445
for _ in 0..3 {
4546
let sample = rx.recv().await.unwrap();
4647
let grid_sample = grid_rx.recv().await.unwrap();
47-
let battery_sample = battery_rx.recv().await.unwrap();
48+
let pv_sample = pv_rx.recv().await.unwrap();
4849
let consumer_sample = consumer_rx.recv().await.unwrap();
4950
tracing::info!(
50-
"grid({}) - battery({}) + consumer({}) = {}",
51-
grid_sample.value().unwrap(),
52-
battery_sample.value().unwrap(),
53-
consumer_sample.value().unwrap(),
54-
sample.value().unwrap()
51+
"grid({}) - pv({}) + consumer({}) + 100kW = {}",
52+
grid_sample
53+
.value()
54+
.map(|v| v.to_string())
55+
.unwrap_or_else(|| "None".to_string()),
56+
pv_sample
57+
.value()
58+
.map(|v| v.to_string())
59+
.unwrap_or_else(|| "None".to_string()),
60+
consumer_sample
61+
.value()
62+
.map(|v| v.to_string())
63+
.unwrap_or_else(|| "None".to_string()),
64+
sample
65+
.value()
66+
.map(|v| v.to_string())
67+
.unwrap_or_else(|| "None".to_string())
5568
);
5669
}
57-
let formula_grid_voltage = logical_meter
58-
.battery(None, metric::AcVoltagePhase1N)?
59-
.coalesce(logical_meter.pv(None, metric::AcVoltagePhase1N)?)?;
70+
71+
let formula_grid_voltage = logical_meter.grid(metric::AcVoltage)?.coalesce(
72+
logical_meter.grid(metric::AcVoltagePhase1N)?.avg(vec![
73+
logical_meter.grid(metric::AcVoltagePhase2N)?,
74+
logical_meter.grid(metric::AcVoltagePhase3N)?,
75+
])? * 3.0_f32.sqrt(),
76+
)?;
6077

6178
tracing::info!("formula_grid_voltage: {}", formula_grid_voltage);
6279
let mut grid_voltage_rx = formula_grid_voltage.subscribe().await?;
80+
6381
for _ in 0..3 {
6482
let sample = grid_voltage_rx.recv().await.unwrap();
65-
tracing::info!("grid voltage: {}", sample.value().unwrap());
83+
tracing::info!(
84+
"grid voltage: {}",
85+
sample
86+
.value()
87+
.map(|v| v.to_string())
88+
.unwrap_or_else(|| "None".to_string())
89+
);
6690
}
6791

6892
drop(rx);
6993
drop(grid_rx);
70-
drop(battery_rx);
94+
drop(pv_rx);
7195
drop(consumer_rx);
7296

97+
let mut p1 = logical_meter
98+
.grid(metric::AcVoltagePhase1N)?
99+
.subscribe()
100+
.await?;
101+
let mut p2 = logical_meter
102+
.grid(metric::AcVoltagePhase2N)?
103+
.subscribe()
104+
.await?;
105+
let mut p3 = logical_meter
106+
.grid(metric::AcVoltagePhase3N)?
107+
.subscribe()
108+
.await?;
109+
let mut three_phase = logical_meter.grid(metric::AcVoltage)?.subscribe().await?;
110+
73111
loop {
74112
let sample = grid_voltage_rx.recv().await.unwrap();
75-
tracing::info!("grid voltage: {}", sample.value().unwrap());
113+
let p1_sample = p1.recv().await.unwrap();
114+
let p2_sample = p2.recv().await.unwrap();
115+
let p3_sample = p3.recv().await.unwrap();
116+
let three_phase_sample = three_phase.recv().await.unwrap();
117+
tracing::info!(
118+
"grid voltage: Coalesce({}, Avg({}, {}, {}) * sqrt(3)) = {}",
119+
three_phase_sample
120+
.value()
121+
.map(|v| v.to_string())
122+
.unwrap_or_else(|| "None".to_string()),
123+
p1_sample
124+
.value()
125+
.map(|v| v.to_string())
126+
.unwrap_or_else(|| "None".to_string()),
127+
p2_sample
128+
.value()
129+
.map(|v| v.to_string())
130+
.unwrap_or_else(|| "None".to_string()),
131+
p3_sample
132+
.value()
133+
.map(|v| v.to_string())
134+
.unwrap_or_else(|| "None".to_string()),
135+
sample
136+
.value()
137+
.map(|v| v.to_string())
138+
.unwrap_or_else(|| "None".to_string())
139+
);
76140
}
77141
}

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub use client::MicrogridClientHandle;
99
mod error;
1010
pub use error::{Error, ErrorKind};
1111

12-
mod quantity;
12+
pub mod quantity;
1313

1414
mod proto;
1515

@@ -18,5 +18,5 @@ pub use sample::Sample;
1818

1919
mod logical_meter;
2020
pub use logical_meter::{
21-
AggregationFormula, Formula, LogicalMeterConfig, LogicalMeterHandle, metric,
21+
Formula, FormulaSubscriber, LogicalMeterConfig, LogicalMeterHandle, metric,
2222
};

src/logical_meter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
66
mod config;
77
mod formula;
8+
pub use formula::{Formula, FormulaSubscriber};
9+
810
mod logical_meter_actor;
911
mod logical_meter_handle;
1012
pub use logical_meter_handle::LogicalMeterHandle;
1113
pub mod metric;
1214

1315
pub use config::LogicalMeterConfig;
14-
pub use formula::{AggregationFormula, Formula};

src/logical_meter/formula.rs

Lines changed: 106 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,40 @@
33

44
//! Formula module for the logical meter.
55
6-
use frequenz_microgrid_component_graph::Formula as _;
7-
mod aggregation_formula;
8-
mod coalesce_formula;
6+
use async_trait::async_trait;
7+
pub(crate) mod aggregation_formula;
8+
mod async_formula;
9+
pub(crate) mod coalesce_formula;
910
pub(crate) mod graph_formula_provider;
10-
pub use aggregation_formula::AggregationFormula;
11-
pub use coalesce_formula::CoalesceFormula;
11+
pub use async_formula::Formula;
1212

13-
use crate::{Error, Sample, metric::Metric, quantity::Quantity};
13+
use crate::{
14+
Error, Sample, logical_meter::formula::async_formula::FormulaOperand, metric::Metric,
15+
quantity::Quantity,
16+
};
1417
use tokio::sync::{broadcast, mpsc};
1518

1619
use super::logical_meter_actor;
1720

1821
/// Connects logical meter formulas to the component graph formulas.
19-
pub(crate) trait GraphFormulaProvider: std::fmt::Display {
22+
pub(crate) trait GraphFormulaConnector: std::fmt::Display {
2023
type GraphFormulaType: frequenz_microgrid_component_graph::Formula;
2124
}
2225

23-
/// Defines a formula that can be subscribed to for receiving samples.
24-
pub(crate) trait FormulaSubscriber: std::fmt::Display {
25-
type MetricType: Metric;
26-
27-
fn subscribe(
28-
&self,
29-
) -> impl Future<
30-
Output = Result<
31-
broadcast::Receiver<Sample<<Self::MetricType as Metric>::QuantityType>>,
32-
Error,
33-
>,
34-
> + Send;
26+
#[async_trait]
27+
pub trait FormulaSubscriber: std::fmt::Display + Sync + Send {
28+
type QuantityType: Quantity;
29+
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<Self::QuantityType>>, Error>;
3530
}
3631

3732
/// Parameters for creating a logical meter formula.
38-
pub(super) struct FormulaParams<F: GraphFormulaProvider, M: Metric> {
33+
pub(super) struct FormulaParams<F: GraphFormulaConnector, M: Metric> {
3934
pub(super) formula: F::GraphFormulaType,
4035
pub(super) metric: M,
4136
pub(super) instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
4237
}
4338

44-
impl<F: GraphFormulaProvider, M: Metric> FormulaParams<F, M> {
39+
impl<F: GraphFormulaConnector, M: Metric> FormulaParams<F, M> {
4540
pub(super) fn new(
4641
formula: F::GraphFormulaType,
4742
metric: M,
@@ -55,54 +50,108 @@ impl<F: GraphFormulaProvider, M: Metric> FormulaParams<F, M> {
5550
}
5651
}
5752

58-
/// A trait that defines generic formula operations.
59-
pub trait Formula<Q: Quantity>: std::fmt::Display + Sized {
60-
fn coalesce(self, other: Self) -> Result<Self, Error>;
61-
fn min(self, other: Self) -> Result<Self, Error>;
62-
fn max(self, other: Self) -> Result<Self, Error>;
63-
fn subscribe(
64-
&self,
65-
) -> impl Future<Output = Result<broadcast::Receiver<Sample<Q>>, Error>> + Send;
53+
// TODO: extend previous Coalesce instead of creating a new one, etc.
54+
impl<Q> Formula<Q>
55+
where
56+
Q: Quantity + 'static,
57+
{
58+
pub fn coalesce(self, other: Formula<Q>) -> Result<Formula<Q>, Error> {
59+
match self {
60+
Formula::Coalesce(mut items) => {
61+
items.push(other.into());
62+
Ok(Formula::Coalesce(items))
63+
}
64+
_ => Ok(Formula::Coalesce(vec![
65+
FormulaOperand::Formula(Box::new(Formula::<Q, Q, f32>::Subscriber(Box::new(self)))),
66+
other.into(),
67+
])),
68+
}
69+
}
70+
71+
pub fn min(self, other: Formula<Q>) -> Result<Formula<Q>, Error> {
72+
match self {
73+
Formula::Min(mut items) => {
74+
items.push(other.into());
75+
Ok(Formula::Min(items))
76+
}
77+
_ => Ok(Formula::Min(vec![
78+
FormulaOperand::Formula(Box::new(Formula::<Q, Q, f32>::Subscriber(Box::new(self)))),
79+
other.into(),
80+
])),
81+
}
82+
}
83+
84+
pub fn max(self, other: Formula<Q>) -> Result<Formula<Q>, Error> {
85+
match self {
86+
Formula::Max(mut items) => {
87+
items.push(other.into());
88+
Ok(Formula::Max(items))
89+
}
90+
_ => Ok(Formula::Max(vec![
91+
FormulaOperand::Formula(Box::new(Formula::<Q, Q, f32>::Subscriber(Box::new(self)))),
92+
other.into(),
93+
])),
94+
}
95+
}
96+
97+
pub fn avg(self, others: Vec<Formula<Q>>) -> Result<Formula<Q>, Error> {
98+
let mut exprs: Vec<FormulaOperand<Q>> =
99+
vec![FormulaOperand::Formula(Box::new(
100+
Formula::<Q, Q, f32>::Subscriber(Box::new(self)),
101+
))];
102+
for other in others {
103+
exprs.push(other.into());
104+
}
105+
Ok(Formula::Avg(exprs))
106+
}
107+
108+
pub async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<Q>>, Error> {
109+
<Self as FormulaSubscriber>::subscribe(self).await
110+
}
66111
}
67112

68-
impl<T, Q, M> Formula<Q> for T
113+
impl<Q, F> std::ops::Add<F> for Formula<Q>
69114
where
70-
T: FormulaSubscriber<MetricType = M>
71-
+ GraphFormulaProvider
72-
+ From<FormulaParams<T, M>>
73-
+ Into<FormulaParams<T, M>>
74-
+ std::fmt::Display,
75-
Q: Quantity,
76-
M: Metric<QuantityType = Q>,
115+
F: Into<FormulaOperand<Q>>,
116+
Q: Quantity + 'static,
77117
{
78-
fn coalesce(self, other: Self) -> Result<Self, Error> {
79-
let mut params_self: FormulaParams<T, M> = self.into();
80-
let params_other: FormulaParams<T, M> = other.into();
118+
type Output = Formula<Q>;
81119

82-
params_self.formula = params_self.formula.coalesce(params_other.formula);
83-
Ok(params_self.into())
120+
fn add(self, other: F) -> Self::Output {
121+
Formula::Add(vec![FormulaOperand::Formula(Box::new(self)), other.into()])
84122
}
123+
}
85124

86-
fn min(self, other: Self) -> Result<Self, Error> {
87-
let mut params_self: FormulaParams<T, M> = self.into();
88-
let params_other: FormulaParams<T, M> = other.into();
125+
impl<Q, F> std::ops::Sub<F> for Formula<Q>
126+
where
127+
F: Into<FormulaOperand<Q>>,
128+
Q: Quantity + 'static,
129+
{
130+
type Output = Formula<Q>;
89131

90-
params_self.formula = params_self.formula.min(params_other.formula);
91-
Ok(params_self.into())
132+
fn sub(self, other: F) -> Self::Output {
133+
Formula::Subtract(vec![FormulaOperand::Formula(Box::new(self)), other.into()])
92134
}
135+
}
93136

94-
fn max(self, other: Self) -> Result<Self, Error> {
95-
let mut params_self: FormulaParams<T, M> = self.into();
96-
let params_other: FormulaParams<T, M> = other.into();
137+
impl<Q> std::ops::Mul<f32> for Formula<Q, Q, f32>
138+
where
139+
Q: Quantity + 'static,
140+
{
141+
type Output = Formula<Q, Q, f32>;
97142

98-
params_self.formula = params_self.formula.max(params_other.formula);
99-
Ok(params_self.into())
143+
fn mul(self, other: f32) -> Self::Output {
144+
Formula::<Q, Q, f32>::Multiply(FormulaOperand::<Q>::Formula(Box::new(self)), other.into())
100145
}
146+
}
147+
148+
impl<Q> std::ops::Div<f32> for Formula<Q, Q, f32>
149+
where
150+
Q: Quantity + 'static,
151+
{
152+
type Output = Formula<Q, Q, f32>;
101153

102-
fn subscribe(
103-
&self,
104-
) -> impl Future<Output = Result<broadcast::Receiver<Sample<M::QuantityType>>, Error>> + Send
105-
{
106-
<T as FormulaSubscriber>::subscribe(self)
154+
fn div(self, rhs: f32) -> Self::Output {
155+
Formula::<Q, Q, f32>::Divide(FormulaOperand::<Q>::Formula(Box::new(self)), rhs.into())
107156
}
108157
}

0 commit comments

Comments
 (0)