Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ path = "src/lib.rs"

[dependencies]
chrono = "0.4"
frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "94dc826" }
frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "e0567c0" }
frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "b2fd616" }
frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "463414a" }
frequenz-resampling = { git = "https://github.com/frequenz-floss/frequenz-resampling-rs.git", rev = "ce84d66" }
prost = "0.13"
prost-types = "0.13"
Expand All @@ -19,6 +19,8 @@ tonic = "0.13"
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3" }

[dev-dependencies]
tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] }

[build-dependencies]
tonic-build = "0.13"
Expand Down
28 changes: 23 additions & 5 deletions examples/logical_meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ use chrono::TimeDelta;
use frequenz_microgrid::{
Error, Formula, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, metric,
};
use tracing_subscriber::{
EnvFilter,
fmt::{self},
prelude::*,
};

#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt::fmt()
.with_file(true)
.with_line_number(true)
tracing_subscriber::registry()
.with(EnvFilter::new("info,frequenz_microgrid=debug"))
.with(fmt::layer().with_file(true).with_line_number(true))
.init();

let client = MicrogridClientHandle::new("http://[::1]:8800");
Expand All @@ -36,7 +41,7 @@ async fn main() -> Result<(), Error> {
let mut battery_rx = formula_battery.subscribe().await?;
let mut consumer_rx = formula_consumer.subscribe().await?;

for _ in 0..10 {
for _ in 0..3 {
let sample = rx.recv().await.unwrap();
let grid_sample = grid_rx.recv().await.unwrap();
let battery_sample = battery_rx.recv().await.unwrap();
Expand All @@ -49,9 +54,22 @@ async fn main() -> Result<(), Error> {
sample.value().unwrap()
);
}
let formula_grid_voltage = logical_meter
.battery(None, metric::AcVoltagePhase1N)?
.coalesce(logical_meter.pv(None, metric::AcVoltagePhase1N)?)?;

let formula_grid_voltage = logical_meter.grid(metric::AcVoltagePhase1N)?;
tracing::info!("formula_grid_voltage: {}", formula_grid_voltage);
let mut grid_voltage_rx = formula_grid_voltage.subscribe().await?;
for _ in 0..3 {
let sample = grid_voltage_rx.recv().await.unwrap();
tracing::info!("grid voltage: {}", sample.value().unwrap());
}

drop(rx);
drop(grid_rx);
drop(battery_rx);
drop(consumer_rx);

loop {
let sample = grid_voltage_rx.recv().await.unwrap();
tracing::info!("grid voltage: {}", sample.value().unwrap());
Expand Down
101 changes: 98 additions & 3 deletions src/logical_meter/formula.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,111 @@

//! Formula module for the logical meter.

use frequenz_microgrid_component_graph::Formula as _;
mod aggregation_formula;
mod coalesce_formula;
pub(crate) mod graph_formula_provider;
pub use aggregation_formula::AggregationFormula;
pub use coalesce_formula::CoalesceFormula;

use crate::{Error, Sample};
use tokio::sync::broadcast;
use crate::{Error, Sample, proto::common::v1::metrics::Metric};
use tokio::sync::{broadcast, mpsc};

use super::logical_meter_actor;

/// Connects logical meter formulas to the component graph formulas.
pub(crate) trait GraphFormulaProvider: std::fmt::Display {
type GraphFormulaType: frequenz_microgrid_component_graph::Formula;
}

/// Defines a formula that can be subscribed to for receiving samples.
pub trait Formula: std::fmt::Display {
pub(crate) trait FormulaSubscriber: std::fmt::Display {
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
Comment thread
shsms marked this conversation as resolved.
}

/// Parameters for creating a logical meter formula.
pub(super) struct FormulaParams<F: GraphFormulaProvider> {
pub(super) formula: F::GraphFormulaType,
pub(super) metric: Metric,
pub(super) instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
}

impl<F: GraphFormulaProvider> FormulaParams<F> {
pub(super) fn new(
formula: F::GraphFormulaType,
metric: Metric,
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
) -> Self {
Self {
formula,
metric,
instructions_tx,
}
}
}

/// A trait that defines generic formula operations.
pub trait Formula: std::fmt::Display + Sized {
fn coalesce(self, other: Self) -> Result<Self, Error>;
fn min(self, other: Self) -> Result<Self, Error>;
fn max(self, other: Self) -> Result<Self, Error>;
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
Comment thread
shsms marked this conversation as resolved.
}

impl<T> Formula for T
where
T: FormulaSubscriber
+ GraphFormulaProvider
+ From<FormulaParams<T>>
+ Into<FormulaParams<T>>
+ std::fmt::Display,
{
fn coalesce(self, other: Self) -> Result<Self, Error> {
let mut params_self: FormulaParams<T> = self.into();
let params_other: FormulaParams<T> = other.into();

if params_self.metric != params_other.metric {
return Err(Error::invalid_metric(format!(
"Cannot coalesce formulas with different metrics: {} and {}",
params_self.metric.as_str_name(),
params_other.metric.as_str_name()
)));
}
params_self.formula = params_self.formula.coalesce(params_other.formula);
Ok(params_self.into())
}

fn min(self, other: Self) -> Result<Self, Error> {
let mut params_self: FormulaParams<T> = self.into();
let params_other: FormulaParams<T> = other.into();

if params_self.metric != params_other.metric {
return Err(Error::invalid_metric(format!(
"Cannot take min of formulas with different metrics: {} and {}",
params_self.metric.as_str_name(),
params_other.metric.as_str_name()
)));
}
params_self.formula = params_self.formula.min(params_other.formula);
Ok(params_self.into())
}

fn max(self, other: Self) -> Result<Self, Error> {
let mut params_self: FormulaParams<T> = self.into();
let params_other: FormulaParams<T> = other.into();

if params_self.metric != params_other.metric {
return Err(Error::invalid_metric(format!(
"Cannot take max of formulas with different metrics: {} and {}",
params_self.metric.as_str_name(),
params_other.metric.as_str_name()
)));
}
params_self.formula = params_self.formula.max(params_other.formula);
Ok(params_self.into())
Comment thread
shsms marked this conversation as resolved.
Comment thread
shsms marked this conversation as resolved.
Comment thread
shsms marked this conversation as resolved.
}

fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send {
<T as FormulaSubscriber>::subscribe(self)
}
}
42 changes: 26 additions & 16 deletions src/logical_meter/formula/aggregation_formula.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

//! An formula that supports aggregation operations.

use super::Formula;
use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider};
use crate::{
Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric,
};
Expand All @@ -22,21 +22,11 @@ impl std::fmt::Display for AggregationFormula {
}
}

impl AggregationFormula {
pub(crate) fn new(
formula: frequenz_microgrid_component_graph::AggregationFormula,
metric: Metric,
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
) -> Self {
Self {
formula,
metric,
instructions_tx,
}
}
impl GraphFormulaProvider for AggregationFormula {
type GraphFormulaType = frequenz_microgrid_component_graph::AggregationFormula;
}

impl Formula for AggregationFormula {
impl FormulaSubscriber for AggregationFormula {
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
let (tx, rx) = oneshot::channel();

Expand All @@ -56,6 +46,26 @@ impl Formula for AggregationFormula {
}
}

impl From<FormulaParams<AggregationFormula>> for AggregationFormula {
fn from(params: FormulaParams<AggregationFormula>) -> Self {
Self {
formula: params.formula,
metric: params.metric,
instructions_tx: params.instructions_tx,
}
}
}

impl From<AggregationFormula> for FormulaParams<AggregationFormula> {
fn from(formula: AggregationFormula) -> Self {
FormulaParams {
formula: formula.formula,
metric: formula.metric,
instructions_tx: formula.instructions_tx,
}
}
}

impl std::ops::Add for AggregationFormula {
type Output = Result<Self, Error>;

Expand All @@ -67,7 +77,7 @@ impl std::ops::Add for AggregationFormula {
)));
}
let new_formula = self.formula + other.formula;
Ok(Self::new(new_formula, self.metric, self.instructions_tx))
Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into())
}
}

