Skip to content

Commit 2bea387

Browse files
authored
Support coalesce/min/max operations on all formulas (#8)
2 parents 3476bfe + e1d3515 commit 2bea387

9 files changed

Lines changed: 232 additions & 100 deletions

File tree

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ path = "src/lib.rs"
99

1010
[dependencies]
1111
chrono = "0.4"
12-
frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "94dc826" }
13-
frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "e0567c0" }
12+
frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "b2fd616" }
13+
frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "463414a" }
1414
frequenz-resampling = { git = "https://github.com/frequenz-floss/frequenz-resampling-rs.git", rev = "ce84d66" }
1515
prost = "0.13"
1616
prost-types = "0.13"
@@ -19,6 +19,8 @@ tonic = "0.13"
1919
tracing = { version = "0.1" }
2020
tracing-subscriber = { version = "0.3" }
2121

22+
[dev-dependencies]
23+
tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] }
2224

2325
[build-dependencies]
2426
tonic-build = "0.13"

examples/logical_meter.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,17 @@ use chrono::TimeDelta;
55
use frequenz_microgrid::{
66
Error, Formula, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, metric,
77
};
8+
use tracing_subscriber::{
9+
EnvFilter,
10+
fmt::{self},
11+
prelude::*,
12+
};
813

