Skip to content

Commit 65c5e4a

Browse files
authored
Customizable resampling functions (#29)
This PR adds configurable resampling behavior to the logical meter, allowing callers to change the default resampling function and override it per metric or per (component_id, metric) pair. **Changes:** - Add `LogicalMeterConfig` builder-style API for default resampling function and per-metric/per-component overrides. - Update `LogicalMeterActor` to select the resampling function using override precedence (component-specific → metric-only → default → average). - Update examples/tests and bump `frequenz-resampling` dependency to `0.2`.
2 parents 31fdaa3 + 2367e40 commit 65c5e4a

6 files changed

Lines changed: 137 additions & 26 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ async-trait = "0.1.89"
1515
chrono = "0.4"
1616
frequenz-microgrid-component-graph = "0.2.0"
1717
frequenz-microgrid-formula-engine = "0.1.0"
18-
frequenz-resampling = "0.1.0"
18+
frequenz-resampling = "0.2"
1919
futures = "0.3.31"
2020
prost = "0.14"
2121
tokio = { version = "1.48", features = ["rt", "rt-multi-thread"] }

RELEASE_NOTES.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@
22

33
## Summary
44

5-
This release makes incremental improvements to the client and the quantity types.
5+
<!-- Here goes a general summary of what this release is about -->
66

77
## Upgrading
88

9-
- The `MicrogridClientHandle::list_electrical_components` method now expects `ElectricalComponentCategory` enum values instead of `i32`, to filter by component category.
9+
- `LogicalMeterConfig` instances can't be created directly anymore, and need to be created using the `LogicalMeterConfig::new` method. This helps avoid future breaking changes, as we add more config parameters.
1010

1111
## New Features
1212

13-
- The new `MicrogridClientHandle::augment_electrical_component_bounds` method can be used to augment the bounds for specific metrics of electrical components.
13+
- It is now possible to change the default resampling function, and to override the resampling function for specific metrics.
1414

15-
- All methods on `Quantity` types are now `const`.
15+
## Bug Fixes
1616

17-
- `Quantity` types have two new methods `min` and `max`, similar to the `min` and `max` methods on fundamental numerical types.
17+
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

examples/logical_meter.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ async fn main() -> Result<(), Error> {
2121
let client = MicrogridClientHandle::try_new("http://[::1]:8800").await?;
2222
let mut logical_meter = LogicalMeterHandle::try_new(
2323
client,
24-
LogicalMeterConfig {
25-
resampling_interval: TimeDelta::try_seconds(1).unwrap(),
26-
},
24+
LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()),
2725
)
2826
.await?;
2927

src/logical_meter/config.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,56 @@
33

44
//! This module defines the configuration for the logical meter.
55
6+
use crate::Sample;
7+
use crate::proto::common::metrics::Metric;
68
use chrono::TimeDelta;
9+
use frequenz_resampling::ResamplingFunction;
10+
use std::collections::HashMap;
711

812
pub struct LogicalMeterConfig {
913
/// The resampling interval for the logical meter.
10-
pub resampling_interval: TimeDelta,
14+
pub(crate) resampling_interval: TimeDelta,
15+
/// Resampler function.
16+
pub(crate) resampling_function: Option<ResamplingFunction<f32, Sample<f32>>>,
17+
/// Resampler overrides.
18+
pub(crate) resampling_overrides: HashMap<Metric, ResamplingFunction<f32, Sample<f32>>>,
19+
}
20+
21+
impl LogicalMeterConfig {
22+
/// Creates a new `LogicalMeterConfig` with the given resampling interval.
23+
pub fn new(resampling_interval: TimeDelta) -> Self {
24+
Self {
25+
resampling_interval,
26+
resampling_function: None,
27+
resampling_overrides: HashMap::new(),
28+
}
29+
}
30+
31+
/// Sets the default resampling function.
32+
///
33+
/// This function will be used for all metrics that do not have a specific
34+
/// override set.
35+
///
36+
/// If no default resampling function is set, the logical meter will default
37+
/// to using the `Average` resampling function.
38+
pub fn with_default_resampling_function(
39+
mut self,
40+
function: ResamplingFunction<f32, Sample<f32>>,
41+
) -> Self {
42+
self.resampling_function = Some(function);
43+
self
44+
}
45+
46+
/// Sets a resampling function override for a specific metric.
47+
///
48+
/// If this function is called multiple times for the same metric, the last
49+
/// function provided will be used.
50+
pub fn override_resampling_function<M: crate::metric::Metric>(
51+
mut self,
52+
function: ResamplingFunction<f32, Sample<f32>>,
53+
) -> Self {
54+
self.resampling_overrides.insert(M::METRIC, function);
55+
56+
self
57+
}
1158
}

src/logical_meter/logical_meter_actor.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,16 @@ impl LogicalMeterActor {
373373
metric,
374374
resampler: frequenz_resampling::Resampler::new(
375375
self.config.resampling_interval,
376-
ResamplingFunction::Average,
376+
self.config
377+
// Look for a specific metric override first
378+
.resampling_overrides
379+
.get(&metric)
380+
.cloned()
381+
// Then look for a configured default
382+
.or_else(|| self.config.resampling_function.clone())
383+
// Finally, default to average if no default is
384+
// configured
385+
.unwrap_or(ResamplingFunction::Average),
377386
3,
378387
self.resampler_ts,
379388
false,

src/logical_meter/logical_meter_handle.rs

Lines changed: 72 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ impl LogicalMeterHandle {
186186
#[cfg(test)]
187187
mod tests {
188188
use chrono::TimeDelta;
189+
use frequenz_resampling::ResamplingFunction;
189190
use tokio_stream::{StreamExt, wrappers::BroadcastStream};
190191

191192
use crate::{
@@ -198,7 +199,7 @@ mod tests {
198199
quantity::Quantity,
199200
};
200201

201-
async fn new_logical_meter_handle() -> LogicalMeterHandle {
202+
async fn new_logical_meter_handle(config: Option<LogicalMeterConfig>) -> LogicalMeterHandle {
202203
let api_client = MockMicrogridApiClient::new(
203204
// Grid connection point
204205
MockComponent::grid(1).with_children(vec![
@@ -251,17 +252,15 @@ mod tests {
251252

252253
LogicalMeterHandle::try_new(
253254
MicrogridClientHandle::new_from_client(api_client),
254-
LogicalMeterConfig {
255-
resampling_interval: TimeDelta::try_seconds(1).unwrap(),
256-
},
255+
config.unwrap_or_else(|| LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap())),
257256
)
258257
.await
259258
.unwrap()
260259
}
261260

262261
#[tokio::test]
263262
async fn test_formula_display() {
264-
let mut lm = new_logical_meter_handle().await;
263+
let mut lm = new_logical_meter_handle(None).await;
265264

266265
let formula = lm.grid(crate::metric::AcPowerActive).unwrap();
267266
assert_eq!(formula.to_string(), "METRIC_AC_POWER_ACTIVE::(#2)");
@@ -342,7 +341,7 @@ mod tests {
342341

343342
#[tokio::test(start_paused = true)]
344343
async fn test_grid_power_formula() {
345-
let formula = new_logical_meter_handle()
344+
let formula = new_logical_meter_handle(None)
346345
.await
347346
.grid(crate::metric::AcPowerActive)
348347
.unwrap();
@@ -352,6 +351,7 @@ mod tests {
352351
check_samples(
353352
samples,
354353
|q| q.as_watts(),
354+
TimeDelta::try_seconds(1).unwrap(),
355355
vec![
356356
Some(5.8),
357357
Some(6.0),
@@ -369,7 +369,7 @@ mod tests {
369369

370370
#[tokio::test(start_paused = true)]
371371
async fn test_pv_reactive_power_formula() {
372-
let formula = new_logical_meter_handle()
372+
let formula = new_logical_meter_handle(None)
373373
.await
374374
.pv(None, crate::metric::AcPowerReactive)
375375
.unwrap();
@@ -379,6 +379,7 @@ mod tests {
379379
check_samples(
380380
samples,
381381
|q| q.as_volt_amperes_reactive(),
382+
TimeDelta::try_seconds(1).unwrap(),
382383
vec![
383384
Some(-1.4),
384385
Some(-0.5),
@@ -396,7 +397,7 @@ mod tests {
396397

397398
#[tokio::test(start_paused = true)]
398399
async fn test_battery_voltage_formula() {
399-
let formula = new_logical_meter_handle()
400+
let formula = new_logical_meter_handle(None)
400401
.await
401402
.battery(None, crate::metric::AcVoltage)
402403
.unwrap();
@@ -405,6 +406,7 @@ mod tests {
405406
check_samples(
406407
samples,
407408
|q| q.as_volts(),
409+
TimeDelta::try_seconds(1).unwrap(),
408410
vec![
409411
Some(398.0),
410412
Some(397.67),
@@ -420,9 +422,60 @@ mod tests {
420422
)
421423
}
422424

425+
#[tokio::test(start_paused = true)]
426+
async fn test_resampling_functions() {
427+
let lm_config = Some(
428+
LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
429+
.with_default_resampling_function(ResamplingFunction::Count)
430+
.override_resampling_function::<crate::metric::AcVoltage>(ResamplingFunction::Last),
431+
);
432+
let mut lm = new_logical_meter_handle(lm_config).await;
433+
let bat_volt_formula = lm.battery(None, crate::metric::AcVoltage).unwrap();
434+
435+
let samples = fetch_samples(bat_volt_formula, 10).await;
436+
check_samples(
437+
samples,
438+
|q| q.as_volts(),
439+
TimeDelta::try_milliseconds(200).unwrap(),
440+
vec![
441+
Some(400.0),
442+
Some(400.0),
443+
Some(398.0),
444+
Some(396.0),
445+
Some(396.0),
446+
Some(396.0),
447+
Some(396.0),
448+
Some(396.0),
449+
None,
450+
None,
451+
],
452+
);
453+
454+
let cons_pow_formula = lm.consumer(crate::metric::AcPowerActive).unwrap();
455+
456+
let samples = fetch_samples(cons_pow_formula, 10).await;
457+
check_samples(
458+
samples,
459+
|q| q.as_watts(),
460+
TimeDelta::try_milliseconds(200).unwrap(),
461+
vec![
462+
Some(1.0),
463+
Some(2.0),
464+
Some(3.0),
465+
Some(3.0),
466+
Some(3.0),
467+
Some(3.0),
468+
Some(2.0),
469+
Some(1.0),
470+
Some(0.0),
471+
Some(0.0),
472+
],
473+
);
474+
}
475+
423476
#[tokio::test(start_paused = true)]
424477
async fn test_consumer_current_formula() {
425-
let formula = new_logical_meter_handle()
478+
let formula = new_logical_meter_handle(None)
426479
.await
427480
.consumer(crate::metric::AcCurrent)
428481
.unwrap();
@@ -431,6 +484,7 @@ mod tests {
431484
check_samples(
432485
samples,
433486
|q| q.as_amperes(),
487+
TimeDelta::try_seconds(1).unwrap(),
434488
vec![
435489
Some(15.0),
436490
Some(14.75),
@@ -460,27 +514,30 @@ mod tests {
460514
fn check_samples<Q: Quantity>(
461515
samples: Vec<Sample<Q>>,
462516
extractor: impl Fn(Q) -> f32,
517+
expected_interval: TimeDelta,
463518
expected_values: Vec<Option<f32>>,
464519
) {
465520
let values = samples
466521
.iter()
467522
.map(|res| res.value().map(|v| extractor(v)))
468523
.collect::<Vec<_>>();
469524

470-
let one_second = TimeDelta::try_seconds(1).unwrap();
471-
472525
samples.as_slice().windows(2).for_each(|w| {
473-
assert_eq!(w[1].timestamp() - w[0].timestamp(), one_second);
526+
assert_eq!(
527+
w[1].timestamp() - w[0].timestamp(),
528+
expected_interval,
529+
"Samples are not spaced at the expected interval"
530+
);
474531
});
475532

476-
for (v, ev) in values.iter().zip(expected_values.iter()) {
533+
for (id, (v, ev)) in values.iter().zip(expected_values.iter()).enumerate() {
477534
match (v, ev) {
478535
(Some(v), Some(ev)) => assert!(
479536
(v - ev).abs() < 0.01,
480-
"expected value {ev:?}, got value {v:?}"
537+
"Item {id} - expected value {ev:?}, got value {v:?}"
481538
),
482539
(None, None) => {}
483-
_ => panic!("expected value {ev:?}, got value {v:?}"),
540+
_ => panic!("Item {id} - expected value {ev:?}, got value {v:?}"),
484541
}
485542
}
486543
}

0 commit comments

Comments
 (0)