Expand All @@ -82,7 +92,7 @@ impl std::ops::Sub for AggregationFormula {
)));
}
let new_formula = self.formula - other.formula;
Ok(Self::new(new_formula, self.metric, self.instructions_tx))
Ok(FormulaParams::new(new_formula, self.metric, self.instructions_tx).into())
}
}

Expand Down
38 changes: 24 additions & 14 deletions src/logical_meter/formula/coalesce_formula.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

//! An coalesce formula.

use super::Formula;
use super::{FormulaParams, FormulaSubscriber, GraphFormulaProvider};
use crate::{
Error, Sample, logical_meter::logical_meter_actor, proto::common::v1::metrics::Metric,
};
Expand All @@ -22,21 +22,11 @@ impl std::fmt::Display for CoalesceFormula {
}
}

impl CoalesceFormula {
pub(crate) fn new(
formula: frequenz_microgrid_component_graph::CoalesceFormula,
metric: Metric,
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
) -> Self {
Self {
formula,
metric,
instructions_tx,
}
}
impl GraphFormulaProvider for CoalesceFormula {
type GraphFormulaType = frequenz_microgrid_component_graph::CoalesceFormula;
}

impl Formula for CoalesceFormula {
impl FormulaSubscriber for CoalesceFormula {
async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
let (tx, rx) = oneshot::channel();

Expand All @@ -55,3 +45,23 @@ impl Formula for CoalesceFormula {
Ok(receiver)
}
}

impl From<FormulaParams<CoalesceFormula>> for CoalesceFormula {
fn from(params: FormulaParams<CoalesceFormula>) -> Self {
Self {
formula: params.formula,
metric: params.metric,
instructions_tx: params.instructions_tx,
}
}
}

impl From<CoalesceFormula> for FormulaParams<CoalesceFormula> {
fn from(formula: CoalesceFormula) -> Self {
FormulaParams {
formula: formula.formula,
metric: formula.metric,
instructions_tx: formula.instructions_tx,
}
}
}
Loading