Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
856981e
Update resampler time at the start of resampling
shsms Aug 25, 2025
c4d4abb
Rename timer field to resampler_timer for clarity
shsms Aug 25, 2025
bb82ad5
Refactor resampling logic into a dedicated function
shsms Aug 25, 2025
196e222
Add `kind` method to `Error` for retrieving error kind
shsms Aug 25, 2025
e260df0
Refactor resampler cleanup
shsms Aug 25, 2025
fa24e91
Rename `do_next` to `evaluate_formulas`
shsms Aug 25, 2025
7b666b6
Add quantity module with support for various physical quantities
shsms Aug 25, 2025
98613ed
Add tests for the quantity types
shsms Sep 30, 2025
ab13386
Make `Sample` struct generic over value type
shsms Aug 25, 2025
aa2d311
Add QuantityType to Metric trait as an associated type
shsms Aug 25, 2025
112c200
Update Formula trait to support generic Quantity type
shsms Aug 25, 2025
0b61acb
Update LogicalMeterFormula to support generic Quantity type
shsms Aug 25, 2025
7201570
Update `evaluate_formulas` to support generic transformation
shsms Aug 25, 2025
fd57f5e
Add TypedFormulaResponseSender enum and TryFrom implementation
shsms Aug 25, 2025
9ce335f
Add `Formulas` struct to manage logical meter formulas
shsms Aug 25, 2025
cb25391
Change log level to debug for missing metric data in LogicalMeterActor
shsms Aug 25, 2025
082ee30
Refactor resampler initialization into a dedicated method
shsms Aug 25, 2025
af95e6e
Implement `Quantity` types in logical meter streams
shsms Aug 25, 2025
254f0bf
Remove invalid metric checks from formula operations
shsms Aug 25, 2025
a181981
Remove default generic type for `Formula` and `Sample`
shsms Aug 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ macro_rules! ErrorKind {
}
}
)*