914
#[tokio::main]
1015
async fn main() -> Result<(), Error> {
11-
tracing_subscriber::fmt::fmt()
12-
.with_file(true)
13-
.with_line_number(true)
16+
tracing_subscriber::registry()
17+
.with(EnvFilter::new("info,frequenz_microgrid=debug"))
18+
.with(fmt::layer().with_file(true).with_line_number(true))
1419
.init();
1520

1621
let client = MicrogridClientHandle::new("http://[::1]:8800");
@@ -36,7 +41,7 @@ async fn main() -> Result<(), Error> {
3641
let mut battery_rx = formula_battery.subscribe().await?;
3742
let mut consumer_rx = formula_consumer.subscribe().await?;
3843

39-
for _ in 0..10 {
44+
for _ in 0..3 {
4045
let sample = rx.recv().await.unwrap();
4146
let grid_sample = grid_rx.recv().await.unwrap();
4247
let battery_sample = battery_rx.recv().await.unwrap();
@@ -49,9 +54,22 @@ async fn main() -> Result<(), Error> {
4954
sample.value().unwrap()
5055
);
5156
}
57+
let formula_grid_voltage = logical_meter
58+
.battery(None, metric::AcVoltagePhase1N)?
59+
.coalesce(logical_meter.pv(None, metric::AcVoltagePhase1N)?)?;
5260

53-
let formula_grid_voltage = logical_meter.grid(metric::AcVoltagePhase1N)?;
61+
tracing::info!("formula_grid_voltage: {}", formula_grid_voltage);
5462
let mut grid_voltage_rx = formula_grid_voltage.subscribe().await?;
63+
for _ in 0..3 {
64+
let sample = grid_voltage_rx.recv().await.unwrap();
65+
tracing::info!("grid voltage: {}", sample.value().unwrap());
66+
}
67+
68+
drop(rx);
69+
drop(grid_rx);
70+
drop(battery_rx);
71+
drop(consumer_rx);
72+
5573
loop {
5674
let sample = grid_voltage_rx.recv().await.unwrap();
5775
tracing::info!("grid voltage: {}", sample.value().unwrap());

src/logical_meter/formula.rs

Lines changed: 98 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,111 @@
33

44
//! Formula module for the logical meter.
55
6+
use frequenz_microgrid_component_graph::Formula as _;
67
mod aggregation_formula;
78
mod coalesce_formula;
89
pub(crate) mod graph_formula_provider;
910
pub use aggregation_formula::AggregationFormula;
1011
pub use coalesce_formula::CoalesceFormula;
1112

12-
use crate::{Error, Sample};
13-
use tokio::sync::broadcast;
13+
use crate::{Error, Sample, proto::common::v1::metrics::Metric};
14+
use tokio::sync::{broadcast, mpsc};
15+
16+
use super::logical_meter_actor;
17+
18+
/// Connects logical meter formulas to the component graph formulas.
19+
pub(crate) trait GraphFormulaProvider: std::fmt::Display {
20+
type GraphFormulaType: frequenz_microgrid_component_graph::Formula;
21+
}
1422

1523
/// Defines a formula that can be subscribed to for receiving samples.
16-
pub trait Formula: std::fmt::Display {
24+
pub(crate) trait FormulaSubscriber: std::fmt::Display {
25+
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
26+
}
27+
28+
/// Parameters for creating a logical meter formula.
29+
pub(super) struct FormulaParams<F: GraphFormulaProvider> {
30+
pub(super) formula: F::GraphFormulaType,
31+
pub(super) metric: Metric,
32+
pub(super) instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
33+
}
34+
35+
impl<F: GraphFormulaProvider> FormulaParams<F> {
36+
pub(super) fn new(
37+
formula: F::GraphFormulaType,
38+
metric: Metric,
39+
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
40+
) -> Self {
41+
Self {
42+
formula,
43+
metric,
44+
instructions_tx,
45+
}
46+
}
47+
}
48+
49+
/// A trait that defines generic formula operations.
50+
pub trait Formula: std::fmt::Display + Sized {
51+
fn coalesce(self, other: Self) -> Result<Self, Error>;
52+
fn min(self, other: Self) -> Result<Self, Error>;
53+
fn max(self, other: Self) -> Result<Self, Error>;
1754
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
1855
}
56+
57+
impl<T> Formula for T
58+
where
59+
T: FormulaSubscriber
60+
+ GraphFormulaProvider
61+
+ From<FormulaParams<T>>
62+
+ Into<FormulaParams<T>>
63+
+ std::fmt::Display,
64+
{
65+
fn coalesce(self, other: Self) -> Result<Self, Error> {
66+
let mut params_self: FormulaParams<T> = self.into();
67+
let params_other: FormulaParams<T> = other.into();
68+
69+
if params_self.metric != params_other.metric {
70+
return Err(Error::invalid_metric(format!(
71+
"Cannot coalesce formulas with different metrics: {} and {}",
72+
params_self.metric.as_str_name(),
73+
params_other.metric.as_str_name()
74+
)));
75+
}
76+
params_self.formula = params_self.formula.coalesce(params_other.formula);
77+
Ok(params_self.into())
78+
}
79+
80+
fn min(self, other: Self) -> Result<Self, Error> {
81+
let mut params_self: FormulaParams<T> = self.into();
82+
let params_other: FormulaParams<T> = other.into();
83+
84+
if params_self.metric != params_other.metric {
85+
return Err(Error::invalid_metric(format!(
86+
"Cannot take min of formulas with different metrics: {} and {}",
87+
params_self.metric.as_str_name(),
88+
params_other.metric.as_str_name()
89+
)));
90+
}
91+
params_self.formula = params_self.formula.min(params_other.formula);
92+
Ok(params_self.into())
93+
}
94+
95+
fn max(self, other: Self) -> Result<Self, Error> {
96+
let mut params_self: FormulaParams<T> = self.into();
97+
let params_other: FormulaParams<T> = other.into();
98+
99+
if params_self.metric != params_other.metric {
100+
return Err(Error::invalid_metric(format!(
101+
"Cannot take max of formulas with different metrics: {} and {}",
102+
params_self.metric.as_str_name(),
103+
params_other.metric.as_str_name()
104+
)));
105+
}
106+
params_self.formula = params_self.formula.max(params_other.formula);
107+
Ok(params_self.into())
108+
}
109+
110+
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send {
111+
<T as FormulaSubscriber>::subscribe(self)
112+
}
113+
}

src/logical_meter/formula/aggregation_formula.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
//! An formula that supports aggregation operations.
55
6-
use super::Formula;
6+
use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider};
77
use crate::{
88
Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric,
99
};
@@ -22,21 +22,11 @@ impl std::fmt::Display for AggregationFormula {
2222
}
2323
}
2424

25-
impl AggregationFormula {
26-
pub(crate) fn new(
27-
formula: frequenz_microgrid_component_graph::AggregationFormula,
28-
metric: Metric,
29-
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
30-
) -> Self {
31-
Self {
32-
formula,
33-
metric,
34-
instructions_tx,
35-
}
36-
}
25+
impl GraphFormulaProvider for AggregationFormula {
26+
type GraphFormulaType = frequenz_microgrid_component_graph::AggregationFormula;
3727
}
3828

39-
impl Formula for AggregationFormula {
29+
impl FormulaSubscriber for AggregationFormula {
4030
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
4131
let (tx, rx) = oneshot::channel();
4232

@@ -56,6 +46,26 @@ impl Formula for AggregationFormula {
5646
}
5747
}
5848

49+
impl From<FormulaParams<AggregationFormula>> for AggregationFormula {
50+
fn from(params: FormulaParams<AggregationFormula>) -> Self {
51+
Self {
52+
formula: params.formula,
53+
metric: params.metric,
54+
instructions_tx: params.instructions_tx,
55+
}
56+
}
57+
}
58+
59+
impl From<AggregationFormula> for FormulaParams<AggregationFormula> {
60+
fn from(formula: AggregationFormula) -> Self {
61+
FormulaParams {
62+
formula: formula.formula,
63+
metric: formula.metric,
64+
instructions_tx: formula.instructions_tx,
65+
}
66+
}
67+
}
68+
5969
impl std::ops::Add for AggregationFormula {
6070
type Output = Result<Self, Error>;
6171

@@ -67,7 +77,7 @@ impl std::ops::Add for AggregationFormula {
6777
)));
6878
}
6979
let new_formula = self.formula + other.formula;
70-
Ok(Self::new(new_formula, self.metric, self.instructions_tx))
80+
Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into())
7181
}
7282
}
7383

