Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ path = "src/lib.rs"

[dependencies]
chrono = "0.4"
frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "ab3b998" }
frequenz-microgrid-component-graph = { git = "https://github.com/frequenz-floss/frequenz-microgrid-component-graph-rs.git", rev = "94dc826" }
frequenz-microgrid-formula-engine = { git = "https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs.git", rev = "e0567c0" }
frequenz-resampling = { git = "https://github.com/frequenz-floss/frequenz-resampling-rs.git", rev = "ce84d66" }
prost = "0.13"
Expand Down
23 changes: 15 additions & 8 deletions examples/logical_meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use chrono::TimeDelta;
use frequenz_microgrid::{
Error, LogicalMeterConfig, LogicalMeterHandle, Metric, MicrogridClientHandle,
Error, Formula, LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, metric,
};

#[tokio::main]
Expand All @@ -23,20 +23,20 @@ async fn main() -> Result<(), Error> {
.await?;

// Create a formula that calculates `grid_power - battery_power`.
let formula_grid = logical_meter.grid(Metric::AcActivePower)?;
let formula_battery = logical_meter.battery(None, Metric::AcActivePower)?;
let formula_consumer = logical_meter.consumer(Metric::AcActivePower)?;
let formula_grid = logical_meter.grid(metric::AcActivePower)?;
let formula_battery = logical_meter.battery(None, metric::AcActivePower)?;
let formula_consumer = logical_meter.consumer(metric::AcActivePower)?;

let formula = (logical_meter.grid(Metric::AcActivePower)?
- logical_meter.battery(None, Metric::AcActivePower)?
+ logical_meter.consumer(Metric::AcActivePower)?)?;
let formula = (logical_meter.grid(metric::AcActivePower)?
- logical_meter.battery(None, metric::AcActivePower)?
+ logical_meter.consumer(metric::AcActivePower)?)?;

let mut rx = formula.subscribe().await?;
let mut grid_rx = formula_grid.subscribe().await?;
let mut battery_rx = formula_battery.subscribe().await?;
let mut consumer_rx = formula_consumer.subscribe().await?;

loop {
for _ in 0..10 {
let sample = rx.recv().await.unwrap();
let grid_sample = grid_rx.recv().await.unwrap();
let battery_sample = battery_rx.recv().await.unwrap();
Expand All @@ -49,4 +49,11 @@ async fn main() -> Result<(), Error> {
sample.value().unwrap()
);
}

let formula_grid_voltage = logical_meter.grid(metric::AcVoltagePhase1N)?;
let mut grid_voltage_rx = formula_grid_voltage.subscribe().await?;
loop {
let sample = grid_voltage_rx.recv().await.unwrap();
tracing::info!("grid voltage: {}", sample.value().unwrap());
}
}
32 changes: 16 additions & 16 deletions src/client/microgrid_client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl MicrogridClientActor {

let mut component_streams: HashMap<u64, broadcast::Sender<ComponentData>> = HashMap::new();

let (stream_stopped_tx, mut stream_stopped_rx) = mpsc::channel(50);
let (stream_status_tx, mut stream_status_rx) = mpsc::channel(50);
let mut retry_timer = tokio::time::interval(std::time::Duration::from_secs(1));
retry_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut components_to_retry = HashMap::new();
Expand All @@ -71,12 +71,12 @@ impl MicrogridClientActor {
&mut client,
&mut component_streams,
instruction,
stream_stopped_tx.clone(),
stream_status_tx.clone(),
).await {
tracing::error!("MicrogridClientActor: Error handling instruction: {e}");
}
}
stream_status = stream_stopped_rx.recv() => {
stream_status = stream_status_rx.recv() => {
match stream_status {
Some(StreamStatus::Failed(component_id)) => {
components_to_retry.entry(component_id).or_insert_with(
Expand All @@ -100,7 +100,7 @@ impl MicrogridClientActor {
&mut client,
&mut component_streams,
&mut components_to_retry,
stream_stopped_tx.clone(),
stream_status_tx.clone(),
now,
).await {
tracing::error!("MicrogridClientActor: Error handling retry timer: {e}");
Expand All @@ -116,7 +116,7 @@ async fn handle_instruction(
client: &mut MicrogridClient<Channel>,
component_streams: &mut HashMap<u64, broadcast::Sender<ComponentData>>,
instruction: Option<Instruction>,
stream_stopped_tx: mpsc::Sender<StreamStatus>,
stream_status_tx: mpsc::Sender<StreamStatus>,
) -> Result<(), Error> {
match instruction {
Some(Instruction::GetComponentDataStream {
Expand All @@ -137,7 +137,7 @@ async fn handle_instruction(
// API service into the channel.
let (tx, rx) = broadcast::channel::<ComponentData>(100);
component_streams.insert(component_id, tx.clone());
start_component_data_stream(client, component_id, tx, stream_stopped_tx).await?;
start_component_data_stream(client, component_id, tx, stream_status_tx).await?;

response_tx.send(rx).map_err(|_| {
tracing::error!("failed to send response");
Expand Down Expand Up @@ -189,7 +189,7 @@ async fn handle_retry_timer(
client: &mut MicrogridClient<Channel>,
component_streams: &mut HashMap<u64, broadcast::Sender<ComponentData>>,
components_to_retry: &mut HashMap<u64, RetryTracker>,
stream_stopped_tx: mpsc::Sender<StreamStatus>,
stream_status_tx: mpsc::Sender<StreamStatus>,
now: tokio::time::Instant,
) -> Result<(), Error> {
for item in components_to_retry.iter_mut() {
Expand All @@ -200,7 +200,7 @@ async fn handle_retry_timer(
item.1.mark_new_retry();
let (component_id, _) = item;
if let Some(tx) = component_streams.get(component_id).cloned() {
start_component_data_stream(client, *component_id, tx, stream_stopped_tx.clone())
start_component_data_stream(client, *component_id, tx, stream_status_tx.clone())
.await?;
} else {
tracing::error!("Component stream not found for retry: {component_id}");
Expand All @@ -213,13 +213,13 @@ async fn handle_retry_timer(
Ok(())
}

/// Creates anew data stream for the given component ID and starts a task to
/// Creates a new data stream for the given component ID and starts a task to
/// fetch data from it in a loop.
async fn start_component_data_stream(
client: &mut MicrogridClient<Channel>,
component_id: u64,
tx: broadcast::Sender<ComponentData>,
stream_stopped_tx: mpsc::Sender<StreamStatus>,
stream_status_tx: mpsc::Sender<StreamStatus>,
) -> Result<(), Error> {
let stream = match client
.receive_component_data_stream(ReceiveComponentDataStreamRequest {
Expand All @@ -230,7 +230,7 @@ async fn start_component_data_stream(
{
Ok(s) => s.into_inner(),
Err(e) => {
stream_stopped_tx
stream_status_tx
.send(StreamStatus::Failed(component_id))
.await
.map_err(|e| {
Expand All @@ -244,7 +244,7 @@ async fn start_component_data_stream(
}
};

stream_stopped_tx
stream_status_tx
.send(StreamStatus::Connected(component_id))
.await
.map_err(|e| {
Expand All @@ -255,7 +255,7 @@ async fn start_component_data_stream(

// create a task to fetch data from the stream in a loop and put into a channel.
tokio::spawn(
run_component_data_stream(stream, component_id, tx, stream_stopped_tx).in_current_span(),
run_component_data_stream(stream, component_id, tx, stream_status_tx).in_current_span(),
);
Ok(())
}
Expand All @@ -264,15 +264,15 @@ async fn run_component_data_stream(
mut stream: tonic::Streaming<ReceiveComponentDataStreamResponse>,
component_id: u64,
tx: broadcast::Sender<ComponentData>,
stream_stopped_tx: mpsc::Sender<StreamStatus>,
stream_status_tx: mpsc::Sender<StreamStatus>,
) {
loop {
if tx.receiver_count() == 0 {
tracing::debug!(
"Dropping ComponentData stream for component_id:{:?}",
component_id
);
stream_stopped_tx
stream_status_tx
.send(StreamStatus::Ended(component_id))
.await
.unwrap_or_else(|e| {
Expand Down Expand Up @@ -315,7 +315,7 @@ async fn run_component_data_stream(
};
}

if let Err(e) = stream_stopped_tx
if let Err(e) = stream_status_tx
.send(StreamStatus::Failed(component_id))
.await
{
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ mod sample;
pub use sample::Sample;

mod logical_meter;
pub use logical_meter::{Formula, LogicalMeterConfig, LogicalMeterHandle, Metric};
pub use logical_meter::{
AggregationFormula, Formula, LogicalMeterConfig, LogicalMeterHandle, metric,
};
5 changes: 2 additions & 3 deletions src/logical_meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ mod formula;
mod logical_meter_actor;
mod logical_meter_handle;
pub use logical_meter_handle::LogicalMeterHandle;
mod metric;
pub use metric::Metric;
pub mod metric;

pub use config::LogicalMeterConfig;
pub use formula::Formula;
pub use formula::{AggregationFormula, Formula};
133 changes: 11 additions & 122 deletions src/logical_meter/formula.rs
Original file line number Diff line number Diff line change
@@ -1,129 +1,18 @@
// License: MIT
// Copyright © 2025 Frequenz Energy-as-a-Service GmbH

//! A composable formula type, that can be subscribed to.
//! Formula module for the logical meter.

use tokio::sync::{broadcast, mpsc, oneshot};
mod aggregation_formula;
mod coalesce_formula;
pub(crate) mod graph_formula_provider;
pub use aggregation_formula::AggregationFormula;
pub use coalesce_formula::CoalesceFormula;

use crate::{Error, Metric, Sample};
use crate::{Error, Sample};
use tokio::sync::broadcast;

Copy link

Copilot AI Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trait method uses impl Future return type which requires importing std::future::Future. This import is missing and will cause compilation errors.

Suggested change
use std::future::Future;

Copilot uses AI. Check for mistakes.
use super::logical_meter_actor;

#[derive(Clone)]
pub struct Formula {
formula: frequenz_microgrid_component_graph::Formula,
metric: Metric,
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
}

impl std::fmt::Display for Formula {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.formula.fmt(f)
}
}

impl Formula {
pub(super) fn new(
formula: frequenz_microgrid_component_graph::Formula,
metric: Metric,
instructions_tx: mpsc::Sender<logical_meter_actor::Instruction>,
) -> Self {
Self {
formula,
metric,
instructions_tx,
}
}

pub async fn subscribe(&self) -> Result<broadcast::Receiver<Sample>, Error> {
let (tx, rx) = oneshot::channel();

self.instructions_tx
.send(logical_meter_actor::Instruction::SubscribeFormula {
formula: self.formula.to_string(),
metric: self.metric,
response_tx: tx,
})
.await
.map_err(|e| Error::connection_failure(format!("Could not send instruction: {e}")))?;
let receiver = rx.await.map_err(|e| {
Error::connection_failure(format!("Could not receive instruction: {e}"))
})?;

Ok(receiver)
}
}

impl std::ops::Add for Formula {
type Output = Result<Self, Error>;

fn add(self, other: Self) -> Self::Output {
if self.metric != other.metric {
return Err(Error::invalid_metric(format!(
"Cannot add formulas with different metrics: {} and {}",
self.metric as isize, other.metric as isize
)));
}
let new_formula = self.formula + other.formula;
Ok(Self::new(new_formula, self.metric, self.instructions_tx))
}
}

impl std::ops::Sub for Formula {
type Output = Result<Self, Error>;

fn sub(self, other: Self) -> Self::Output {
if self.metric != other.metric {
return Err(Error::invalid_metric(format!(
"Cannot subtract formulas with different metrics: {} and {}",
self.metric as isize, other.metric as isize
)));
}
let new_formula = self.formula - other.formula;
Ok(Self::new(new_formula, self.metric, self.instructions_tx))
}
}

impl std::ops::Add<Formula> for Result<Formula, Error> {
type Output = Result<Formula, Error>;

fn add(self, other: Formula) -> Self::Output {
match self {
Ok(left) => left + other,
Err(e) => Err(e),
}
}
}

impl std::ops::Sub<Formula> for Result<Formula, Error> {
type Output = Result<Formula, Error>;

fn sub(self, other: Formula) -> Self::Output {
match self {
Ok(left) => left - other,
Err(e) => Err(e),
}
}
}

impl std::ops::Add<Result<Formula, Error>> for Formula {
type Output = Result<Formula, Error>;

fn add(self, other: Result<Formula, Error>) -> Self::Output {
match other {
Ok(right) => self + right,
Err(e) => Err(e),
}
}
}

impl std::ops::Sub<Result<Formula, Error>> for Formula {
type Output = Result<Formula, Error>;

fn sub(self, other: Result<Formula, Error>) -> Self::Output {
match other {
Ok(right) => self - right,
Err(e) => Err(e),
}
}
/// Defines a formula that can be subscribed to for receiving samples.
pub trait Formula: std::fmt::Display {
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
}
Loading