/// Returns the kind of error that occurred.
pub fn kind(&self) -> ErrorKind {
self.kind.clone()
}
}
};
}
Expand All @@ -51,6 +56,7 @@ ErrorKind!(
(ComponentGraphError, component_graph_error),
(ConnectionFailure, connection_failure),
(ChronoError, chrono_error),
(DroppedUnusedFormulas, dropped_unused_formulas),
(FormulaEngineError, formula_engine_error),
(InvalidMetric, invalid_metric),
(Internal, internal)
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub use client::MicrogridClientHandle;
mod error;
pub use error::{Error, ErrorKind};

mod quantity;

mod proto;

mod sample;
Expand Down
7 changes: 4 additions & 3 deletions src/logical_meter/formula.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub(crate) mod graph_formula_provider;
pub use aggregation_formula::AggregationFormula;
pub use coalesce_formula::CoalesceFormula;

use crate::{Error, Sample, proto::common::v1alpha8::metrics::Metric};
use crate::{Error, Sample, proto::common::v1alpha8::metrics::Metric, quantity::Quantity};
use tokio::sync::{broadcast, mpsc};

use super::logical_meter_actor;
Expand Down Expand Up @@ -47,20 +47,21 @@ impl<F: GraphFormulaProvider> FormulaParams<F> {
}

/// A trait that defines generic formula operations.
pub trait Formula: std::fmt::Display + Sized {
pub trait Formula<Q: Quantity = f32>: std::fmt::Display + Sized {
fn coalesce(self, other: Self) -> Result<Self, Error>;
fn min(self, other: Self) -> Result<Self, Error>;
fn max(self, other: Self) -> Result<Self, Error>;
fn subscribe(&self) -> impl Future<Output = Result<broadcast::Receiver<Sample>, Error>> + Send;
}

impl<T> Formula for T
impl<T, Q> Formula<Q> for T
where
T: FormulaSubscriber
+ GraphFormulaProvider
+ From<FormulaParams<T>>
+ Into<FormulaParams<T>>
+ std::fmt::Display,
Q: Quantity,
{
fn coalesce(self, other: Self) -> Result<Self, Error> {
let mut params_self: FormulaParams<T> = self.into();
Expand Down
135 changes: 83 additions & 52 deletions src/logical_meter/logical_meter_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ use std::collections::{HashMap, HashSet};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{MissedTickBehavior, interval};

use crate::ErrorKind;
use crate::proto::common::v1alpha8::metrics::{Metric, metric_value_variant::MetricValueVariant};
use crate::quantity::Quantity;
use crate::{
Error, MicrogridClientHandle, Sample,
proto::common::v1alpha8::microgrid::electrical_components::ElectricalComponentTelemetry,
};

use super::config::LogicalMeterConfig;

struct LogicalMeterFormula {
struct LogicalMeterFormula<Q: Quantity = f32> {
Comment thread
shsms marked this conversation as resolved.
formula: FormulaEngine<f32>,
sender: broadcast::Sender<Sample>,
sender: broadcast::Sender<Sample<Q>>,
}

struct ComponentDataResampler {
Expand All @@ -44,8 +46,8 @@ pub(super) struct LogicalMeterActor {
instructions_rx: mpsc::Receiver<Instruction>,
client: MicrogridClientHandle,
config: LogicalMeterConfig,
next_ts: DateTime<Utc>,
timer: tokio::time::Interval,
resampler_ts: DateTime<Utc>,
resampler_timer: tokio::time::Interval,
}

/// Returns the next timestamp aligned to the epoch based on the given interval.
Expand All @@ -58,8 +60,7 @@ pub(crate) fn epoch_align(timestamp: DateTime<Utc>, interval: TimeDelta) -> Opti

let aligned_timestamp = DateTime::from_timestamp_millis(aligned_millis_since_epoch)?;

let next_aligned_ts = aligned_timestamp + interval;
Some(next_aligned_ts)
Some(aligned_timestamp)
Comment thread
shsms marked this conversation as resolved.
}

impl LogicalMeterActor {
Expand All @@ -69,7 +70,7 @@ impl LogicalMeterActor {
config: LogicalMeterConfig,
) -> Result<Self, Error> {
let now = Utc::now();
let next_ts = epoch_align(now, config.resampling_interval).ok_or_else(|| {
let last_aligned_ts = epoch_align(now, config.resampling_interval).ok_or_else(|| {
Error::chrono_error("Failed to align current time to the epoch".to_string())
})?;
let mut timer =
Expand All @@ -80,7 +81,7 @@ impl LogicalMeterActor {

// The next tick should be at the next aligned timestamp.
timer.reset_after(
(next_ts - now)
(last_aligned_ts + config.resampling_interval - now)
.to_std()
.map_err(|e| Error::chrono_error(format!("Failed to calculate time delta: {e}")))?,
);
Expand All @@ -89,8 +90,8 @@ impl LogicalMeterActor {
instructions_rx,
client,
config,
next_ts,
timer,
resampler_ts: last_aligned_ts,
resampler_timer: timer,
})
}

Expand All @@ -100,9 +101,23 @@ impl LogicalMeterActor {

loop {
tokio::select! {
_ = self.timer.tick() => {
if let Err(err) = self.do_next(&mut resamplers, &mut formulas) {
tracing::error!("Error resampling: {}", err);
_ = self.resampler_timer.tick() => {
self.resampler_ts += self.config.resampling_interval;

let mut resampled = match self.resample_metrics(&mut resamplers) {
Ok(resampled) => resampled,
Err(err) => {
tracing::error!("Error resampling metrics: {}", err);
continue;
}
};

if let Err(err) = self.evaluate_formulas(&mut resampled, &mut formulas) {
if err.kind() == ErrorKind::DroppedUnusedFormulas {
self.cleanup_resamplers(&formulas, &mut resamplers);
} else {
tracing::error!("Error evaluating formulas: {}", err);
}
};
}
instruction = self.instructions_rx.recv() => {
Expand Down Expand Up @@ -203,30 +218,11 @@ impl LogicalMeterActor {
}

/// Resamples component data and evaluates formulas for the next timestamp.
fn do_next(
fn evaluate_formulas(
&mut self,
resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>,
resampled_metrics: &mut HashMap<Metric, HashMap<u64, Option<f32>>>,
formulas: &mut HashMap<(String, Metric), LogicalMeterFormula>,
) -> Result<(), Error> {
let mut resampled_metrics: HashMap<Metric, HashMap<u64, Option<f32>>> = HashMap::new();

for (_, resampler) in resamplers.iter_mut() {
while let Ok(data) = resampler.receiver.try_recv() {
self.push_to_resampler(resampler, data, resampler.metric);
}
let resampled = resampler.resampler.resample(self.next_ts);
if resampled.len() != 1 {
return Err(Error::connection_failure(format!(
"Resampling produced {} values",
resampled.len()
)));
}
resampled_metrics
.entry(resampler.metric)
.or_default()
.insert(resampler.component_id, resampled[0].clone().value());
}

let mut formulas_to_drop = vec![];
for (formula_key, formula) in formulas.iter_mut() {
let result = formula
Expand All @@ -236,7 +232,7 @@ impl LogicalMeterActor {
Error::formula_engine_error(format!("Failed to evaluate formula: {e}"))
})?;

if let Err(e) = formula.sender.send(Sample::new(self.next_ts, result)) {
if let Err(e) = formula.sender.send(Sample::new(self.resampler_ts, result)) {
tracing::debug!(
"No remaining subscribers for formula: {}:({}). Err: {e}",
formula_key.1.as_str_name(),
Expand All @@ -257,28 +253,63 @@ impl LogicalMeterActor {
}
}
if !formulas_to_drop.is_empty() {
let mut components = HashSet::<(u64, Metric)>::new();
for ((_, metric), formula) in formulas.iter() {
components.extend(formula.formula.components().iter().map(|&id| (id, *metric)));
}
resamplers.retain(|component_id, _| {
if components.contains(component_id) {
true
} else {
tracing::debug!(
"Dropping resampler for component {}:{}",
component_id.0,
component_id.1.as_str_name()
);
false
}
});
return Err(Error::dropped_unused_formulas("Dropped unused formulas"));
}

self.next_ts += self.config.resampling_interval;
Ok(())
}

/// Resamples component telemetry
fn resample_metrics(
&mut self,
resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>,
) -> Result<HashMap<Metric, HashMap<u64, Option<f32>>>, Error> {
let mut resampled_metrics: HashMap<Metric, HashMap<u64, Option<f32>>> = HashMap::new();

for (_, resampler) in resamplers.iter_mut() {
while let Ok(data) = resampler.receiver.try_recv() {
self.push_to_resampler(resampler, data, resampler.metric);
}
let resampled = resampler.resampler.resample(self.resampler_ts);
if resampled.len() != 1 {
return Err(Error::connection_failure(format!(
"Resampling produced {} values",
resampled.len()
)));
}
resampled_metrics
.entry(resampler.metric)
.or_default()
.insert(resampler.component_id, resampled[0].clone().value());
}

Ok(resampled_metrics)
}

/// Cleans up resamplers that are no longer needed by any formula.
fn cleanup_resamplers(
&mut self,
formulas: &HashMap<(String, Metric), LogicalMeterFormula>,
resamplers: &mut HashMap<(u64, Metric), ComponentDataResampler>,
) {
let mut components = HashSet::<(u64, Metric)>::new();
for ((_, metric), formula) in formulas.iter() {
components.extend(formula.formula.components().iter().map(|&id| (id, *metric)));
}
resamplers.retain(|component_id, _| {
if components.contains(component_id) {
true
} else {
tracing::debug!(
"Dropping resampler for component {}:{}",
component_id.0,
component_id.1.as_str_name()
);
false
}
});
}

/// Extracts the given metric from the given ComponentData and pushes it to
/// the resampler's internal buffer.
fn push_to_resampler(
Expand Down
41 changes: 24 additions & 17 deletions src/logical_meter/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ use super::formula;
pub trait Metric: std::fmt::Display + std::fmt::Debug + Clone + Copy + PartialEq + Eq {
type FormulaType: formula::Formula + formula::graph_formula_provider::GraphFormulaProvider;

type QuantityType: crate::quantity::Quantity;

const METRIC: MetricPb;
}

macro_rules! define_metric {
($({name: $metric_name:ident, formula: $formula:ident}),+ $(,)?) => {
($({
name: $metric_name:ident,
formula: $formula:ident,
quantity: $quantity:ident
}),+ $(,)?) => {
$(
// Define a metric
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand All @@ -23,6 +29,7 @@ macro_rules! define_metric {
// Implement the AcMetric trait for the metric
impl Metric for $metric_name {
type FormulaType = formula::$formula;
type QuantityType = crate::quantity::$quantity;

const METRIC: MetricPb = MetricPb::$metric_name;
}
Expand All @@ -38,20 +45,20 @@ macro_rules! define_metric {
}

define_metric! {
{name: AcPowerActive, formula: AggregationFormula},
{name: AcPowerReactive, formula: AggregationFormula},
{name: AcCurrent, formula: AggregationFormula},
{name: AcCurrentPhase1, formula: AggregationFormula},
{name: AcCurrentPhase2, formula: AggregationFormula},
{name: AcCurrentPhase3, formula: AggregationFormula},

{name: AcVoltage, formula: CoalesceFormula},
{name: AcVoltagePhase1N, formula: CoalesceFormula},
{name: AcVoltagePhase2N, formula: CoalesceFormula},
{name: AcVoltagePhase3N, formula: CoalesceFormula},
{name: AcVoltagePhase1Phase2, formula: CoalesceFormula},
{name: AcVoltagePhase2Phase3, formula: CoalesceFormula},
{name: AcVoltagePhase3Phase1, formula: CoalesceFormula},

{name: AcFrequency, formula: CoalesceFormula},
{ name: AcPowerActive, formula: AggregationFormula, quantity: Power },
{ name: AcPowerReactive, formula: AggregationFormula, quantity: ReactivePower },
{ name: AcCurrent, formula: AggregationFormula, quantity: Current },
{ name: AcCurrentPhase1, formula: AggregationFormula, quantity: Current },
{ name: AcCurrentPhase2, formula: AggregationFormula, quantity: Current },
{ name: AcCurrentPhase3, formula: AggregationFormula, quantity: Current },

{ name: AcVoltage, formula: CoalesceFormula, quantity: Voltage },
{ name: AcVoltagePhase1N, formula: CoalesceFormula, quantity: Voltage },
{ name: AcVoltagePhase2N, formula: CoalesceFormula, quantity: Voltage },
{ name: AcVoltagePhase3N, formula: CoalesceFormula, quantity: Voltage },
{ name: AcVoltagePhase1Phase2, formula: CoalesceFormula, quantity: Voltage },
{ name: AcVoltagePhase2Phase3, formula: CoalesceFormula, quantity: Voltage },
{ name: AcVoltagePhase3Phase1, formula: CoalesceFormula, quantity: Voltage },

{ name: AcFrequency, formula: CoalesceFormula, quantity: Frequency },
}
Loading