@@ -82,7 +92,7 @@ impl std::ops::Sub for AggregationFormula {
8292
)));
8393
}
8494
let new_formula = self.formula - other.formula;
85-
Ok(Self::new(new_formula, self.metric, self.instructions_tx))
95+
Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into())
8696
}
8797
}
8898

src/logical_meter/formula/coalesce_formula.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
//! An coalesce formula.
55
6-
use super::Formula;
6+
use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider};
77
use crate::{
88
Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric,
99
};
@@ -22,21 +22,11 @@ impl std::fmt::Display for CoalesceFormula {
2222
}
2323
}
2424

25-
impl CoalesceFormula {
26-
pub(crate) fn new(
27-
formula: frequenz_microgrid_component_graph::CoalesceFormula,
28-
metric: Metric,
29-
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
30-
) -> Self {
31-
Self {
32-
formula,
33-
metric,
34-
instructions_tx,
35-
}
36-
}
25+
impl GraphFormulaProvider for CoalesceFormula {
26+
type GraphFormulaType = frequenz_microgrid_component_graph::CoalesceFormula;
3727
}
3828

39-
impl Formula for CoalesceFormula {
29+
impl FormulaSubscriber for CoalesceFormula {
4030
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
4131
let (tx, rx) = oneshot::channel();
4232

@@ -55,3 +45,23 @@ impl Formula for CoalesceFormula {
5545
Ok(receiver)
5646
}
5747
}
48+
49+
impl From<FormulaParams<CoalesceFormula>> for CoalesceFormula {
50+
fn from(params: FormulaParams<CoalesceFormula>) -> Self {
51+
Self {
52+
formula: params.formula,
53+
metric: params.metric,
54+
instructions_tx: params.instructions_tx,
55+
}
56+
}
57+
}
58+
59+
impl From<CoalesceFormula> for FormulaParams<CoalesceFormula> {
60+
fn from(formula: CoalesceFormula) -> Self {
61+
FormulaParams {
62+
formula: formula.formula,
63+
metric: formula.metric,
64+
instructions_tx: formula.instructions_tx,
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)